一、线程互斥
1、进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源。
- 临界区 :每个线程内部访问临界资源的代码,就叫做临界区。
- 互斥 :任何时刻,互斥保证有且只有一个执行流进入临界区访问临界资源,通常对临界资源起保护作用。
- 原子性 :不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
我们现在利用线程模拟一下抢票逻辑
或者也可以说 if 判断条件为真后,代码可能并发切换到其它线程,因为 usleep 漫长的等待过程中可能会有多个线程进入该临界区,所以 tickets 就不是原子的。实际多线程在切换时极有可能出现因为数据交叉操作而导致数据不一致的问题,那么 OS 中可能从内核态返回用户态的时候就会进行线程间切换。
代码如下
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
int tickets = 300;
void* GetTickets(void* agrs)
{
while(true)
{
if(tickets > 0)
{
usleep(5000); //模拟抢票需要花的时间
cout << "thread get a ticket :" << tickets <<endl;
tickets--;
}
else break;
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,GetTickets,nullptr);
pthread_create(&t2,nullptr,GetTickets,nullptr);
pthread_create(&t3,nullptr,GetTickets,nullptr);\
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
运行结果如下:
运行结果导致tickets到0还会继续抢票,一个线程在处理数据的时候,另一个线程也对其进行处理,造成了线程互斥问题。
2、抢票逻辑造成线程互斥问题逻辑
在讲解逻辑之前,我们给大家先介绍一下CPU寄存器和线程上下文。
首先,寄存器不等于寄存器的内容,每一个线程要使用这一套寄存器的时候,当它切换的时候,在寄存器保存的数据要带走,这就是这个线程的上下文。
可以理解成每个线程对应一套寄存器
我们把tickets--自减操作细分为三步
- 先将tickets从内存读到CPU的寄存器上
- CPU内部对tickets进行--操作
- 将计算结果写回内存
现在我们分析多线程并发访问的过程
我们在上面说将tickets细分为3步骤,来看上面这张图, 我们假如线程1执行完第一步的时候(将tickets从内存写到CPU寄存器上)可能存在该线程的时间片结束了,线程被替换了,该线程带走了自己的上下文。
我们在理想状态下,线程2执行tickets步骤时,没有受任何影响一直执行,一直执行到tickets=10的时候,该线程时间片结束了。 当切换回线程1的时候,线程1第一先将自己的上下文恢复,之前保存的是1000,恢复到CPU上,CPU寄存器里的10改为1000,自减操作到999写回内存中。
这样把线程2做的事情给覆盖了。这就是线程互斥问题。
抢票逻辑:
判断tickets>0要不要放到CPU上,肯定是要的,因为这是逻辑运算。
当某个线程可能在执行tickets>0时,时间片结束了,线程切换了,假如某个线程把tickets减到0了,并写回内存了,这个时候上一个线程切换回来,因为这个线程已经执行完判断,继续往下执行,在执行cout打印tickets的时候,要从物理内存拿到tickets,此时是0,自减后变为-1了,然后下一个线程也是执行完判断切换回来继续打印(-1),又自减少。
这就是我们刚刚抢票为什么出现tickets为-1的情况。
我们把这个操作称作非原子的,什么是原子性呢?我们在讲信号量的时候介绍过,信号量的PV操作是原子的,后续我们给大家介绍环形队列信号量和生产消费模型这三个整合起来再给大家详解。
3、互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题。
而要做到这三点,就需要一把锁,Linux 上提供的这把锁叫做互斥量。
那我们该怎么解决上述问题呢?
我们对共享数据的任何访问,保证任何时候只有一个执行流访问——互斥。
这就是加锁。
4、互斥量的接口
- mutex:初始化或释放的锁。
- attr:锁的属性,设置nullptr,即不管。
(1)初始化互斥量
A. 静态分配(初始化不用 destroy)
B. 动态分配
(2)销毁互斥量
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。
- 不要销毁一个已经加锁的互斥量。
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。
(3)互斥量的加锁和解锁
A. 互斥量加锁
调用 pthread_lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么 pthread_ lock 调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
B. 互斥量解锁
5、优化抢票逻辑
这里我们对抢票行为进行加锁
允许结果如下:
这里我们写了两个解锁方式,为什么我们不能这样写呢?
这里我们代码一步步执行,一个线程拿到了锁,假设此时tickets等于0,那直接break了,此时加锁却没有解锁,现在会一直处于阻塞状态,其它线程都在等待。所以,正确写法应该是上面运行的代码。
完整代码
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <string>
#include <cstring>
using namespace std;
int tickets = 300;
class ThreadData
{
public:
ThreadData(const string name, pthread_mutex_t* mtx) :_name(name) ,_mtx(mtx)
{}
public:
string _name;
pthread_mutex_t* _mtx;
};
void* GetTickets(void* agrs)
{
ThreadData* td = (ThreadData*)agrs;
while(true)
{
pthread_mutex_lock(td->_mtx); //加锁
if(tickets > 0)
{
usleep(5000); //模拟抢票需要花的时间
cout << td->_name << ":" << tickets <<endl;
tickets--;
pthread_mutex_unlock(td->_mtx); //解锁
}
else
{
pthread_mutex_unlock(td->_mtx); //解锁
break;
}
usleep(13);
}
delete td;
return nullptr;
}
int main()
{
pthread_mutex_t mtx;
pthread_mutex_init(&mtx,nullptr);
vector<pthread_t> tids;
for(int i = 0; i <= 5; i++)
{
pthread_t tid;
string name = "thread-";
name += to_string(i+1);
ThreadData* td = new ThreadData(name,&mtx);
pthread_create(&tid,nullptr,GetTickets,(void*)td);
tids.push_back(tid);
}
for(auto& e : tids)
{
pthread_join(e,nullptr);
}
return 0;
}
运行结果如下:
假设我们没有usleep(13)
我们看到都是同一个线程在进行抢票,这在纯互斥的环境,如果锁分配不够合理,会导致其他线程饥饿问题。
为了解决这个问题,我们引入了同步的概念,这个概念我们稍后为大家讲解。
加锁的表现
线程对于临界区代码串行执行,原则上临界区的代码越少越好。
加锁的本质
本质使用时间来换安全。打开加解锁,运行速度明显变慢了,且因为互斥,没有出现原子性问题
6、互斥量的原理
经过上面的例子,我们已经能够意识到单纯的 i++ 或者 ++i 都不是原子的,因为这有可能会出现数据不一致问题。为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令,该指令的作用是把 CPU 寄存器区和内存单元的数据相交换,由于只有一条指令,就保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时,另一个处理器的交换指令只能等待总线周期。也就是说锁的原子性实现是由寄存器级别的,如下是 lock unlock 的伪代码以及理解:
lock:
movb $0, %al
xchgb %al, mutex
if(a1寄存器的内容 > 0){
return 0;
}
else{
挂起等待;
}
goto lock;
unlock:
movb $1, mutex
唤醒等待mutex的线程;
return 0;
mutex 就是一个内存中的变量,和定义 int 没太大区别,假设 mutex 默认是 1。
线程 X 执行 movb $0,%al 将 %al 寄存器清零。在这个过程中线程 Y 当然也有可能执行这条语句,这没问题,因为线程在剥离时会作上下文数据的保存,线程在切换时就会把上下文数据保存于 TCB 中。
线程 X 执行 xchgb %al,mutex 将寄存器 %al 中的值 0 和内存的值 mutex:1 交换,这意味着 mutex 里面的值被交换到了对应线程中的上下文中,就相当于变成线程私有的。是的,在汇编代码上只要一条语句就完成交换了,因为它只是一条语句,所以交换过程一定是原子的。具体 xchgb 是怎么完成的,可以去了解了解它的汇编原理,简单提一下就是在体系结构上有一个时序的概念,在一个指定周期中,去访问总线时,汇编指令能被 CPU 接收到,是因为汇编指令会在特定的时间放在总线上的,而总线是可以被锁住的,这在硬件上实现比较简单,所以即使 xchgb 汇编翻译成二进制的时候,是对应多条语句的,但是它依旧不影响,因为硬件级别把总线锁住了,所以它虽然在汇编翻译成二进制时依旧是有多条语句的,但是因为总线被锁住了,所以不会出现原子性问题。
当线程 X 交换完后,线程 Y 突然切换进来了,在此之前就会把线程 X 的上下文数据保存于线程 X 的 TCB 中,然后把线程 Y 中上一次在 TCB 中保存的上下文数据恢复到 CPU(这里没有),然后线程 Y 执行 movb $0,%al 将 %al 寄存器清零,执行 xchgb %al,mutex 将寄存器 %al 中的值 0 和内存的值 mutex:0 交换。所以线程 Y 再往下走就会 else 挂起等待。
再线程切换到线程 X,把线程 Y 中的上下文数据保存于自己的 TCB,然后线程 X 把自己在 TCB 中保存的上下文数据数据恢复到 CPU,再执行 if,就可以访问临界区中的代码了,然后访问临界区成功,并返回。
然后线程 X 去执行 unlock 中的 movb $1,mutex 把内存中的 mutex 值又置为 1,然后唤醒等待 mutex 的线程 Y,成功返回,这样线程 X 就完成了解锁,unlock 也一定是原子的,因为能执行 unlock 的一定是曾经 lock 过的,所以 unlock 是不是原子性的已经不是重点了。最后切换到线程 Y,将上下文数据恢复,继续往下执行 goto lock,将寄存器 %al 清零, 然后将其与内存中的mutex 值交换,成功访问临界区资源返回,然后把 mutex 值置为 1,没有唤醒的 mutex 线程,然后成功解锁。注意这里以不用管寄存器中的 %al:1,因为下次在申请锁时会先把 %al 置 0。
- xchgb交换的本质:把内存上的数据交换到CPU寄存器中。
- 线程交换的本质:把数据交换到线程硬件的上下文,这是线程私有的。
- 把一个共享的锁,让一个线程以一条汇编语句的方式,交换到自己的上下文当中,也就是说当前进程持有锁。
7、线程安全&重入问题
重入:
同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,就称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则就是不可重入函数。
线程安全 :
多个线程并发同一段代码时不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下会出现该问题。
常见的线程不安全的情况:
- 不保护共享变量的函数。
- 函数状态随着被调用,状态发生变化的函数。
- 返回指向静态变量指针的函数。
- 调用线程不安全函数的函数
常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
- 类或者接口对于线程来说都是原子操作。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。
常见不可重入的情况:
- 调用了 malloc/free 函数,因为 malloc 函数是用全局链表来管理堆的。
- 调用了标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构。
- 可重入函数体内使用了静态的数据结构。
常见可重入的情况:
- 不使用全局变量或静态变量。
- 不使用用 malloc 或者 new 开辟出的空间。
- 不调用不可重入函数。
- 不返回静态或全局数据,所有数据都有函数的调用者提供。
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
可重入与线程安全联系:
- 函数是可重入的,那就是线程安全的。
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别:
- 可重入函数是线程安全函数的一种。
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
二、死锁
1、概念
死锁是一种常见的锁,死锁是指在一组进程或线程中的各个进程或线程均占有不会释放的资源,它们申请锁而不释放锁,进而导致多个执行流互相等待的状态。
死锁通常发生在两个或多个进程或线程之间竞争资源的情况,当一个进程或线程持有一个资源并请求另一个进程或线程持有的资源时,如果另一个进程或线程也持有该进程所需的资源并请求第一个进程或线程的资源,就有可能发生死锁。
举个例子,小明和小红身上各有 5 毛钱,想买 1 块钱的辣条,小明对小红说把你的 5 毛钱给我,我来买,小红也对小明说把你的 5 毛钱给我,我来买,此时他们既想要申请对方的 5 毛钱,又不想妥协释放自己的 5 毛钱,这种状态就叫做死锁。
2、死锁四个必要条件
只要产生了死锁,一定有如下四个条件:
- 互斥条件:一个资源每次只能被一个执行流使用。(本质上引入互斥就是为了保护临界资源,由并行变成了串行,保证了原子性。但也带了新的问题:死锁。因为互斥只要存在就会产生申请资源阻塞的情况,就有可能出现死锁)
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。(其实就是请求你的锁给我,我的锁不释放)
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。(一般而言,你要我的资源,而我要别人的资源,是不会造成死锁的。而你要我的资源,我也要你的资源,就可能会造成死锁,因为我们是竞争关系)
3、避免死锁
- 破坏死锁的四个必要条件其中一个。(互斥条件很难被破坏,因为它要求一次只有一个进程或线程使用资源,一般也建议能不用锁就不用锁;请求与保持就是释放相对应的锁资源;不剥夺条件就是可以根据优先级设计一个算法,就算一个执行流未使用完资源,也可以被剥夺;循环等待条件就是若干执行流之间在申请和释放锁资源时不要形成回路)
- 加锁顺序一致。
- 避免锁未释放的场景。
- 资源一次性分配。
4、避免死锁算法
(1)死锁检测算法(了解)
死锁检测算法是一种利用进程状态的变化来识别死锁的方式,能够提高系统的并发性和效率。
这种算法主要有两种形式:
- 预防式,它在进程申请资源时就进行干预,以防止系统进入不安全状态;
- 事后处理,它允许系统进入不安全状态,然后通过某种算法检测并恢复。
具体来说,死锁检测需要用某种数据结构来保存资源的请求和分配信息,并提供一种算法,利用上述信息来检测系统是否已进入死锁状态。如果检测到系统已经发生死锁,可以采取相应的措施解除死锁,如资源剥夺法、撤销进程法或进程回退法等。值得注意的是,死锁检测通常在资源有向图(也称为资源分配图)上进行。
因此,理解和掌握如何有效地在资源有向图上进行死锁检测和解除,对于提高系统的性能和稳定性至关重要。
(2)银行家算法(了解)
银行家算法,是由艾兹格·迪杰斯特拉在 1965 年为 T.H.E 系统设计的一种避免死锁产生的算法。这个算法不仅适用于操作系统给进程分配资源,还可以用于其他需要进行资源分配和调度的场景。
该算法主要使用了四个数据结构:可利用资源向量 Available、最大需求矩阵 Max、分配矩阵 Allocation 以及需求矩阵 Need。这些数据结构分别记录了系统中当前可用的资源数量、每个进程对资源的最大需求量、系统已经分配给每个进程的资源数量以及每个进程还需要的资源数量。
在实现的过程中,银行家算法首先检查系统是否处于安全状态,如果是,则允许进程继续申请资源;否则,将拒绝进程的请求。如果系统进入不安全状态,则需要采取相应的措施使系统返回到安全状态。
总的来说,银行家算法是一种既简单又有效的死锁避免方法,它通过合理的资源分配和调度来防止系统进入不安全状态,从而避免了死锁的发生。
三、线程同步
我们在上面讲优化逻辑抢票的时候,给大家提到了在纯互斥环境下,如果锁分配不够合理,会导致其他线程的饥饿问题。
先讲个故事
你从墙边拿了钥匙去单间自习室中自习,一会出来了,刚把钥匙挂在墙边,看到有很多人在排队,你想着好不容易抢到了单间自习室,于是你刚放下钥匙又拿起了钥匙进去自习,一会又出来放钥匙了,看到人还是很多,心想把明早的也自习算了,于是又拿着钥匙去自习了,反反复复。你本质能反反复复的进去自习是因为你离钥匙最近,理论上虽然没问题,但却不合理。所以这个时候,要存在一个管理员,申请自习室的人必须要排队,出自习室的人,不能立马重新申请锁,必须排到队列尾部。
也就是说让所有的线程,获取锁的线程按照同样的顺序获取资源,这就是同步。
我们保证数据安全(线程申请锁失败再去排队)的情况下,让我们线程访问资源具有一定的顺序性
那我们该如何实现同步呢?
条件变量
1、条件变量
为什么需要使用条件变量
使用条件变量主要是因为它们提供了在多线程编程中一种有效的同步机制。当多个线程需要等待某个特定条件成立才能继续执行时,条件变量就显得尤为重要。通过条件变量,线程可以安全地进入等待状态,直到被其他线程显式地唤醒或满足等待的条件。这有助于避免线程的无谓轮询或忙等待,提高了系统的响应能力和效率。
注意:在使用条件变量时,必须确保与互斥锁一起使用,以避免竞态条件的发生。
条件变量是一个原生线程库提供的一个描述临界资源状态的对象或变量。
我们让出自习室的人把锁挂墙上,敲一下铃铛告诉下一个人准备进入,如果刚出来的这个人还想进入自习室自习,就应该到队尾排队。
2、条件变量接口
(1)pthread_cond_t
pthread_cond_t
是 POSIX 线程库(Pthreads)中用于表示条件变量的数据类型。
(2)pthread_cond_init(初始化)
- cond:要初始化的条件变量。
- attr:NULL。
返回值
- 如果成功,返回 0。
- 如果失败,返回错误码。
(3)pthread_cond_destroy(销毁)
参数
cond
:指向要销毁的条件变量的指针。
返回值
- 如果成功,返回 0。
- 如果失败,返回错误码。
(4)pthread_cond_wait
(等待)
- timedwait:指定的时间范围内等待条件变量下的线程(少用)。
- wait:指定的条件变量下进行等待,一定要入 cond 的队列,mutex 用于在锁中阻塞挂起时会自动释放锁,唤醒时会自动获取锁(后面生产者消费者中详细介绍)。
- cond:要在这个条件变量上等待。
- mutex:互斥量,后面详细解释。
pthread_cond_wait
的行为如下
- 解锁互斥锁:调用 pthread_cond_wait 的线程首先会释放(解锁)它当前持有的互斥锁。这一步是必要的,因为条件变量通常与互斥锁一起使用,以确保对共享数据的访问是同步的。解锁互斥锁允许其他线程获取该锁,从而可以安全地修改共享数据。
- 加入等待队列:在解锁互斥锁之后,调用 pthread_cond_wait 的线程会将自己添加到与该条件变量相关联的等待队列中。此时,线程进入阻塞状态,等待被唤醒。
- 阻塞并等待信号:线程在等待队列中保持阻塞状态,直到它收到一个针对该条件变量的信号(通过 pthread_cond_signal 或 pthread_cond_broadcast 发出)。需要注意的是,仅仅因为线程在等待队列中并不意味着它会立即收到信号;它必须等待直到有其他线程显式地发出信号。
- 重新获取互斥锁:当线程收到信号并准备从 pthread_cond_wait 返回时,它首先会尝试重新获取之前释放的互斥锁。如果此时锁被其他线程持有,那么该线程会阻塞在互斥锁的等待队列中,直到获得锁为止。这一步确保了线程在继续执行之前能够重新获得对共享数据的独占访问权。
- 检查条件:一旦线程成功获取到互斥锁,它会再次检查导致它调用 pthread_cond_wait 的条件是否现在满足。虽然通常认为在收到信号时条件已经满足,但这是一个编程错误的常见来源。正确的做法是在每次从 pthread_cond_wait 返回后都重新检查条件,因为可能有多个线程在等待相同的条件,或者条件可能在信号发出和线程被唤醒之间发生变化。
- 返回并继续执行:如果条件满足,线程会从 pthread_cond_wait 返回,并继续执行后续的代码。如果条件仍然不满足,线程可以选择再次调用 pthread_cond_wait 进入等待状态,或者执行其他操作。
(5)pthread_cond_signal && pthread_cond_broadcast(唤醒)
- signal:唤醒指定条件变量下等待的一个线程。
- broadcast:唤醒所有在该条件变量下等待的线程(少用)。
pthread_cond_signal:唤醒正在等待特定条件变量的一个线程。
pthread_cond_broadcast:用于唤醒所有正在等待指定条件变量的线程。
3、验证代码接口
代码如下:
#include <iostream>
#include <pthread.h>
#include <vector>
#include <string>
#include <unistd.h>
#include <cstring>
using namespace std;
int cnt = 0;
pthread_mutex_t mux = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void* Runthread(void* args)
{
uint64_t i = (uint64_t )args;
string name = "pthread-" + to_string(i);
while(1)
{
pthread_mutex_lock(&mux);
pthread_cond_wait(&cond,&mux);
cout << name << ":" << cnt++ <<endl;
pthread_mutex_unlock(&mux);
}
return nullptr;
}
int main()
{
vector<pthread_t> tids;
for(uint64_t i = 0; i <= 5; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Runthread,(void*)i);
tids.push_back(tid);
}
while(1)
{
sleep(1);
pthread_cond_signal(&cond);
cout << "one thread signal" << endl;
}
for(auto& e:tids)
{
pthread_join(e,nullptr);
}
return 0;
}
运行结果如下:
我们能看到是按到一定的顺序去获取资源的。
我们唤醒函数换成 pthread_cond_broadcast唤醒在等待队列所有线程。
运行结果如下: