淘先锋技术网

首页 1 2 3 4 5 6 7

一、生产消费者模型

1.1 什么是生产消费者模型?

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
什么是强耦合问题呢?举个栗子
假设我们需要寄出一封邮件,我们有两个选择:

  1. 将信件交给邮递员,但我们需要事先打电话给邮局联系邮递员,并且要和邮递员约好时间等他来去邮件
  2. 将信件放到邮筒,让邮递员自己来取,我们不再干涉

对于第一种方式,我们需要联系邮递员,甚至需要认识邮递员,而且还要约时间等等,我们和邮递员的耦合度就很强,假如更换了邮递员我们就需要重新联系邮递员约时间等等

而使用第二种方式,我们和邮递员的耦合度就很弱,因为我们只需要写好邮件放到邮筒就可以了,至于是否更换邮递员都不会影响到我们寄邮件

如果我们将消费者和生产者定义成两个类,其中一个类修改了代码,那么另一个就需要修改代码。

1.2 为什么使用生产消费者模型?

为了降低耦合度我们使用生产消费者模型。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。相当于我们栗子中的邮筒。

1.3 生产消费者模型的321原则

  1. 三种关系

生产者 VS 生产者:互斥
生产者 VS 消费者:同步与互斥
消费者 VS 消费者:互斥

  1. 两个角色

生产者
消费者

  1. 一个交易场所

阻塞队列

1.4 生产者消费者模型优点

  • 解耦
    生产者和消费者不直接进行通讯,而是通过阻塞队列进行通讯,降低了耦合度
  • 支持并发
    多线程轮循处理
  • 支持忙闲不均
    可以有效解决一方生产(消费)快,一方消费(生产)慢的问题

解决了上面两个问题后我们通过下面的C++实现了解怎么用生产消费者模型

二、基于BlockingQueue的生产者消费者模型

2.1 什么是BlockQueue(阻塞队列)?

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于

  • 队列为空时,从队列获取元素的操作将会被阻塞直到队列中被放入元素
  • 队列满时,往队列里存放元素的操作也会被阻塞直到元素被从队列中取出

以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞

2.2 C++模拟实现基于互斥锁、条件变量和阻塞队列的生消费者模型

<1> Makefile

main:main.cc
	g++ -o $@ $^ -lpthread -std=c++11 
.PHONY:clean
clean:
	rm main

<2> BlockQUeue.hpp

#ifndef __QUEUE_BLOCK_H
#define __QUEUE_BLOCK_H

#include<iostream>
#include<unistd.h>
#include<queue>
#include<pthread.h>

class BlockQueue{
  private:
    std::queue<int> q;  
    size_t cap;
    pthread_mutex_t lock;
    pthread_cond_t c_cond;  //将来消费者,在该条件变量下等!
    pthread_cond_t p_cond;  //将来生产者,在该条件下等!
  public:
    bool IsFull()
    {
      return q.size() >= cap;
    }
    
    bool IsEmpty()
    {
      return q.empty();
    }
    
    void LockQueue()
    {
      pthread_mutex_lock(&lock);
    }

    void UnLockQueue()
    {
      pthread_mutex_unlock(&lock);
    }

    void WakeUpConsumer()
    {
      std::cout << "wake up consumer...." << std::endl;
      pthread_cond_signal(&c_cond);
    }

    void WakeUpProductor()
    {
      std::cout << "wake up productor...." << std::endl;
      pthread_cond_signal(&p_cond);
    }

    void ProductorWait()
    {
      pthread_cond_wait(&p_cond, &lock); //1.要判断就必须进入临界区 2.持有锁进入的 3.wait时必须释放锁
    }

    void ConsumerWait()
    {
      pthread_cond_wait(&c_cond, &lock);
    }
    
  public:
    BlockQueue(size_t _cap)
      :cap(_cap)
    {
      pthread_mutex_init(&lock, nullptr);
      pthread_cond_init(&c_cond, nullptr);
      pthread_cond_init(&p_cond, nullptr);
    }
    void Put(int x)
    {
      LockQueue();
      while(IsFull()) //这里不能使用if判断,因为可能存在异常唤醒
      {
        WakeUpConsumer();
        ProductorWait();

      }
      q.push(x);
      UnLockQueue();
    }

    void Get(int &out)
    {
      LockQueue();
      while(IsEmpty())  //这里不能使用if判断,因为可能存在异常唤醒
      {
        WakeUpProductor();
        ConsumerWait();
      }
      out = q.front();
      q.pop();
      UnLockQueue();
    }
    ~BlockQueue()
    {
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&c_cond);
      pthread_cond_destroy(&p_cond);
    }
};

#endif

<3> main.cc

#include "BlockQueue.hpp"

using namespace std;

void *consumer_run(void *arg)
{
  BlockQueue *bq = (BlockQueue *)arg;
  while(true)
  {
    int n = 0;
    bq->Get(n);
    cout << "consumer get data is " << n << endl;
  }
}

void *productor_run(void *arg)
{
  BlockQueue *bq = (BlockQueue *)arg;
  int count = 0;
  while(true)
  {
    //int data = rand()%10 + 1;
    int data = count % 5 + 1; //产生一个1-5的数字
    count++;
    bq->Put(data);
    cout<< "product data is : " << data << endl;
  }
}

int main()
{
  BlockQueue *bq = new BlockQueue(5);
  pthread_t c,p;
  pthread_create(&c, nullptr, consumer_run,(void*)bq);
  pthread_create(&p, nullptr, productor_run,(void*)bq);


  pthread_join(c,nullptr);
  pthread_join(p,nullptr);
  cout<<"BLOCKQUEUE"<<endl;
  delete bq;
  return 0;
}

<4> 三种运行结果

直接运行上述代码

我们还可以修改main函数中consumer_run和productor_run的消费和生产速度实现生产快(慢),消费慢(快)的情况
在这里插入图片描述
在这里插入图片描述