Linux等待队列

1. 等待队列概述

等待队列是Linux内核中用于实现进程睡眠和唤醒的机制,主要用于:

  • 进程在等待特定条件时进入睡眠状态
  • 当条件满足时唤醒等待的进程
  • 实现高效的同步和事件等待

2. 等待队列基本操作

2.1 定义和初始化

#include <linux/wait.h>
#include <linux/sched.h>

// 静态定义和初始化
DECLARE_WAIT_QUEUE_HEAD(my_wq);

// 动态初始化
wait_queue_head_t my_wq;
init_waitqueue_head(&my_wq);

// 定义等待队列条目
DECLARE_WAITQUEUE(wait, current);

2.2 基本睡眠函数

// 不可中断睡眠(通常不建议使用)
wait_event(wq, condition);
wait_event_timeout(wq, condition, timeout);

// 可中断睡眠(推荐使用)
wait_event_interruptible(wq, condition);
wait_event_interruptible_timeout(wq, condition, timeout);

// 可终止睡眠
wait_event_killable(wq, condition);
wait_event_killable_timeout(wq, condition, timeout);

2.3 唤醒函数

// 唤醒一个进程
wake_up(wq_head);
wake_up_interruptible(wq_head);

// 唤醒所有进程
wake_up_all(wq_head);
wake_up_interruptible_all(wq_head);

// 唤醒一个进程并同步
wake_up_sync(wq_head);
wake_up_interruptible_sync(wq_head);

3. 等待队列使用模式

3.1 基本使用示例

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/sched.h>

static DECLARE_WAIT_QUEUE_HEAD(data_wq);
static int data_ready = 0;
static char shared_data[256];

// 生产者:产生数据后唤醒消费者
static void producer_function(const char *data)
{
    // 生产数据
    strncpy(shared_data, data, sizeof(shared_data) - 1);
    shared_data[sizeof(shared_data) - 1] = '\0';
    
    // 设置条件变量
    data_ready = 1;
    
    // 唤醒等待队列中的所有进程
    wake_up_interruptible(&data_wq);
    
    printk(KERN_INFO "Producer: data ready, waking up consumers\n");
}

// 消费者:等待数据可用
static ssize_t consumer_function(struct file *filp, char __user *buf, size_t count)
{
    int ret;
    
    // 等待数据就绪(可中断睡眠)
    ret = wait_event_interruptible(data_wq, data_ready != 0);
    if (ret) {
        // 被信号中断
        return -ERESTARTSYS;
    }
    
    // 数据已就绪,进行处理
    if (copy_to_user(buf, shared_data, min(count, sizeof(shared_data)))) {
        return -EFAULT;
    }
    
    // 重置条件
    data_ready = 0;
    
    return min(count, sizeof(shared_data));
}

3.2 手动等待队列操作

static ssize_t advanced_wait_example(struct file *filp, char __user *buf, size_t count)
{
    wait_queue_entry_t wait;
    int ret = 0;
    
    // 初始化等待队列条目
    init_waitqueue_entry(&wait, current);
    
    // 添加到等待队列
    add_wait_queue(&data_wq, &wait);
    
    // 设置进程状态
    set_current_state(TASK_INTERRUPTIBLE);
    
    while (!data_ready) {
        // 检查是否有信号 pending
        if (signal_pending(current)) {
            ret = -ERESTARTSYS;
            break;
        }
        
        // 调度其他进程运行
        schedule();
        set_current_state(TASK_INTERRUPTIBLE);
    }
    
    // 恢复运行状态
    set_current_state(TASK_RUNNING);
    
    // 从等待队列移除
    remove_wait_queue(&data_wq, &wait);
    
    if (!ret) {
        // 处理数据
        if (copy_to_user(buf, shared_data, min(count, sizeof(shared_data)))) {
            ret = -EFAULT;
        } else {
            ret = min(count, sizeof(shared_data));
            data_ready = 0;
        }
    }
    
    return ret;
}

4. 完整驱动示例

4.1 简单的字符设备驱动

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/wait.h>
#include <linux/sched.h>
#include <linux/uaccess.h>
#include <linux/slab.h>

#define DEVICE_NAME "waitq_example"
#define BUFFER_SIZE 1024

struct waitq_device {
    struct cdev cdev;
    dev_t devno;
    wait_queue_head_t read_wq;
    wait_queue_head_t write_wq;
    char *buffer;
    size_t data_len;
    int read_avail;
    int write_avail;
    struct mutex lock;
};

static struct waitq_device *waitq_dev;

static int waitq_open(struct inode *inode, struct file *filp)
{
    struct waitq_device *dev = container_of(inode->i_cdev, 
                                          struct waitq_device, cdev);
    filp->private_data = dev;
    return 0;
}

static int waitq_release(struct inode *inode, struct file *filp)
{
    return 0;
}

static ssize_t waitq_read(struct file *filp, char __user *buf, 
                         size_t count, loff_t *f_pos)
{
    struct waitq_device *dev = filp->private_data;
    ssize_t ret;
    
    // 等待数据可读
    if (wait_event_interruptible(dev->read_wq, dev->read_avail)) {
        return -ERESTARTSYS;
    }
    
    // 保护共享数据
    mutex_lock(&dev->lock);
    
    // 读取数据
    count = min(count, dev->data_len);
    if (copy_to_user(buf, dev->buffer, count)) {
        ret = -EFAULT;
    } else {
        ret = count;
        dev->data_len = 0;
        dev->read_avail = 0;
        dev->write_avail = 1;
        
        // 唤醒可能的写者
        wake_up_interruptible(&dev->write_wq);
    }
    
    mutex_unlock(&dev->lock);
    return ret;
}

static ssize_t waitq_write(struct file *filp, const char __user *buf,
                          size_t count, loff_t *f_pos)
{
    struct waitq_device *dev = filp->private_data;
    ssize_t ret;
    
    count = min(count, (size_t)BUFFER_SIZE);
    
    // 等待缓冲区可写
    if (wait_event_interruptible(dev->write_wq, dev->write_avail)) {
        return -ERESTARTSYS;
    }
    
    mutex_lock(&dev->lock);
    
    // 写入数据
    if (copy_from_user(dev->buffer, buf, count)) {
        ret = -EFAULT;
    } else {
        dev->data_len = count;
        dev->read_avail = 1;
        dev->write_avail = 0;
        ret = count;
        
        // 唤醒读者
        wake_up_interruptible(&dev->read_wq);
    }
    
    mutex_unlock(&dev->lock);
    return ret;
}

static const struct file_operations waitq_fops = {
    .owner = THIS_MODULE,
    .open = waitq_open,
    .release = waitq_release,
    .read = waitq_read,
    .write = waitq_write,
};

static int __init waitq_init(void)
{
    int ret;
    
    // 分配设备结构
    waitq_dev = kzalloc(sizeof(*waitq_dev), GFP_KERNEL);
    if (!waitq_dev)
        return -ENOMEM;
    
    // 分配缓冲区
    waitq_dev->buffer = kzalloc(BUFFER_SIZE, GFP_KERNEL);
    if (!waitq_dev->buffer) {
        ret = -ENOMEM;
        goto free_dev;
    }
    
    // 初始化等待队列
    init_waitqueue_head(&waitq_dev->read_wq);
    init_waitqueue_head(&waitq_dev->write_wq);
    
    // 初始化互斥锁
    mutex_init(&waitq_dev->lock);
    
    // 初始状态:可写,不可读
    waitq_dev->write_avail = 1;
    waitq_dev->read_avail = 0;
    waitq_dev->data_len = 0;
    
    // 分配设备号
    ret = alloc_chrdev_region(&waitq_dev->devno, 0, 1, DEVICE_NAME);
    if (ret < 0)
        goto free_buffer;
    
    // 初始化cdev
    cdev_init(&waitq_dev->cdev, &waitq_fops);
    waitq_dev->cdev.owner = THIS_MODULE;
    
    // 添加cdev到系统
    ret = cdev_add(&waitq_dev->cdev, waitq_dev->devno, 1);
    if (ret)
        goto unregister_dev;
    
    printk(KERN_INFO "Wait queue example driver loaded\n");
    return 0;

unregister_dev:
    unregister_chrdev_region(waitq_dev->devno, 1);
free_buffer:
    kfree(waitq_dev->buffer);
free_dev:
    kfree(waitq_dev);
    return ret;
}

static void __exit waitq_exit(void)
{
    cdev_del(&waitq_dev->cdev);
    unregister_chrdev_region(waitq_dev->devno, 1);
    kfree(waitq_dev->buffer);
    kfree(waitq_dev);
    printk(KERN_INFO "Wait queue example driver unloaded\n");
}

module_init(waitq_init);
module_exit(waitq_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Your Name");
MODULE_DESCRIPTION("Wait queue example driver");

5. 高级用法和模式

5.1 带超时的等待

static ssize_t wait_with_timeout(struct file *filp, char __user *buf, 
                                size_t count, loff_t *f_pos)
{
    struct my_device *dev = filp->private_data;
    long timeout_ret;
    
    // 等待最多2秒钟
    timeout_ret = wait_event_interruptible_timeout(dev->wq, 
                    dev->data_ready, 
                    msecs_to_jiffies(2000));
    
    if (timeout_ret == 0) {
        // 超时
        return -ETIMEDOUT;
    } else if (timeout_ret < 0) {
        // 被信号中断
        return -ERESTARTSYS;
    }
    
    // 条件满足,处理数据
    return process_data(dev, buf, count);
}

5.2 独占等待

static ssize_t exclusive_wait(struct file *filp, char __user *buf,
                             size_t count, loff_t *f_pos)
{
    wait_queue_entry_t wait;
    int ret = 0;
    
    init_waitqueue_entry(&wait, current);
    wait.flags |= WQ_FLAG_EXCLUSIVE;  // 设置为独占等待
    
    add_wait_queue(&my_wq, &wait);
    set_current_state(TASK_INTERRUPTIBLE);
    
    while (!condition) {
        if (signal_pending(current)) {
            ret = -ERESTARTSYS;
            break;
        }
        schedule();
        set_current_state(TASK_INTERRUPTIBLE);
    }
    
    set_current_state(TASK_RUNNING);
    remove_wait_queue(&my_wq, &wait);
    
    return ret;
}

5.3 轮询结合等待队列

static unsigned int waitq_poll(struct file *filp, poll_table *wait)
{
    struct waitq_device *dev = filp->private_data;
    unsigned int mask = 0;
    
    poll_wait(filp, &dev->read_wq, wait);
    poll_wait(filp, &dev->write_wq, wait);
    
    mutex_lock(&dev->lock);
    
    if (dev->read_avail)
        mask |= POLLIN | POLLRDNORM;  // 可读
    
    if (dev->write_avail)
        mask |= POLLOUT | POLLWRNORM; // 可写
    
    mutex_unlock(&dev->lock);
    
    return mask;
}

6. 最佳实践和注意事项

6.1 使用准则

  1. 总是使用可中断版本:除非有特殊需求,否则使用_interruptible版本
  2. 保护共享数据:等待队列通常需要与锁配合使用
  3. 条件检查:在睡眠前和唤醒后都要检查条件
  4. 避免虚假唤醒:使用循环检查条件

6.2 常见错误

// 错误:没有在循环中检查条件
wait_event_interruptible(wq, condition);  // 可能虚假唤醒

// 正确:手动实现循环检查
while (!condition) {
    wait_event_interruptible(wq, condition);
}

// 错误:忘记恢复进程状态
set_current_state(TASK_INTERRUPTIBLE);
schedule();
// 忘记: set_current_state(TASK_RUNNING);

// 错误:在持有锁时睡眠
mutex_lock(&lock);
wait_event_interruptible(wq, condition);  // 可能死锁!
mutex_unlock(&lock);

6.3 性能考虑

  1. 选择合适的唤醒函数

    • wake_up() - 唤醒所有等待者
    • wake_up_interruptible() - 只唤醒可中断等待者
    • wake_up_sync() - 同步唤醒,不重新调度
  2. 避免惊群效应:使用独占等待标志WQ_FLAG_EXCLUSIVE

等待队列是Linux内核中实现进程同步的核心机制,正确使用可以创建高效、响应及时的驱动程序。