Linux并发与竞争

1. 并发与竞争概述

1.1 基本概念

  • 并发:多个执行单元同时或并行执行
  • 竞争:多个执行单元对共享资源的同时访问导致的不确定性
  • 临界区:访问共享资源的代码段
  • 竞态条件:执行结果依赖于并发执行顺序的情况

1.2 并发来源

/* 并发的主要来源 */
1. 多核CPU真正并行执行
2. 单核CPU上的任务抢占
3. 中断处理程序
4. 底半部机制(软中断、tasklet、工作队列)
5. 内核抢占
6. 睡眠与唤醒

2. 内核同步机制

2.1 原子操作

2.1.1 整型原子操作

#include <linux/atomic.h>

atomic_t counter = ATOMIC_INIT(0);

// 基本操作
void atomic_set(atomic_t *v, int i);
int atomic_read(atomic_t *v);

// 算术操作
void atomic_add(int i, atomic_t *v);
void atomic_sub(int i, atomic_t *v);
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);

// 返回操作结果
int atomic_inc_return(atomic_t *v);
int atomic_dec_return(atomic_t *v);

// 测试操作
int atomic_inc_and_test(atomic_t *v);  // 结果为0返回1
int atomic_dec_and_test(atomic_t *v);  // 结果为0返回1

2.1.2 位原子操作

#include <linux/bitops.h>

unsigned long word = 0;

void set_bit(int nr, volatile unsigned long *addr);
void clear_bit(int nr, volatile unsigned long *addr);
void change_bit(int nr, volatile unsigned long *addr);

int test_bit(int nr, volatile unsigned long *addr);
int test_and_set_bit(int nr, volatile unsigned long *addr);
int test_and_clear_bit(int nr, volatile unsigned long *addr);

2.2 自旋锁

2.2.1 基本自旋锁

#include <linux/spinlock.h>

DEFINE_SPINLOCK(my_lock);
unsigned long flags;

// 获取锁
spin_lock(&my_lock);
// 临界区
spin_unlock(&my_lock);

// 中断上下文安全版本
spin_lock_irqsave(&my_lock, flags);    // 保存中断状态并禁用中断
// 临界区
spin_unlock_irqrestore(&my_lock, flags); // 恢复中断状态

// 简单版本(已知中断未使能)
spin_lock_irq(&my_lock);
spin_unlock_irq(&my_lock);

2.2.2 读写自旋锁

#include <linux/rwlock.h>

DEFINE_RWLOCK(my_rwlock);

// 读者
read_lock(&my_rwlock);
// 读取临界区
read_unlock(&my_rwlock);

// 写者
write_lock(&my_rwlock);
// 写入临界区
write_unlock(&my_rwlock);

2.3 信号量

2.3.1 计数信号量

#include <linux/semaphore.h>

struct semaphore my_sem;

// 初始化
sema_init(&my_sem, 1);  // 二进制信号量
sema_init(&my_sem, 5);  // 计数信号量

// 获取信号量
void down(struct semaphore *sem);           // 不可中断
int down_interruptible(struct semaphore *sem); // 可被信号中断
int down_trylock(struct semaphore *sem);    // 非阻塞

// 释放信号量
void up(struct semaphore *sem);

2.3.2 互斥信号量

#include <linux/mutex.h>

struct mutex my_mutex;

// 初始化
mutex_init(&my_mutex);

// 使用
mutex_lock(&my_mutex);
// 临界区
mutex_unlock(&my_mutex);

// 非阻塞版本
if (mutex_trylock(&my_mutex)) {
    // 临界区
    mutex_unlock(&my_mutex);
} else {
    // 处理无法获取锁的情况
}

2.4 完成量

#include <linux/completion.h>

struct completion my_completion;

// 初始化
init_completion(&my_completion);

// 等待完成
wait_for_completion(&my_completion);

// 通知完成
complete(&my_completion);      // 唤醒一个等待者
complete_all(&my_completion);  // 唤醒所有等待者

2.5 读写信号量

#include <linux/rwsem.h>

struct rw_semaphore my_rwsem;

// 初始化
init_rwsem(&my_rwsem);

// 读者
down_read(&my_rwsem);
// 读取临界区
up_read(&my_rwsem);

// 写者
down_write(&my_rwsem);
// 写入临界区
up_write(&my_rwsem);

3. 实际应用示例

3.1 设备驱动中的并发控制

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/spinlock.h>
#include <linux/semaphore.h>

struct my_device {
    struct cdev cdev;
    atomic_t open_count;
    spinlock_t data_lock;
    struct mutex io_mutex;
    struct semaphore sem;
    char *buffer;
    size_t buffer_size;
};

static struct my_device my_dev;

static int my_open(struct inode *inode, struct file *filp)
{
    // 使用原子操作计数
    if (atomic_inc_return(&my_dev.open_count) > 1) {
        atomic_dec(&my_dev.open_count);
        return -EBUSY;
    }
    
    filp->private_data = &my_dev;
    return 0;
}

static int my_release(struct inode *inode, struct file *filp)
{
    atomic_dec(&my_dev.open_count);
    return 0;
}

static ssize_t my_read(struct file *filp, char __user *buf, 
                      size_t count, loff_t *f_pos)
{
    struct my_device *dev = filp->private_data;
    unsigned long flags;
    ssize_t ret;
    
    // 使用互斥锁保护IO操作
    if (mutex_lock_interruptible(&dev->io_mutex))
        return -ERESTARTSYS;
    
    // 使用自旋锁保护共享数据
    spin_lock_irqsave(&dev->data_lock, flags);
    
    // 读取数据到用户空间
    if (count > dev->buffer_size - *f_pos)
        count = dev->buffer_size - *f_pos;
    
    if (copy_to_user(buf, dev->buffer + *f_pos, count)) {
        ret = -EFAULT;
    } else {
        *f_pos += count;
        ret = count;
    }
    
    spin_unlock_irqrestore(&dev->data_lock, flags);
    mutex_unlock(&dev->io_mutex);
    
    return ret;
}

static ssize_t my_write(struct file *filp, const char __user *buf,
                       size_t count, loff_t *f_pos)
{
    struct my_device *dev = filp->private_data;
    unsigned long flags;
    ssize_t ret;
    
    // 使用信号量限制并发写入者数量
    if (down_interruptible(&dev->sem))
        return -ERESTARTSYS;
    
    // 使用互斥锁和自旋锁
    mutex_lock(&dev->io_mutex);
    spin_lock_irqsave(&dev->data_lock, flags);
    
    if (copy_from_user(dev->buffer + *f_pos, buf, count)) {
        ret = -EFAULT;
    } else {
        *f_pos += count;
        ret = count;
    }
    
    spin_unlock_irqrestore(&dev->data_lock, flags);
    mutex_unlock(&dev->io_mutex);
    up(&dev->sem);
    
    return ret;
}

3.2 中断处理中的并发

#include <linux/interrupt.h>

struct my_device_data {
    spinlock_t lock;
    struct work_struct work;
    struct workqueue_struct *wq;
    void __iomem *reg_base;
};

static irqreturn_t my_interrupt_handler(int irq, void *dev_id)
{
    struct my_device_data *dev = dev_id;
    unsigned long flags;
    
    // 在中断处理中使用自旋锁
    spin_lock_irqsave(&dev->lock, flags);
    
    // 读取中断状态
    // 清除中断标志
    // 处理紧急事务
    
    spin_unlock_irqrestore(&dev->lock, flags);
    
    // 调度底半部处理
    queue_work(dev->wq, &dev->work);
    
    return IRQ_HANDLED;
}

// 底半部工作函数
static void my_work_handler(struct work_struct *work)
{
    struct my_device_data *dev = container_of(work, 
                                struct my_device_data, work);
    
    // 这里可以使用睡眠操作
    // 处理耗时任务
}

4. 选择正确的锁机制

4.1 锁选择指南

场景 推荐机制 原因
短期保护,中断上下文 自旋锁 不会睡眠,响应快
长期保护,进程上下文 互斥锁 可睡眠,不浪费CPU
多读少写 读写锁/信号量 提高并发性
简单计数 原子操作 无锁,高效
任务同步 完成量 专门用于同步

4.2 最佳实践

// 1. 按固定顺序获取锁,避免死锁
void correct_order(void)
{
    mutex_lock(&lock_a);
    mutex_lock(&lock_b);
    // 临界区
    mutex_unlock(&lock_b);
    mutex_unlock(&lock_a);
}

// 2. 避免在持有锁时调用可能睡眠的函数
void bad_practice(struct mutex *lock)
{
    mutex_lock(lock);
    copy_to_user(...);  // 可能睡眠
    mutex_unlock(lock); // 危险!
}

// 3. 使用锁的适当变体
int good_practice(struct mutex *lock)
{
    if (mutex_lock_interruptible(lock)) // 可被信号中断
        return -ERESTARTSYS;
    
    // 临界区
    mutex_unlock(lock);
    return 0;
}

5. 调试和检测

5.1 锁调试功能

// 配置内核选项
CONFIG_DEBUG_SPINLOCK=y
CONFIG_DEBUG_MUTEXES=y
CONFIG_DEBUG_ATOMIC_SLEEP=y

// 锁统计信息
CONFIG_LOCK_STAT=y

5.2 死锁检测

// 使用lockdep内核功能
#include <linux/lockdep.h>

// 在初始化时注册锁
lockdep_set_class(&my_lock, &my_lock_key);

// 检查锁的使用情况
spin_lock_nested(&my_lock, SINGLE_DEPTH_NESTING);

正确理解和使用Linux内核的并发控制机制是编写稳定、高效驱动程序的关键。应根据具体场景选择合适的同步机制,并遵循最佳实践来避免竞态条件和死锁。