淘先锋技术网

首页 1 2 3 4 5 6 7

线程互斥

线程互斥及相关概念

线程互斥(Mutual Exclusion)是指在多线程环境下,同一时刻只能有一个线程访问共享资源,以避免对该资源的不正确访问,造成数据不一致等问题。

例如,如果有多个线程都要同时对同一个全局变量进行修改,那么就需要使用线程互斥来保证对该变量的访问是互斥的,也就是说,在任意时刻只能有一个线程对该变量进行修改。

临界资源(Critical Resource)是指在多线程环境下需要被多个线程共享访问的资源,对该资源的访问需要进行同步(如使用互斥进行互斥)以避免出现不正确的访问。

临界区:每个线程内部,访问临界资源的代码,就叫做临界区

互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。是对临界资源保护的一种手段。

原子性:不会被任何调度机制影响的操作,该操作只有两态,要么完成,要么未完成。

多线程抢票

#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>

using namespace std;

// 共享资源,多线程同时访问(未来的临界资源)
int tickets = 10000;

void* getTickets(void* args)
{
    string* ps = (string*)args;
    while(true)
    {
        if(tickets > 0)  // 未来的临界区
        {
            usleep(1000);
            printf("%s : %d\n", ps->c_str(), tickets); // 未来的临界区
            // cout << *ps << " get ticket " << tickets << endl; 
            tickets--; // 未来的临界区
        }
        else
        {
            break;
        }
    }
    delete ps;
    return nullptr;
}


// 多线程抢票程序
int main()
{
    pthread_t tid[3];
    for(int i = 0; i < 3; ++i)
    {
        string* ps = new string("thread");
        ps->operator+=(to_string(i+1));
        pthread_create(tid + i, nullptr, getTickets, (void*)ps);
    }
    for(int i = 0; i < 3; ++i)
    {
        pthread_join(tid[i], nullptr);
        // printf("主线程等待回收新线程%d成功\n", i + 1);
        // cout << "主线程等待回收新线程" << i+1 << "成功" << endl;
    }
    return 0;
}

分析

上方程序为多线程抢票程序,全局数据tickets为共享资源(未来的临界资源,此时还没有进行互斥保护),多个线程对getTickets函数进行了重入,getTickets方法中对全局tickets变量访问和修改的代码都是未来的临界区,如if判断,printf打印及tickets--代码都是未来的临界区代码。

因为多线程并发执行,访问共享资源,因时序及线程切换等原因造成的数据不一致等问题。我们则需要对访问共享资源的代码进行加锁保护,进行线程互斥。

为什么多个线程并发访问共享资源时,因为线程切换就会造成数据不一致呢?下面举两点解释说明:

  1. if(tickets > 0):tickets > 0判断的本质也是计算,则该代码执行时需要CPU进行逻辑运算,tickets全局数据存储在内存中,则需要将tickets数据load到CPU寄存器中,本质就是将数据load到当前线程的上下文中。执行if判断的后面代码块时,因为多线程并发执行,此时的执行线程随时可能被切换(此时寄存器中的线程上下文数据也会被线程保存起来),则就可能造成多个执行线程同时进入if判断内部。若此时tickets为1,则就会因为线程切换造成tickets减到0甚至-1。(上方程序中的usleep更加提高了这种情况发生的可能性)

  1. tickets--:这条C语句在不进行优化的情况下翻译为汇编时,最少会变为三条:1. load到CPU寄存器中 2. 对寄存器内容进行-- 3. 将寄存器数据load回内存中。因此这个--操作并不是原子的,执行到哪一步时都有可能进行线程切换。则存在以下场景:两个线程,线程1和线程2接下来要进行tickets--操作,此时 tickets为10,线程1执行完load到寄存器之后,被切换了,此时线程1会保存它的上下文数据,比如此时保存tickets的寄存器值,其他临时数据,程序计数器eip,栈顶栈底指针的值等。线程2执行tickets--的过程中没有被切换,此时内存中的tickets的值成功被--到了9。再切换为线程1,线程1执行第二步和第三步。此时内存中的tickets又被重复写入到了9。

造成上方多线程抢票程序问题的主要原因其实是第一点,第二点也会存在,只是概率相对更低。实际的执行的情况会比上方所述复杂的多,总之这样的多线程并发访问共享资源的程序是有问题的。

因此我们需要进行线程互斥,常见的保护措施就是互斥锁。使得加锁和解锁之间的代码区域同一时刻只能有一个线程执行,这样的代码区域称为临界区,tickets数据称为临界资源。

pthread线程库mutex互斥锁

// 初始化互斥锁
    // 1. 静态分配,适用于全局或静态的互斥锁
       pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    // 2. 动态分配,适用于局部的互斥锁
       int pthread_mutex_init(pthread_mutex_t *restrict mutex,
              const pthread_mutexattr_t *restrict attr);   // 参数二设为nullptr即可
// 销毁互斥锁
    // 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥锁不需要销毁
       int pthread_mutex_destroy(pthread_mutex_t *mutex);
// 加锁,解锁
       int pthread_mutex_lock(pthread_mutex_t *mutex);
       int pthread_mutex_unlock(pthread_mutex_t *mutex);
    // int pthread_mutex_trylock(pthread_mutex_t *mutex);

调用pthread_mutex_lock时,可能出现以下情况。

  1. 互斥锁处于未锁状态,该函数会将互斥锁锁定,同时返回成功(0)

  1. 调用时,互斥锁已经被其他线程锁定,或者有其他线程同时申请互斥锁且此线程没有竞争到互斥锁。则pthread_mutex_lock会将调用线程进行阻塞等待,等待其他线程解锁该互斥锁。

#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>

using namespace std;

// 共享资源,多线程同时访问(未来的临界资源)
int tickets = 3000;
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

struct ThreadData
{
public:
    ThreadData(const string& tname, pthread_mutex_t* pmtx)
    : tname_(tname), pmtx_(pmtx)
    {}
    string tname_;
    pthread_mutex_t* pmtx_;
};

void* getTickets(void* args)
{
    ThreadData* td = (ThreadData*)args;
    while(true)
    {
        pthread_mutex_lock(td->pmtx_);
        if(tickets > 0)  // 未来的临界区
        {
            usleep(1000);
            printf("%s : %d\n", td->tname_.c_str(), tickets); // 未来的临界区
            // cout << *ps << " get ticket " << tickets << endl;
            tickets--; // 未来的临界区
            pthread_mutex_unlock(td->pmtx_);
        }
        else
        {
            pthread_mutex_unlock(td->pmtx_);
            break;
        }
        // usleep(1000);
    }
    delete td;
    pthread_exit(nullptr);
    // return nullptr;
}

// 多线程抢票程序
int main()
{
    pthread_t tid[3];
    pthread_mutex_t mtx;
    pthread_mutex_init(&mtx, nullptr);
    for(int i = 0; i < 3; ++i)
    {
        string s("thread");
        s += to_string(i+1);
        ThreadData* td = new ThreadData(s, &mtx);
        pthread_create(tid + i, nullptr, getTickets, (void*)td);
    }
    for(int i = 0; i < 3; ++i)
    {
        pthread_join(tid[i], nullptr);
        // printf("主线程等待回收新线程%d成功\n", i + 1);
        // cout << "主线程等待回收新线程" << i+1 << "成功" << endl;
    }
    pthread_mutex_destroy(&mtx);
    return 0;
}
  1. 不加锁时,多线程并发执行,效率较高。加锁之后,同一时刻只会有一个线程执行临界区代码,其他线程都会在pthread_mutex_lock这里阻塞等待,等待这个锁被解锁。效率会降低。因此加锁的粒度越小越好。

  1. 加锁之后,线程在临界区内依旧会被切换,但是不会造成之前的数据不一致等问题。因为执行线程切换时,时持有锁被切换的(调用过pthread_mutex_lock),其他线程要想进入临界区执行临界区代码,也要先申请锁,此时它是申请不成功的,会阻塞等待持有锁线程解锁。因此同一时刻只会有一个线程进入临界区执行临界区代码访问临界资源。从而保证了临界区中数据的一致性。

  1. 加锁之后,临界区代码是串行执行的。而不是之前的多线程并发执行。

要进入临界区访问临界资源,每个线程必须先调用pthread_mutex_lock申请锁,则每个线程必须看到同一把锁&&访问它(pthread_mutex_t)。则锁本身就是一个共享资源(类比上方的tickets),那么如何保证多线程加锁时,访问锁的安全呢?

互斥锁mutex的实现原理

共识:

在汇编的角度,我们认为一条汇编语句的执行是原子的,也就是要么执行完成,要么未执行。没有中间态。

在执行流视角,CPU内部的寄存器,本质叫做当前执行流的上下文。这些寄存器,空间上是被所有执行流共享的,但是寄存器的内容,是每一个执行的执行流私有的,叫做执行流的上下文。

上图为pthread_mutex_lock和pthread_mutex_unlock的伪代码。

为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单 元的数据相交换,由于只有一条指令,保证了操作的原子性。

lock:mutex可以理解为pthread_mutex_t互斥锁,初始化后,在内存中它的值为1。假设现有两个线程,线程a执行movb,将CPU寄存器%al的值置为0,然后被切换了(注意此时%al寄存器的内容是线程a的上下文,切换时要保存起来),线程b执行movb,exchange,将内存中mutex内存块存储的1和%al的0相交换(注意此操作是原子的),至此,线程b申请锁成功,后面会执行return语句。此时内存中mutex的值为0。线程切换为线程a,根据程序计数器的值,继续执行exchange语句,将%al的0和内存中的0交换(表示线程b申请锁失败,此时互斥锁已经被其他线程锁定了,竞争锁失败)。之后就会执行挂起等待,等待申请锁的线程执行unlock:将内存中mutex的值置为1,下次线程a执行goto lock,如果是第一个执行exhcnage %al,mutex语句的线程,则线程a竞争锁成功。

上方只是大致描述竞争锁的原理,其实根本上就是竞争锁时,利用exchange这样的指令的原子性,谁先执行exchange,将内存中1这样的公有数据变为线程上下文数据(私有数据),则表示竞争锁成功。

重新理解,线程在临界区内也会被切换,但是它是持有锁被切换的。其他线程要想进入临界区比如申请锁,此时会申请失败阻塞等待,等待持有锁线程解锁(执行unlock)

可重入VS线程安全 - 略了

略略略略略略略略

死锁

死锁(Deadlock)是指两个或多个进程(或线程)在执行过程中,因互相等待对方释放资源而陷入无限等待的一种状态。

例如,如果线程A获取了锁1,但需要等待线程B释放锁2才能继续执行,而线程B同时获取了锁2,但需要等待线程A释放锁1才能继续执行,那么两个线程就会陷入无限等待的状态,即死锁。

死锁是一种非常严重的问题,因为它会导致应用程序无法继续执行,并可能导致系统崩溃。因此,在编写多线程或多进程的应用程序时,必须小心处理锁的获取和释放顺序,以避免死锁的发生。

死锁的四个必要条件:

  1. 互斥条件:一个资源每次只能被一个执行流使用(多线程互斥场景下使用了互斥锁)

  1. 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

  1. 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

  1. 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

避免死锁:

  1. 破坏死锁的四个必要条件

  1. 加锁顺序一致

  1. 避免锁未释放的场景

  1. 资源一次性分配

抽象抽象~

线程同步

解决的问题:

一、

例如上方的多线程抢票程序,会发生一段时间甚至整个程序运行过程中都是一个线程在抢票,也就是某一个线程因为调度器调度的缘故一直抢到了锁,获取了临界资源。导致其他线程长时间访问不到临界资源,造成其他线程的饥饿问题。GPT:执行流饥饿(Starvation)问题是指某个线程或进程无法获得所需的系统资源,导致它无法继续执行的一种情况。在并发编程中,如果多个线程或进程同时竞争一些共享资源,可能会出现某些线程或进程一直得不到访问共享资源的机会,导致它们无法执行或执行效率非常低下,这就是执行流饥饿问题。

二、

线程在访问临界资源前需要加锁,这是为了保护临界资源,避免多线程并发访问导致数据不一致的问题。同时,在某些情况下,例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量进行线程同步。

此情况下,队列为临界资源,线程在获取临界资源前需要先判断临界资源是否就绪,是否满足获取条件,而这种判断本质也是对临界资源的一种访问,故需要在加锁和解锁之间的互斥条件下进行。因此可能造成一种情况:线程加锁,判断临界资源(比如此情况下的队列)是否就绪,未就绪,解锁。因为它并不知道什么时候就绪,即队列中有新增节点(由另一个线程完成此工作),则该线程就需重复加锁,判断,解锁的工作。这个工作无疑是浪费锁资源,不合理的。

解决上方两种问题,我们就可以利用条件变量进行线程同步。

线程同步:

同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问 题,叫做同步。

条件变量:

// 条件变量的初始化与销毁
       int pthread_cond_destroy(pthread_cond_t *cond);
       int pthread_cond_init(pthread_cond_t *restrict cond,
              const pthread_condattr_t *restrict attr);
       pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 使线程在cond条件变量下等待,mutex互斥锁
       int pthread_cond_wait(pthread_cond_t *restrict cond,
              pthread_mutex_t *restrict mutex);
// 唤醒在cond条件变量下等待的所有线程
       int pthread_cond_broadcast(pthread_cond_t *cond);
// 唤醒在cond条件变量下等待的一个线程
       int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_signal用于发送一个通知信号,唤醒等待在条件变量上的一个线程。如果有多个线程在等待,那么只有一个线程会被唤醒,并且系统并不保证哪个线程会被唤醒。因此pthread_cond_signal通常用于通知某个线程可以继续执行。

pthread_cond_broadcast用于发送广播通知信号,唤醒所有等待在条件变量上的线程。这意味着所有等待线程都会被唤醒,并且可以同时开始执行。因此,pthread_cond_broadcast通常用于通知多个线程可以同时开始执行。

#include <iostream>
#include <pthread.h>
#include <string>
#include <functional>
#include <unistd.h>
using namespace std;

struct ThreadData;
#define THREAD_NUM 5
typedef void (*func_t)(ThreadData*);
// #define std::function<void (ThreadData*)> func

bool quit = false;

struct ThreadData
{
public:
    ThreadData(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond, func_t func)
    :name_(name), pmtx_(pmtx), pcond_(pcond), func_(func)
    {}
    string name_;
    pthread_mutex_t* pmtx_;
    pthread_cond_t* pcond_;
    func_t func_;
};

void thread1(ThreadData* td)
{
    while(!quit)
    {
        pthread_mutex_lock(td->pmtx_);
        // 这里原本是要先判断临界资源是否就绪,若不就绪则wait等待
        cout << td->name_ << " wait" << endl;
        pthread_cond_wait(td->pcond_, td->pmtx_);
        cout << td->name_ << " wait done" << endl;
        pthread_mutex_unlock(td->pmtx_);
        sleep(1);
    }
}

void thread2(ThreadData* td)
{
    while(!quit)
    {
        pthread_mutex_lock(td->pmtx_);
        // 这里原本是要先判断临界资源是否就绪,若不就绪则wait等待
        cout << td->name_ << " wait" << endl;
        pthread_cond_wait(td->pcond_, td->pmtx_);
        cout << td->name_ << " wait done" << endl;
        pthread_mutex_unlock(td->pmtx_);
        sleep(1);
    }
}

void thread3(ThreadData* td)
{
    while(!quit)
    {
        pthread_mutex_lock(td->pmtx_);
        // 这里原本是要先判断临界资源是否就绪,若不就绪则wait等待
        cout << td->name_ << " wait" << endl;
        pthread_cond_wait(td->pcond_, td->pmtx_);
        cout << td->name_ << " wait done" << endl;
        pthread_mutex_unlock(td->pmtx_);
        sleep(1);
    }
}

void thread4(ThreadData* td)
{
    while(!quit)
    {
        pthread_mutex_lock(td->pmtx_);
        // 这里原本是要先判断临界资源是否就绪,若不就绪则wait等待
        cout << td->name_ << " wait" << endl;
        pthread_cond_wait(td->pcond_, td->pmtx_);
        cout << td->name_ << " wait done" << endl;
        pthread_mutex_unlock(td->pmtx_);
        sleep(1);
    }
}

void thread5(ThreadData* td)
{
    while(!quit)
    {
        pthread_mutex_lock(td->pmtx_);
        // 这里原本是要先判断临界资源是否就绪,若不就绪则wait等待
        cout << td->name_ << " wait" << endl;
        pthread_cond_wait(td->pcond_, td->pmtx_);
        cout << td->name_ << " wait done" << endl;
        pthread_mutex_unlock(td->pmtx_);
        sleep(1);
    }
}

void* entry(void* args)
{
    ThreadData* td = (ThreadData*)args;
    td->func_(td);
    delete td;
    pthread_exit(nullptr);
}

int main()
{
    pthread_mutex_t mtx;
    pthread_cond_t cond;

    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tid[THREAD_NUM];
    func_t funcs[THREAD_NUM] = {thread1, thread2, thread3, thread4, thread5};
    for(int i = 0; i < THREAD_NUM; ++i)
    {
        string name = "thread";
        name += to_string(i + 1);
        ThreadData* td = new ThreadData(name, &mtx, &cond, funcs[i]);
        pthread_create(tid + i, nullptr, entry, (void*)td);
    }
    int cnt = 10;
    while(cnt != 0)
    {
        cnt--;
        sleep(2);
        pthread_cond_signal(&cond);
        // pthread_cond_broadcast(&cond);
    }
    cout << "main thread control done" << endl;
    quit = true;
    pthread_cond_broadcast(&cond);

    for(int i = 0; i < THREAD_NUM; ++i)
    {
        pthread_join(tid[i], nullptr);
        printf("main thread waits thread%d success\n", i+1);
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);
    return 0;
}

现象结论:

pthread_cond_siganl唤醒某条件变量下的一个线程时,并不是完全随机的,而是类似于在条件变量下组织了一个队列,按照等待顺序去唤醒。

pthread_cond_broadcast唤醒某条件变量下的全部线程时,确实是全部唤醒,但一次的内部也是有顺序的,也是按照等待顺序唤醒。

这就证实了利用条件变量进行线程同步时,可以让线程按照一定顺序访问临界资源,避免线程饥饿的问题。


上方这个条件变量的测试代码。仅仅是最简单的一种测试代码,实际上,这里的pthread_cond_wait和pthread_cond_signal的使用是完全生硬的使用测试。因为这里根本没有临界资源,更不要说判断临界资源是否就绪,若不就绪,则调用wait等待(正确的使用方式) 具体的条件变量的使用场景见下方生产者消费者模型,在恰当的场景下才能理解条件变量的正确使用方式。

生产者消费者模型

生产者消费者模型是什么

生产者消费者模型中有三个关键成员:生产者、消费者和缓冲区。

  1. 生产者(Producer):生产者负责生产数据,并将数据存储到缓冲区中。在多线程场景中,生产者线程是用来执行生产者任务的线程。

  1. 消费者(Consumer):消费者负责消费数据,并从缓冲区中取出数据。在多线程场景中,消费者线程是用来执行消费者任务的线程。

  1. 缓冲区(Buffer):缓冲区是用来存储生产者生产的数据的地方,同时也是消费者从中取出数据的地方。缓冲区可以是一个数组、队列、链表等数据结构。在多线程场景中,缓冲区需要通过互斥和条件变量等同步机制来实现生产者和消费者之间的协调和同步。

生产者、消费者和缓冲区是生产者消费者模型中的三个关键成员,它们之间的协调和同步可以通过同步机制来实现。这种模型可以有效地实现资源的共享和利用,提高系统的效率。

便于记忆:321原则

一个交易场所:缓冲区

二个角色:生产者,消费者

三种关系:生产者和生产者:竞争,互斥关系。消费者和消费者:竞争,互斥关系。生产者和消费者:互斥,同步关系。(利用互斥锁和条件变量来实现。)

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作(消费者线程进行)将会被阻塞,直到队列中被放入了元素(生产者线程进行);当队列满时,往队列里存放元 素的操作(生产者线程进行)也会被阻塞,直到有元素被从队列中取出(消费者线程进行)

因为缓冲区实际上就是一个某种数据结构组织的内存空间,只是这里用队列来充当这个缓冲区,同时加了同步和互斥机制,所以是一个阻塞队列。

阻塞队列生产消费模型代码实现

BlockQueue.hpp

#pragma once

#include <pthread.h>
#include <queue>

const int gBQCapacity = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int capacity = gBQCapacity)
    : capacity_(capacity)
    {
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&notFull_, nullptr);
        pthread_cond_init(&notEmpty_, nullptr);
    }
    void push(const T& data)
    {
        // 生产者
        pthread_mutex_lock(&mtx_);
        while(bq_.size() == capacity_)
        {
            // 临界资源不就绪
            pthread_cond_wait(&notFull_, &mtx_);
        }
        bq_.push(data);
        // 此时,缓冲区中一定有数据了
        pthread_cond_signal(&notEmpty_);
        pthread_mutex_unlock(&mtx_);
    }
    void pop(T* pData)
    {
        // 消费者
        pthread_mutex_lock(&mtx_);
        while(bq_.size() == 0)
        {
            // 临界资源不就绪,wait等待
            pthread_cond_wait(&notEmpty_, &mtx_);
        }
        *pData = bq_.front();
        bq_.pop();
        pthread_cond_signal(&notFull_);
        pthread_mutex_unlock(&mtx_);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&notFull_);
        pthread_cond_destroy(&notEmpty_);
    }
private:
    std::queue<T> bq_;   // 阻塞队列,缓冲区
    int capacity_;       // 阻塞队列的最大容量
    pthread_mutex_t mtx_; // 互斥锁
    pthread_cond_t notFull_; // 条件变量
    pthread_cond_t notEmpty_; // 条件变量
};

ProducerConsumer.cc

#include "BlockQueue.hpp"
#include <iostream>
#include <unistd.h>
using namespace std;

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

void* producer(void* arg)
{
    // 生产者
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
    int n = 1;
    while(true)
    {
        // pthread_mutex_lock(&mtx);
        bq->push(n);
        // cout << "producer : " << n << endl;
        printf("%dproducer : %d\n", pthread_self(), n);
        ++n;
        // pthread_mutex_unlock(&mtx);   // 会造成死锁!
        // sleep(1);
    }
    return nullptr;
}

void* consumer(void* arg)
{
    // 消费者
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
    while(true)
    {
        // pthread_mutex_lock(&mtx);
        int n;
        bq->pop(&n);
        // cout << "consumer : " << n << endl;
        printf("%dconsumer : %d\n", pthread_self(), n);
        // pthread_mutex_unlock(&mtx);
        // usleep(500000);
        sleep(1);
    }
    return nullptr;
}

int main()
{
    pthread_t pTid[2];
    pthread_t cTid[3];
    BlockQueue<int> bq;
    for(int i = 0; i < 2; ++i)
        pthread_create(pTid + i, nullptr, producer, &bq);
    for(int i = 0; i < 3; ++i)
        pthread_create(cTid + i, nullptr, consumer, &bq);
    for(int i = 0; i < 2; ++i)
        pthread_join(pTid[i], nullptr);
    for(int i = 0; i < 3; ++i)
        pthread_join(cTid[i], nullptr);
    return 0;
}

互斥锁&共享变量:

  1. 因为利用条件变量进行了线程同步,因此可以让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。(生产者和消费者调用pthread_cond_siganl时是按照条件变量的等待顺序进行唤醒的)

  1. 条件变量的使用需要环境,在生产者消费者模型中,生产者生产完数据之后,可以signal消费者,因为生产者知道缓冲区中新增数据了。消费者消费完数据之后,可以signal生产者,因为消费者知道缓冲区中有空间了。

  1. 为什么int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);的第二个参数是一个互斥锁:当一个线程调用此函数时,是因为它检测到了临界资源不就绪,不满足访问条件,因此它需要等待(例如消费者等待生产者生产数据),而对于临界资源的检测本身也是一种访问,故需要在互斥的条件下进行,也就是加锁和解锁之间。因此,当线程调用pthread_cond_wait时,需要传入一个互斥锁,pthread_cond_wait会先将互斥锁解锁,以便其它线程可以进入临界区访问临界资源。比如:消费者检测阻塞队列内没有数据,需要等待,但是它必须先把互斥锁进行解锁,生产者才能进入临界区生产数据。

  1. pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放!当线程被唤醒的时候,pthread_cond_wait,会自动帮助线程获取锁(要竞争锁)。由此可见,条件变量本身就是和互斥锁配合起来使用的。

  1. pthread_cond_wait是一个函数,可能调用失败,也可能存在伪唤醒的情况,因此编码规范为:while (条件为假) pthread_cond_wait(cond, mutex);

你猜猜为什么.pdf

生产者消费者模型的优点:

  1. 分离生产和消费,提高系统的解耦性和可维护性。

  1. 充分利用多核CPU的并行处理能力,从而提高系统的处理性能和响应速度。

  1. 注意:生产消费的过程并非只是生产者往缓冲区放数据,消费者从缓冲区拿数据。更重要和耗时的是生产者生产数据和消费者处理数据的过程。因为缓冲区放数据和拿数据是互斥的,所以这里的执行是串行执行的,并没有提高效率。真正提高效率的是,提高生产者生产数据和消费者处理数据的并发度。以避免出现生产过剩或消费不及的情况,从而使系统的负载保持在一个合理的范围内。

  1. 设计多生产者多消费者的目的也是为了提高生产数据和消费数据的并发度,当然,如果生产数据和消费数据的过程很简单,则多生产多消费的意义也就不大了。若过程是大量IO的过程,则可以提高整体效率。

这块多少有点抽象和哲学的成分在内了...

前面的生产消费代码,可以把生产者输入整数和消费者消费整数稍微修改一下,体现出生产者生产数据和消费者处理数据的过程... 略略略