如果你是嵌入式Linux開發者,你可能會遇到這樣的問題:如何在多個任務或執行緒之間安全地共享裝置資源?如何避免數據的競爭與不一致?如何提高系統的效能和可靠性?這些問題都涉及到並發控制技術,也就是如何協調多個執行實體對共享資源的存取。在本文中,我們將介紹Linux驅動中常用的並發控制技術,包括原子操作,自旋鎖,信號量,互斥鎖,讀寫鎖,順序鎖和RCU等,並舉例說明它們的使用方法和注意事項。
#為了實現對臨界資源的有效管理,應用層的程式有原子變量,條件變量,信號量來控制並發,同樣的問題也存在與驅動開發中,例如一個驅動同時被多個應用層程式調用,此時驅動程式中的全域變數會同時屬於多個應用層進程的進程空間,這種情況下也要使用一些技術來實現對並發的控制。本文將討論內核中下述並發控制技術的技術特性與應用場景。
#顧名思義,就是屏蔽所有的中斷。在嵌入式系統,中斷屏蔽可以有三級,1. 硬體介面的屏蔽,2. 硬體GIC的屏蔽,3. CPU(核心)的屏蔽。 如果在介面處屏蔽了,那麼中斷來了就丟了,根本找不到。如果在GIC處屏蔽了,那麼在屏蔽期間如果來了irq_1,irq_2,irq_3個中斷,因為只有一個pending標誌位,所以最後irq_3來的時候會將pending置位,之後解除屏蔽了,CPU發現pending有置位,還是會處理,但1,2就肯定丟了。在ARM處的屏蔽,即核心中的屏蔽,看怎麼設定了,如果就是local_irq_disable,那麼丟了就是丟了,和在介面處屏蔽一樣,如果是local_irq_save就和第二種一樣,追到最後一個中斷,核心也有相應的機制進行中斷計數,知道這段期間來了多少個中斷,但是實際操作中,大部分情況我們都不會追著執行錯過的中斷,除非這個中斷非常重要。
我們這裡討論的,就是在核心中進行中斷屏蔽。由於核心中許多重要的操作都要依賴中斷,所以屏蔽所有的中斷是十分危險的,裡面執行的程式碼要盡可能的快,而且,由於核心的進程調度也是由中斷驅動的,所以中斷屏蔽中不能有可能引發休眠的程式碼,否則無法被喚醒。請注意,中斷屏蔽只是屏蔽了本CPU的中斷,所以並不能解決SMP引發的競泰問題,通常,中斷屏蔽要和自旋鎖聯合使用,用於防止訪問自旋鎖定保護的臨界區時被中斷打斷
#local_irq_disable(); //屏蔽中断 //或 local_irq_save(flags); //屏蔽中断并保存目前CPU中的中断位信息 /* 临界区 */ local_irq_enable(); //解除屏蔽 //或 local_irq_restore(flags); //解除屏蔽并恢复中断位信息
local_bh_disable(); //屏蔽中断 /* 临界区 */ local_bh_enable();
原子操作即不能被打斷的操作,和應用層的概念一樣,內核中的原子操作模板如下:
//asm/atomic.h //创建并初始化原子变量 atomic_t tv = ATOMIC_INIT(初值); //读原子变量 int atomic_read(atomic_t *v); //写原子变量 void atomic_set(atomic_t *v, int i); /** *atomic_dec_and_test - 尝试将原子变量-1 *v:如果-1之后原子变量变为0,返回非0, 否则返回0 */ int atomic_dec_and_test(volatile atomic_t *v); int atomic_inc_and_test(volatile atomic_t *v); int atomic_sub_and_test(int i, volatile atomic_t *v); //操作并返回 int atomic_add_return(int i, atomic *v); int atomic_sub_return(int i, atomic *v); int atomic_inc_return(atomic *v); int atomic_dev_return(atomic *v);
static atomic_t tv; static int demo_open(struct inode *inode, struct file *filp) { if(!atomic_dec_and_test(&tv)){ atomic_inc(&tv); return -EBUSY; } /* 操作代码 */ return 0; } static int demo_release(struct inode *inode, struct file *filp) { atomic_inc(&tv); return 0; } static int __init demo_init(void) { // init atomic_t atomic_set(&tv, 1); }
#位元原子操作即原子的位元操作,內核中大量使用」位元」來記錄訊息,例如位圖,這些操作都必須是原子性的,內核API如下:
//设置位 void set_bit(nr,void *addr); //清除位 void clear_bit(nr,void *addr); //改变位 void change_bit(nr,void *addr); //测试位 test_bit(nr, void *addr); //测试并操作位 int test_and_set_bit(nr, void *addr); int test_and_clear_bit(nr,void *addr); int test_and_change_bit(nr,void *addr);
意即”在原地打轉”,當加鎖不成功時,自旋,自旋鎖會不斷的佔用CPU進行變量的測試,由於屬於原子操作,所以該CPU的佔用會升為100%,所以,使用自旋鎖時,臨界區的程式碼需要很短,否則會影響系統性能,此外,作為鎖機制的一種,使用自旋鎖同樣需要注意死鎖的出現,自旋鎖鎖定期間不能調用可能引起進程調度的函數,如果進程獲得自旋鎖之後再阻塞,eg,copy_from_user(),copy_to_user(),kmalloc(),msleep()等,一旦阻塞發生就可能導致內核崩潰。
自旋鎖可以用來解決SMP競態問題。不同類型的自旋鎖有自己的處理機制,適用於不同的情況。包括傳統自旋鎖,讀寫自旋鎖,RCU機制,順序鎖等,自旋鎖是訊號量,互斥體的底層實作工具。
比較\類型 | 傳統自旋鎖定 | 讀寫自旋鎖定 | 順序鎖定 | RCU機制 |
---|---|---|---|---|
應用場合 | 需要上鎖者獨佔的資源 | 需要寫者獨佔的資源 | 很少同時讀寫的資源 | 讀多寫少的資源 |
讀 讀 並發 | × | √ | √ | √ |
讀 寫 並發 | × | × | √ | √ |
寫 寫 並發 | × | × | × | √ |
和其他锁机制的一样,使用自旋锁保护数据分为抢锁-操作-解锁,下面就是一个典型的使用锁的流程,通过自旋锁实现一个文件只被一个进程打开。
int cnt=0; lock_t lock; static int my_open() { lock(&lock); if(cnt){ unlock(&lock) } cnt++; unlock(&lock); } static int release() { lock(&lock); cnt--; unlock(&lock); }
是一种比较粗暴的自旋锁,使用这种锁的时候,被锁定的临界区域不允许其他CPU访问,需要注意的是,尽管获得锁之后执行的临界区操作不会被其他CPU和本CPU内其他抢占进程的打扰,但是仍然会被中断和底半部的影响,所以通常我们会使用下述API中的衍生版本,比如上文中提到的将自旋锁+中断屏蔽来防止使用自旋锁访问临界资源的时候被中断打断,对应的宏函数就是spin_lock_irq和spin_lock_irqsave。
//定义并初始化自旋锁 spinlock_t spinlock void spin_lock_init(spinlock_t *); //加锁 //spin_lock - 加锁函数(忙等) void spin_lock(spinlock_t *lock); int spin_trylock(spinlock_t *lock); spin_lock_irq(); //=spin_lock() + local_irq_disable() spin_lock_irqsave(); //= spin_lock() + lock_irq_save(); spin_lock_bh(); //=spin_lock() + local_bh_disable(); //解锁 void spin_unlock(spinlock_t *lock); spin_unlock_irq(); //=spin_unlock() + local_irq_enable() spin_unlock_irqrestore(); //= spin_unlock() + lock_irq_restore(); spin_unlock_bh(); //=spin_unlock() + local_bh_enable();
传统的自旋锁粗暴的将临界资源划归给一个CPU,但是很多资源都不会因为读而被破坏,所以我们可以允许多个CPU同时读临界资源,但不允许同时写资源,类似于应用层的文件锁,内核的读写锁机制同样有下述互斥原则:
//include/linux/rwlock.h //定义并初始化自旋锁 rwlock_t rwlock; void rwlock_init(rwlock_t *lock); //加读锁 void read_lock(rwlock_t *lock); int read_trylock(rwlock_t *lock); void read_lock_irqsave(rwlock_t *lock,unsigned long flags); void read_lock_irq(rwlock_t *lock, unsigned long flags); void read_lock_bh(rwlock_t *lock); //解读锁 void read_unlock(rwlock_t *lock) void read_unlock_irqrestrore(rwlock_t *lock,unsigned long flags); void read_unlock_irq(rwlock_t *lock, unsigned long flags); void read_unlock_bh(rwlock_t *lock); //加写锁 void write_lock(rwlock_t *lock) int write_trylock(rwlock_t *lock) void write_lock_irqsave(rwlock_t *lock,unsigned long flags); void write_lock_irq(rwlock_t *lock, unsigned long flags); void write_lock_bh(rwlock_t *lock); //解写锁 void write_unlock(rwlock_t *lock) void write_unlock_irqrestrore(rwlock_t *lock,unsigned long flags); void write_unlock_irq(rwlock_t *lock, unsigned long flags); void write_unlock_bh(rwlock_t *lock);
顺序锁可以看作读写锁的升级版,读写锁不允许同时存在读者和写者,顺序锁对这一情况进行了改良,它允许写者和读者同时访问临界区,不必再向读写锁那样读者要读必须等待写者写完,写者要写必须等待读者读完。不过,使用顺序锁的时候,临界区不能有指针,因为写者可能会修改指针的指向,如果读者去读,就会Oops,此外,如果读者读过的数据被写者改变了,读者需要重新读,以维持数据是最新的,虽然有这两个约束,但是顺序锁提供了比读写锁更大的灵活性。对于写者+写者的情况,顺序锁的机制和读写锁一样,必须等!
//include/linux/seqlock.h //定义顺序锁 struct seqlock_t sl; //获取顺序锁 void write_seqlock(seqlock_t *sl); void write_tryseqlock(seqlock_t *sl); void write_seqlock_irqsave(lock,flags); //=local_irq_save() + write_seqlock() void write_seqlock_irq(seqlock_t *sl); //=local_irq_disable() + write_seqlock() void write_seqlock_bh(seqlock_t *sl); //local_bh_disable() + write_seqlock() //释放顺序锁 void write_sequnlock(seqlock_t *sl); void write_sequnlock_irqsave(lock,flags); //=local_irq_restore() + write_sequnlock() void write_sequnlock_irq(seqlock_t *sl); //=local_irq_enable() + write_sequnlock() void write_sequnlock_bh(seqlock_t *sl); //local_bh_enable() + write_sequnlock() //读开始 unsigned read_seqbegin(const seqlock_t *sl); read_seqbegin_irqsave(lock,flags); //=local_irq_save() + read_seqbegin(); //重读 int read_seqretry(const seqlock_t *sl,unsigned iv); read_seqretry_irqrestore(lock,iv,flags); //=local_irq_restore() + read_seqretry();
RCU即Read-Copy Update,即读者直接读,写者先拷贝再择时更新,是另外一种读写锁的升级版,这种机制在VFS层被大量使用。正如其名,读者访问临界资源不需要锁,从下面的rcu_read_lock的定义即可看出,写者在写之前先将临界资源进行备份,去修改这个副本,等所有的CPU都退出对这块临界区的引用后,再通过回调机制,将引用这块资源的原指针指向已经修改的备份。从中可以看出,在RCU机制下,读者的开销大大降低,也没有顺序锁的指针问题,但是写者的开销很大,所以RCU适用于读多写少的临界资源。如果写操作很多,就有可能将读操作节约的性能抵消掉,得不偿失。
内核会为每一个CPU维护两个数据结构-rcu_data和rcu_bh_data,他们用于保存回调函数,函数call_rcu()把回调函数注册到rcu_data,而call_rcu_bh()则把回调函数注册到rcu_bh_data,在每一个数据结构上,回调函数们会组成一个队列。
使用RCU时,读执行单元必须提供一个信号给写执行单元以便写执行单元能够确定数据可以被安全地释放或修改的时机。内核中有一个专门的垃圾收集器来探测读执行单元的信号,一旦所有的读执行单元都已经发送信号告诉收集器自己都没有使用RCU的数据结构,收集器就调用回调函数完成最后的数据释放或修改操作。
读即RCU中的R,从下面的宏定义可以看出,读操作其实就是禁止内核的抢占调度,并没有使用一个锁对象。
//读锁定 //include/linux/rcupdate.h rcu_read_lock(); //preempt_disbale() rcu_read_lock_bh(); //local_bh_disable() //读解锁 rcu_read_unlock() //preempt_enable() rcu_read_unlock_bh(); //local_bh_enable()
同步即是RCU写操作的最后一个步骤-Update,下面这个接口会则色写执行单元,直到所有的读执行单元已经完成读执行单元临界区,写执行单元才可以继续下一步操作。如果有多个RCU写执行单元调用该函数,他们将在一个grace period(即所有的读执行单元已经完成对临界区的访问)之后全部被唤醒。
synchrosize_rcu()
下面这个接口也由RCU写执行单元调用,它不会使写执行单元阻塞,因而可以在中断上下文或软中断中使用,该函数把func挂接到RCU回调函数链上,然后立即返回。函数sychrosize_rcu()其实也会调用call_rcu()。
void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
下面这个接口会将软中断的完成也当作经历一个quiecent state(静默状态),因此如果写执行单元调用了该函数,在进程上下文的读执行单元必须使用rcu_read_lock_bh();
void call_rcu_bh(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
RCU机制被大量的运用在内核链表的读写中,下面这些就是内核中使用RCU机制保护的数据结构,函数的功能和非RCU版本一样,具体可以参考内核文件**”include/linux/list.h”**,只不过这里的操作都会使用RCU保护起来。
void list_add_rcu(struct list_head *new, struct list_head *head); void list_add_tail_rcu(struct list_head *new,struct list_head *head); void list_del_rcu(struct list_head *entry); void list_replace_rcu(struct list_head *old,struct list_head *new); list_for_each_rcu(pos,head); list_for_each_safe_rcu(pos,n,head); list_for_each_entry_rcu(pos,head,member); void hlist_del_rcu(struct hlist_node *n); void hlist_add_head_rcu(struct hlist_node *n, struct hlist_head *h); list_for_each_rcu(pos,head); hlist_for_each_entry_rcu(tpos,pos,head,member);
自旋锁一节提过,如果一个CPU不能获取临界资源,就会造成”原地自旋”,所以自旋锁保护的临界区的执行时间不能太长,但如果我们的确需要保护一段执行时间比较长的临界区呢?答案就是信号量
,信号量的底层依托于自旋锁来实现其原子性,并进一步将其提高到”进程”的维度,称为一种可以运行在进程上下文的”锁”,正是这种能运行在进程上下文的能力赋予了信号量和自旋锁的诸多不同。
使用信号量,如果试图获取信号量的进程获取失败,内核就会将其调度为睡眠状态,执行其他进程,避免了CPU的忙等。不过,进程上下文的切换也是有成本的,所以通常,信号量在内核中都是只用于保护大块临界区。
此外,一个进程一旦无法获取期待的信号量就会进入睡眠,所以信号量保护的临界区可以有睡眠的代码。在这方面,自旋锁保护的区域是不能睡眠也不能执行schedule()的,因为一旦一个抢到了锁的CPU进行了进程上下文的切换或睡眠,那么其他等待这个自旋锁的CPU就会一直在那忙等,就永远无法等到这个锁,,形成死锁,除非有其他进程将其唤醒(通常都不会有)。
也正是由于信号量操作可能引起阻塞,所以信号量不能用于中断上下文。总结一下刚才罗嗦这一段:
專案 | 信號量 | 自旋鎖定 |
---|---|---|
臨界區時間 | 進程切換時間更短 | 臨界區執行時間更短 |
進程上下文 | 臨界區可以睡眠或調度 | 臨界區不可以睡眠或排程 |
中斷上下文 | 只有down_trylock()可以 | 可以 |
内核的信号量和应用层的信号量的使用方式类似,但没有获取信号量这一步骤,因为内核中中的信号量可以映射到所有调用这个模块的用户进程的内核空间。这些用户进程也就直接共享了一个信号量,所以也就没有获取信号量一说,相关的内容我在”Linux IPC System V 信号量”一文中有所讨论。
和应用层的信号量一样,内核信号量也是用于对临界资源的互斥/顺序访问,同样,虽然在使用信号量的时候我们可以初始化为任意值,但实际操作上我们通常只初始化为1或0,下述是Linux内核提供的信号量API。
//include/linux/semaphore.h //定义并初始化semaphore对象 struct semphore sem; //初始化信号量 void sem_init(struct semaphore * sem,int val); init_MUTEX(sem); init_MUTEX_LOCKED(sem); DECLARE_MUTEX(sem); DECLARE_MUTEX_LOCKED(sem); //P操作 //down()会导致睡眠,不能用于中断上下文 void down(struct semaphore *sem); //down_interruptible同样会进入休眠,但能被打断 int down_interruptible(struct semaphore *sem); //down_trylock不能获得锁时会立即返回,不会睡眠,可以用在中断上下文 int down_trylock(struct semaphore *sem); //V操作 void up(struct semaphore *sem);
读写信号量与信号量的关系 和 读写自旋锁与自旋锁的关系类似,他们的互斥逻辑都是一样的,这里不再赘述
//定义并初始化读写信号量 struct rw_semaphore my_rwsem; void init_rwsem(struct rw_semaphore *sem); //P读信号量 void down_read(struct rw_semaphore *sem); int down_read_trylock(struct rw_semaphore *sem); //V读信号量 void up_read(struct rw_semaphore *sem); //P写信号量 void down_write(struct rw_semaphore *sem); int down_write_trylock(struct rw_semaphore *sem); //V写信号量 void up_write(struct rw_semaphore *sem);
struct rw_semaphore my_rwsem; void init_rwsem(&my_rwsem); //读前获取读信号量 down_read(&my_rwsem); //若要非阻塞:down_read_trylock(&my_rwsem); /* 读临界区 */ //读完释放读信号量 up_read(&my_rwsem); //写前获取写信号量 down_write(&my_rwsem); //若要非阻塞:down_write_trylock(&my_rwsem); /* 写临界区 */ //写完释放写信号量 up_write(&my_rwsem);
完成量用于一个执行单元等待另一个执行单元执行完某事,和传统信号量一样,主要是用来实现队临界区的顺序/互斥访问。但是完成量还提供一种唤醒一个或唤醒所有等待进程的接口,有点类似与应用层的条件变量。
//定义并初始化完成量 struct completion my_completion; init_completion(&my_completion); //或 DECLARE_COMPLETION(my_completion) //等待completion void wait_for_completion(struct completion *c); //唤醒completion void complete(struct completion *c); //只唤醒一个等待的执行单元 void complete_all(struct completion *c); //释放所有等待该完成量的执行单元
除了信号量,Linux内核还提供了一种专门用于实现互斥的机制-互斥体,相关的内核API如下:
//include/linux/mutex.h //定义并初始化mutex对象 struct mutex my_mutex; mutex_init(&my_mutex); //获取mutex void mutex_lock(struct mutex *lock); int mutex_trylock(struct mutex *lock); int mutex_lock_interruptible(struct mutex *lock); //释放mutex void mutex_unlock(struct mutex *lock);
通过本文,我们了解了Linux驱动中的并发控制技术,它们各有优缺点和适用场景。我们应该根据实际需求选择合适的技术,并遵循一些基本原则,如最小化临界区,避免死锁,减少上下文切换等。并发控制技术是Linux驱动开发中不可或缺的一部分,它可以保证设备资源的正确性和高效性,也可以提升系统的稳定性和可扩展性。希望本文能够对你有所帮助和启发。
以上是Linux驅動中的同時控制技術:原則與實踐的詳細內容。更多資訊請關注PHP中文網其他相關文章!