线程安全:线程中对临界资源的访问操作是安全的
临界资源:公共资源,大家都能访问到的资源
线程安全的实现:同步与互斥
互斥:通过同一时间只有一个线程能够访问资源实现资源访问的安全性
同步:通过条件判断让线程对临界资源访问更加合理有序
互斥的实现:
互斥锁(通过互斥锁保护线程对临界资源的访问操作不会被打断)
本质:是一个只有0/1的计数器
原理:标记临界资源的两种访问状态,可访问或者不可访问。在线程访问临界资源之前,先进行互斥锁加锁操作。(判断是否可访问,可访问则返回,不可访问则阻塞)在线程访问临界资源完毕之后,进行解锁操作。互斥锁本身计数器的操作是原子操作。
操作流程:
(1)定义互斥锁变量:pthread_mutex_t mutex
(2)初始化互斥锁:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
int pthread_mutex_init(pthread_mutex_t* mutex, pthread_mutexattr_t* attr)
(3)在访问临界资源之前进行加锁
int pthread_mutex_lock(pthread_mutex_t* mutex)—阻塞接口
int pthread_mutex_trylock(pthread_mutex_t* mutex)—非阻塞接口
(4)在访问临界资源完毕之后解锁
int pthread_mutex_unlock(pthread_mutex_t* mutex)
(5)不再使用互斥锁, 则销毁
int pthread_mutex_destroy(pthread_mutex_t* mutex)
黄牛抢票实例(互斥锁保护多个线程对临界资源—全局变量tickets票数进行操作的过程):
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
int tickets = 100; //初始化全局变量---票数
void* scalpers(void* arg) //线程入口函数
{
pthread_mutex_t* mutex = (pthread_mutex_t*)arg; //将传入的void*型参数强转为pthread_mutex_t互斥锁类型
pthread_t tid = pthread_self(); //获取当前调用线程的tid
while(1)
{
pthread_mutex_lock(mutex); //在访问临界资源前加锁
if(tickets > 0) //若所剩票数大于0
{
// usleep(1);
printf("I got a ticket: %p - %d\n", tid, tickets); //打印当前进程的tid以及当前抢到的票数
tickets--; //所剩余票数减一
}
else //若所剩票数小于等于0
{
//加锁后,则需要在任意有可能退出线程的地方之前解锁
pthread_mutex_unlock(mutex); //进行解锁操作
pthread_exit(NULL); //退出当前调用线程
}
pthread_mutex_unlock(mutex); //访问临界资源完毕后解锁
usleep(1); //睡眠一微秒,轮转时间片
}
return NULL;
}
int main()
{
pthread_mutex_t mutex; //定义互斥锁变量
pthread_t tid[4]; //定义数组接收线程tid
int ret; //接收线程创建的返回值
pthread_mutex_init(&mutex, NULL); //初始化互斥锁变量
for(int i = 0; i < 4; i++)
{
//创建4个线程,入口函数名为scalpers,参数为void*类型的互斥锁变量地址
ret = pthread_create(&tid[i], NULL, scalpers, (void*)&mutex);
if(ret != 0) //若返回值非0则创建线程失败
{
printf("thread create error\n");
return -1;
}
}
for(int i = 0; i < 4; i++)
{
pthread_join(tid[i], NULL); //等待线程退出,回收资源
}
pthread_mutex_destroy(&mutex); //销毁互斥锁
return 0;
}
运行结果:
可以看到四个线程分别都抢到了票,并且在票数等于0时停止抢票
死锁:描述的是程序因为流程无法继续推进卡死的情况
死锁产生的原因:死锁产生的四个必要条件
(1)互斥条件:同一时间只有一个线程能够访问资源
(2)不可剥夺条件:我加的锁只有我能解
(3)请求与保持条件:得到A锁然后请求B锁,请求不到B,不释放A
(4)环路等待条件:A线程拿到1锁,然后请求2锁;B线程拿到2锁,请求1锁
预防死锁:预防四个必要条件的产生
(1)保证加解锁顺序一致
(2)使用非阻塞加锁
避免死锁:银行家算法,死锁检测算法…
同步的实现:通过条件判断资源获取是否合理—条件变量
条件变量:一个pcb等待队列+使线程阻塞以及唤醒的接口
原理:当线程获取资源不合理的时候,调用阻塞接口,使线程阻塞,将pcb挂到等待队列。等待资源访问合理的时候通过唤醒接口唤醒阻塞的等待队列上的线程。
注意:条件变量本身只提供阻塞与唤醒功能,而资源获取是否合理需要我们自己来判断完成。
操作流程:
(1)定义条件变量:pthread_cond_t cond
(2)初始化条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t* attr)
(3)在获取资源不合理时调用阻塞接口使线程阻塞
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)—解锁,休眠,被唤醒后加锁
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, struct timespec* abstime)—限制阻塞时长
(4)其他线程促使资源获取合理后,唤醒等待队列上的线程
int pthread_cond_signal(pthread_cond_t* cond)—至少唤醒一个
int pthread_cond_broadcast(pthread_cond_t* cond)—唤醒所有
(5)销毁:int pthread_cond_destroy(pthread_cond_t* cond)
条件变量代码实例(两个线程通过条件变量实现交替打印的控制):
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
pthread_cond_t cond_cook; //定义厨师条件变量
pthread_cond_t cond_customer; //定义顾客条件变量
pthread_mutex_t mutex; //定义互斥锁
int counter = 0; //0表示没有饭,1表示有饭
void* cook(void* arg)
{
while(1)
{
pthread_mutex_lock(&mutex); //加锁
while(counter == 1) //循环判断是否有饭
{
pthread_cond_wait(&cond_cook, &mutex); //有饭则阻塞
}
printf("我做了一碗饭\n"); //没饭则做饭
counter++;
pthread_cond_signal(&cond_customer); //唤醒至少一个顾客线程吃饭
pthread_mutex_unlock(&mutex); //解锁
}
return NULL;
}
void* customer(void* arg)
{
while(1)
{
pthread_mutex_lock(&mutex); //加锁
while(counter == 0)
{
pthread_cond_wait(&cond_customer, &mutex); //没饭则阻塞
}
printf("真好吃!\n"); //有饭则吃饭
counter--;
pthread_cond_signal(&cond_cook); //唤醒厨师线程做饭
pthread_mutex_unlock(&mutex); //解锁
}
return NULL;
}
int main()
{
pthread_t tid1[3], tid2[3]; //定义数组保存所创建线程的tid
int ret; //保存创建线程的返回值
pthread_cond_init(&cond_cook, NULL); //初始化厨师条件变量
pthread_cond_init(&cond_customer, NULL); //初始化顾客条件变量
pthread_mutex_init(&mutex, NULL); //初始化互斥锁
for(int i = 0; i < 3; i++) //创建3个厨师线程,线程入口函数为cook
{
ret = pthread_create(&tid1[i], NULL, cook, NULL);
if(ret != 0)
{
printf("pthread create error\n");
return -1;
}
}
for(int i = 0; i < 3; i++) //创建3个顾客线程,线程入口函数为customer
{
ret = pthread_create(&tid2[i], NULL, customer, NULL);
if(ret != 0)
{
printf("pthread create error\n");
return -1;
}
}
for(int i = 0; i < 3; i++) //等待线程退出,回收资源
{
pthread_join(tid1[i], NULL);
pthread_join(tid2[i], NULL);
}
pthread_cond_destroy(&cond_cook); //销毁条件变量
pthread_cond_destroy(&cond_customer);
pthread_mutex_destroy(&mutex); //销毁互斥锁
return 0;
}
运行结果:
注意事项:
(1)条件的判断应该使用while循环判断:
三个顾客阻塞,厨师做好一碗饭唤醒顾客,但是顾客都被唤醒,开始加锁,只有一个顾客加锁成功,其他顾客卡在锁这里,等到这个顾客吃完饭解锁了,有可能获取到锁的不是厨师而是卡在锁这里的顾客,这时候如果没有二次判断有没有饭则有可能会出现在没有饭的情况下顾客吃到饭。
(2)在具有多种角色的情况下,应该使用多个条件变量
三个顾客,因为没有饭挂在阻塞队列上,一个厨师做好饭唤醒一个顾客,顾客吃完饭,唤醒等待队列上阻塞的pcb,但是因为队列上的顾客在前,有可能唤醒的不是厨师而是顾客,这个顾客二次判断因为没有饭再次陷入阻塞。
条件变量需要搭配互斥锁一起使用;对是否满足资源获取条件的判断需要循环进行;多种角色的情况下,使用多个条件变量,分别进行等待唤醒。
生产者消费者模型:一种典型的设计模式
设计模式:人们针对典型场景设计的解决方案
应用场景:应用于有大量的数据产生以及进行处理的场景
优点:解耦合,支持忙先不均,支持并发
为了能够支持并发,因此缓冲区必须实现线程安全
生产者与生产者之间应该保持互斥关系,消费者与消费者之间应该保持互斥关系,生产者与消费者之间应该保持同步与互斥的关系。
实现:两种对应角色的线程(一种入队,一种出队)+线程安全的队列
线程安全的队列实现:封装实现一个线程安全的队列类(阻塞队列)
class BlockQueue
有空闲节点可以入队数据,没有空闲节点则阻塞入队线程
有数据节点可以出队数据,没有数据节点则阻塞出队线程
通过条件变量+互斥锁实现线程安全的阻塞队列,最终实现生产者消费者模型:
#include<cstdio>
#include<queue>
#include<iostream>
#include<pthread.h>
//实现一个线程安全的队列类
class BlockQueue{
private:
int _capacity; //阻塞队列的容量
std::queue<int> _queue; //使用STL容器中的队列
pthread_cond_t _cond_pro; //定义生产者条件变量
pthread_cond_t _cond_con; //定义消费者条件变量
pthread_mutex_t _mutex; //定义互斥锁
public:
BlockQueue(int cap = 5):_capacity(cap) //阻塞队列类的构造函数,初始化列表中定义阻塞队列的容量
{
pthread_mutex_init(&_mutex, NULL); //初始化条件变量
pthread_cond_init(&_cond_pro, NULL);
pthread_cond_init(&_cond_con, NULL); //初始化互斥锁
}
~BlockQueue() //阻塞队列的析构函数,完成对条件变量及互斥锁的销毁
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_con);
}
bool Push(int data) //向外部即生产者线程提供入队操作
{
pthread_mutex_lock(&_mutex); //加锁
W> while(_queue.size() == _capacity) //若阻塞队列的大小等于容量则表示装满了
{
pthread_cond_wait(&_cond_pro, &_mutex); //使生产者线程阻塞在生产者等待队列上
}
_queue.push(data); //data数据入队
pthread_cond_signal(&_cond_con); //唤醒消费者等待队列上的消费者线程
pthread_mutex_unlock(&_mutex);
return true;
}
bool Pop(int* data) //向消费者线程提供出队操作
{
pthread_mutex_lock(&_mutex); //加锁
while(_queue.empty()) //若队列为空
{
pthread_cond_wait(&_cond_con, &_mutex); //使消费者线程阻塞在消费者等待队列上
}
*data = _queue.front(); //获取队列的队头数据
_queue.pop(); //头删
pthread_cond_signal(&_cond_pro); //唤醒生产者等待队列上的生产者线程
pthread_mutex_unlock(&_mutex); //解锁
return true;
}
};
void* producer(void* arg) //生产者线程入口函数
{
BlockQueue* q = (BlockQueue*)arg; //将传入的void*类型参数强转为阻塞队列类
int i = 0;
while(1) //循环的向阻塞队列进行入队操作
{
q->Push(i);
printf("生产者入队数据:%d\n", i++);
}
return NULL;
}
void* consumer(void* arg) //消费者线程入口函数
{
BlockQueue* q = (BlockQueue*)arg;
while(1) //循环的向阻塞队列进行出队操作
{
int data;
q->Pop(&data);
printf("消费者出队数据:%d\n", data);
}
return NULL;
}
int main()
{
BlockQueue q; //定义一个阻塞队列的变量
pthread_t ptid[4], ctid[4]; //定义数组保存所创建线程的tid
int ret;
for(int i = 0; i < 4; i++)
{
ret = pthread_create(&ptid[i], NULL, producer, &q); //创建生产者线程
if(ret != 0)
{
printf("create thread error\n");
return -1;
}
}
for(int i = 0; i < 4; i++)
{
ret = pthread_create(&ctid[i], NULL, consumer, &q); //创建消费者线程
if(ret != 0)
{
printf("create thread error\n");
return -1;
}
}
for(int i = 0; i < 4; i++) //等待线程退出,回收资源
{
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
return 0;
}
运行结果:
信号量:
本质:计数器+pcb等待队列
作用:实现进程或线程间的同步与互斥
P操作:计数-1,判断计数 <0 则阻塞进程或线程(将其加入pcb等待队列)
V操作:计数+1,唤醒pcb等待队列上的一个进程或线程
同步的实现:通过计数器对资源进行计数。线程在获取资源之前进行P操作,其他线程产生一个资源后进行V操作
互斥的实现:初始化计数器为1,表示资源只有一个。1表示可访问,非1表示不可。线程在访问资源之前进行P操作,访问资源完毕之后进行V操作
操作流程:
头文件:#include<semaphore.h>
(1)定义信号量:sem_t sem
(2)初始化信号量
int sem_init(sem_t* sem, int pshared, unsigned int value)
sem:信号量变量;pshared:0—线程间,非0—进程间;value:初值
返回值:成功返回0;失败返回-1
(3)P操作:
sem_wait(sem_t* sem);sem_trywait();sem_timedwait()
(4)V操作:
sem_post(sem_t* sem)
(5)销毁信号量
sem_destroy(sem_t* sem)
通过信号量实现线程安全的环形队列,最终实现生产者与消费者模型:
#include<cstdio>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
class RingQueue{
private:
std::vector<int> _arry; //调用线性表实现环形队列
int _capacity; //环形队列即数组的容量
int _step_read; //读位置在线性表里的下标
int _step_write; //写位置
sem_t _sem_lock; //用于实现互斥的信号量
sem_t _sem_idle; //对于生产者,空闲节点是资源
sem_t _sem_data; //对于消费者,数据节点是资源
public:
RingQueue(int cap = 5) : _arry(cap), _capacity(cap), _step_read(0), _step_write(0)
{
sem_init(&_sem_lock, 0, 1);
sem_init(&_sem_idle, 0, cap); //空闲节点的初值为数组容量
sem_init(&_sem_data, 0, 0); //数据节点的初值为0
}
~RingQueue()
{
sem_destroy(&_sem_data);
sem_destroy(&_sem_idle);
sem_destroy(&_sem_lock);
}
bool Push(int data)
{
sem_wait(&_sem_idle); //判断是否有空闲节点
sem_wait(&_sem_lock); //加锁
_arry[_step_write] = data; //将data数据在_step_write位置写入
_step_write = (_step_write + 1) % _capacity; //更新写位置
sem_post(&_sem_lock); //解锁
sem_post(&_sem_data); //数据节点+1,唤醒消费者
return true;
}
bool Pop(int* data)
{
sem_wait(&_sem_data); //数据节点-1,判断是否合理
sem_wait(&_sem_lock); //加锁
*data = _arry[_step_read]; //读取_step_read位置数据
_step_read = (_step_read + 1) % _capacity; //更新读位置
sem_post(&_sem_lock); //解锁
sem_post(&_sem_idle); //空闲节点+1,唤醒生产者
return true;
}
};
void* producer(void* arg) //生产者线程入口函数
{
RingQueue* q = (RingQueue*)arg; //将传入的void*类型参数强转为环形队列类
int i = 0;
while(1) //循环的向阻塞队列进行入队操作
{
q->Push(i);
printf("生产者入队数据:%d\n", i++);
}
return NULL;
}
void* consumer(void* arg) //消费者线程入口函数
{
RingQueue* q = (RingQueue*)arg;
while(1) //循环的向环形队列进行出队操作
{
int data;
q->Pop(&data);
printf("消费者出队数据:%d\n", data);
}
return NULL;
}
int main()
{
RingQueue q; //定义一个环形队列的变量
pthread_t ptid[4], ctid[4]; //定义数组保存所创建线程的tid
int ret;
for(int i = 0; i < 4; i++)
{
ret = pthread_create(&ptid[i], NULL, producer, &q); //创建生产者线程
if(ret != 0)
{
printf("create thread error\n");
return -1;
}
}
for(int i = 0; i < 4; i++)
{
ret = pthread_create(&ctid[i], NULL, consumer, &q); //创建消费者线程
if(ret != 0)
{
printf("create thread error\n");
return -1;
}
}
for(int i = 0; i < 4; i++) //等待线程退出,回收资源
{
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
return 0;
}
运行结果:
信号量与条件变量的区别与联系:
联系:都是用于实现同步的
区别:条件变量使用中条件是否满足需要我们自己判断,信号量不需要;条件变量的使用需要搭配互斥锁,信号量不需要
读者写者模型:
应用场景:读共享,写互斥
实现:读写锁—读共享,写互斥的锁
加读锁:读锁可以重复加(但是前提是这个锁没有被加写锁)
加写锁:写锁互斥加(前提是这个锁既没有被加读锁,也没有被加写锁)
读写锁是通过自旋锁实现的
自旋锁:一直占据cpu不释放,循环对加锁条件进行判断。自旋锁实时性高,但是不适用长时间等待的场景。
乐观锁—CAS锁、悲观锁、可重入锁、不可重入锁