网站集群建设ppt,无锡电子商务网站建设,江西建筑人才网,做ppt封面的网站目录 22.1 什么是生产者-消费者模型 22.2 为什么要用生产者-消费者模型? 22.3 生产者-消费者模型的特点 22.4 BlockingQueue实现生产者-消费者模型 22.4.1 实现阻塞队列BlockQueue 1) 添加一个容器来存放数据 2)加入判断Blocking Queue情况的成员函数 3)实现push和pop方法 4)完…目录 22.1 什么是生产者-消费者模型 22.2 为什么要用生产者-消费者模型? 22.3 生产者-消费者模型的特点 22.4 BlockingQueue实现生产者-消费者模型 22.4.1 实现阻塞队列BlockQueue 1) 添加一个容器来存放数据 2)加入判断Blocking Queue情况的成员函数 3)实现push和pop方法 4)完整代码 22.4.2 [可选] 修改成RAII风格代码 22.4.3 定义Blocking Queue中存放Task类任务 22.4.4 生产者-消费者模型主函数实现 1) 实现主函数 2)定义任务函数 3)定义消费者函数 consumer,生产者函数 producer 4)完整代码 22.4.5 makefile编译 22.4.6 效果展示 22.1 什么是生产者-消费者模型
生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型 在这个模型中有两种角色
生产者:生成数据并将其放入共享资源中消费者:从共享资源中获取数据并进行处理。
它们共享一个有限的资源比如一个缓冲区。
我们可以用超市购物的场景来解释生产者-消费者模型 生产者在这个例子中生产者是超市的供应商。他们将各种商品产品送到超市的货架上让消费者购买。供应商不断地提供新货物并放置在货架上。共享资源在这个例子中共享资源就是超市的货架。货架有限无法容纳无限数量的商品。因此货架可以看作是一个有界缓冲区只能容纳一定数量的商品。消费者消费者是超市的顾客。他们来到超市从货架上选购商品并将其购买。消费者会不断地从货架上取走商品。潜在问题:货架容量有限供应商不能无限制地往货架上放商品否则会导致货架满了无法再放入新商品。同样如果货架上没有商品了顾客无法购买商品会感到不满。超市需要协调供应商和顾客的行为。解决方案:供应商必须在货架有空间时才能往货架上放置商品否则需要等待。顾客只有在货架上有商品时才能选购否则需要等待。这种协调可以通过合适的管理和排队机制来实现以确保货架的正常供应和顾客的购买需求。 22.2 为什么要用生产者-消费者模型? 缓冲和平衡负载 在多线程开发中为了解决生产者和消费者之间速度不匹配的问题常常会引入一个缓冲区来平衡生产和消费的速度差异。 缓冲区的作用是暂时存储生产者生产的数据以便消费者在需要时取出。这样一来即使生产者的速度比消费者快生产者也不需要等待消费者立即处理数据而是可以继续生产新的数据并将其放入缓冲区。同样如果消费者的速度比生产者快消费者也可以从缓冲区中取出数据并进行处理而不必等待新数据的到来。 解耦生产者和消费者 生产者和消费者可以独立运行彼此之间无需直接交互。这种解耦可以简化系统的设计和维护并且允许更容易地修改或替换生产者和消费者的实现。 22.3 生产者-消费者模型的特点
多线程同步与互斥生产者消费者模型是一个典型的多线程同步与互斥场景。多个生产者和消费者之间需要同步操作共享资源同时确保互斥访问避免数据竞争和不一致状态。 三种关系 生产者与生产者之间存在互斥关系多个生产者不能同时往共享资源中添加数据需要通过互斥机制保证只有一个生产者访问资源。消费者与消费者之间存在互斥关系多个消费者不能同时从共享资源中取出数据也需要通过互斥机制保证只有一个消费者访问资源。生产者与消费者之间存在互斥关系和同步关系生产者生产数据后需要通知消费者进行消费消费者消费完数据后需要通知生产者进行生产。这种同步关系确保生产者和消费者之间的顺序执行。 两种角色生产者和消费者是模型中的两种核心角色通常由线程或进程来扮演。生产者负责生成数据并放入共享资源而消费者负责从共享资源中取出数据并进行处理。 一个交易场所共享资源通常是一个缓冲区用于暂时存储生产者生产的数据以便消费者进行消费。这个交易场所可以是内存中的一段缓冲区也可以是其他形式的数据结构如队列、管道等。 我们用代码编写生产者消费者模型的时候本质就是对这三个特点进行维护。 22.4 BlockingQueue实现生产者-消费者模型
22.4.1 实现阻塞队列BlockQueue 阻塞队列Blocking Queue是一种常用于实现生产者和消费者模型的数据结构 阻塞队列为什么适用于实现生产者和消费者模型
当队列为空时从队列获取元素的操作将会被阻塞直到队列中放入了元素。当队列满时往队列里存放元素的操作会被阻塞直到有元素从队列中取出。
实现阻塞队列的基本原理 阻塞队列通过使用互斥锁和条件变量来确保对队列的访问是线程安全的。互斥锁用于保护对队列的并发访问而条件变量用于在适当的时候通知等待的线程。 当生产者要向队列中放入数据时首先会获取互斥锁以确保在放入数据的过程中不会被其他线程干扰。然后生产者会检查队列是否已满如果队列已满则生产者会等待条件变量直到队列有空闲空间为止。 同样地当消费者要从队列中取出数据时也会先获取互斥锁以确保在取出数据的过程中不会被其他线程干扰。然后消费者会检查队列是否为空如果队列为空则消费者会等待条件变量直到队列中有数据可取。 这种同步和互斥机制确保了生产者和消费者之间的顺序执行。生产者和消费者之间通过条件变量进行通信生产者负责向队列中放入数据消费者负责从队列中取出数据二者之间通过互斥锁确保对队列的安全访问。 介绍完原理,我们开始一步一步用代码来实现 1) 添加一个容器来存放数据 我们使用STL中现成的queue来模拟实现Blocking Queue ,这里我们创建一个名为BlockQueue.hpp的文件来定义BlockingQueue类 const int gDefaultCap 5;
template class T
class BlockQueue
{
public:BlockQueue(int capacity gDefaultCap) : capacity_(capacity){pthread_mutex_init(mtx_, nullptr);pthread_cond_init(Empty_, nullptr);pthread_cond_init(Full_, nullptr);}~BlockQueue(){pthread_mutex_destroy(mtx_);pthread_cond_destroy(Empty_);pthread_cond_destroy(Full_);}
private:std::queueT bq_; // 阻塞队列int capacity_; // 容量上限pthread_mutex_t mtx_; // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_; // 用它来表示bq 是否满的条件
}; 这里我们默认capacity为5,具体可以通过修改gDefaultCap改变 2)加入判断Blocking Queue情况的成员函数 bool isQueueEmpty()
{return bq_.size() 0;
}
bool isQueueFull()
{eturn bq_.size() capacity_;
} isQueueEmpty()判断队列是否为空
当消费者试图从队列中取出数据时如果队列为空则消费者需要等待直到队列中有数据可取以避免消费者线程空转浪费资源。
isQueueFull()判断队列是否已满
当生产者试图向队列中放入数据时如果队列已满则生产者需要等待直到队列有空闲位置以避免向已满的队列中添加数据。 3)实现push和pop方法 void push(const T in) // 生产者{pthread_mutex_lock(mtx_);while(isQueueFull()) pthread_cond_wait(Full_, mtx_);bq_.push(in);if(bq_.size() capacity_/2) pthread_cond_signal(Empty_);pthread_mutex_unlock(mtx_);} void pop(T *out){pthread_mutex_lock(mtx_);while (isQueueEmpty())pthread_cond_wait(Empty_, mtx_);*out bq_.front();bq_.pop();pthread_cond_signal(Full_);pthread_mutex_unlock(mtx_);} 判断是否满足生产消费条件时不能用if而应该用while
pthread_cond_wait函数是让当前执行流进行等待的函数是函数就意味着有可能调用失败调用失败后该执行流就会继续往后执行。其次在多消费者的情况下当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者就会一次性唤醒多个消费者但待消费的数据只有一个此时其他消费者就被伪唤醒了。为了避免出现上述情况我们就要让线程被唤醒后再次进行判断确认是否真的满足生产消费条件因此这里必须要用while进行判断。 4)完整代码 #pragma once
#include iostream
#include queue
#include mutex
#include pthread.h
const int gDefaultCap 5;
template class T
class BlockQueue
{
private:bool isQueueEmpty(){return bq_.size() 0;}bool isQueueFull(){return bq_.size() capacity_;}public:BlockQueue(int capacity gDefaultCap) : capacity_(capacity){pthread_mutex_init(mtx_, nullptr);pthread_cond_init(Empty_, nullptr);pthread_cond_init(Full_, nullptr);}void push(const T in) // 生产者{pthread_mutex_lock(mtx_);while(isQueueFull()) pthread_cond_wait(Full_, mtx_);bq_.push(in);if(bq_.size() capacity_/2) pthread_cond_signal(Empty_);pthread_mutex_unlock(mtx_);} void pop(T *out){pthread_mutex_lock(mtx_);while (isQueueEmpty())pthread_cond_wait(Empty_, mtx_);*out bq_.front();bq_.pop();pthread_cond_signal(Full_);pthread_mutex_unlock(mtx_);}~BlockQueue(){pthread_mutex_destroy(mtx_);pthread_cond_destroy(Empty_);pthread_cond_destroy(Full_);}
private:std::queueT bq_; // 阻塞队列int capacity_; // 容量上限pthread_mutex_t mtx_; // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};22.4.2 [可选] 修改成RAII风格代码
我们可以定义了一个 lockGuard 类采用 RAII资源获取即初始化方式对互斥锁进行加锁和解锁确保在作用域结束时自动释放锁。
这里我们创建一个名为lockGuard.hpp的文件来定义lockGuard类
#pragma once
#include iostream
#include pthread.h
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx) : mtx_(mtx){pthread_mutex_lock(mtx_);}~lockGuard(){pthread_mutex_unlock(mtx_);}private:pthread_mutex_t *mtx_; // 指向要管理的互斥锁的指针
};在lockGuard类的构造函数中首先通过传入的pthread_mutex_t类型的指针初始化mtx_成员变量即指向要管理的互斥锁。然后调用pthread_mutex_lock函数对该互斥锁进行加锁操作。
在lockGuard类的析构函数中调用pthread_mutex_unlock函数对互斥锁进行解锁操作。由于该析构函数在对象生命周期结束时自动调用因此实现了互斥锁的自动释放。这样在使用lockGuard对象时只需要在作用域中创建该对象当对象离开作用域时析构函数会自动调用从而释放互斥锁确保了互斥锁的安全管理。 修改后的Blocking Queue代码 #pragma once
#include iostream
#include queue
#include mutex
#include pthread.h
#include lockGuard.hpp
const int gDefaultCap 5;
template class T
class BlockQueue
{
private:bool isQueueEmpty(){return bq_.size() 0;}bool isQueueFull(){return bq_.size() capacity_;}
public:BlockQueue(int capacity gDefaultCap) : capacity_(capacity){pthread_mutex_init(mtx_, nullptr);pthread_cond_init(Empty_, nullptr);pthread_cond_init(Full_, nullptr);}void push(const T in) // 生产者{lockGuard lockgrard(mtx_); // 自动调用构造函数while (isQueueFull())pthread_cond_wait(Full_, mtx_);bq_.push(in);if(bq_.size() capacity_/2) pthread_cond_signal(Empty_);} // 自动调用lockgrard 析构函数void pop(T *out){lockGuard lockguard(mtx_);while (isQueueEmpty())pthread_cond_wait(Empty_, mtx_);*out bq_.front();bq_.pop();pthread_cond_signal(Full_);}~BlockQueue(){pthread_mutex_destroy(mtx_);pthread_cond_destroy(Empty_);pthread_cond_destroy(Full_);}
private:std::queueT bq_; // 阻塞队列int capacity_; // 容量上限pthread_mutex_t mtx_; // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};22.4.3 定义Blocking Queue中存放Task类任务
现在我么已经实现了BlockQueue的逻辑,但是我们需要实现生产者生产资源后通过阻塞队列派发给消费者,这里我们不妨将派发的资源定义为一个Task类,生产者将Task任务派发给消费者完成
这里我们创建一个名为Task.hpp的文件来定义Task类
#pragma once
#include iostream
#include functional
typedef std::functionint(int, int) func_t;
class Task
{
public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}int operator ()(){return func_(x_, y_);}
public:int x_;int y_;func_t func_;
};
重载了函数调用运算符 operator()使得 Task 类的对象可以像函数一样被调用。在这个运算符重载函数中调用了成员变量 func_ 所指向的函数对象并传入 x_ 和 y_ 作为参数返回函数调用的结果。
22.4.4 生产者-消费者模型主函数实现
这里我们创建一个名为pro-con.cc的文件来模拟实现生产者-消费者模型 1) 实现主函数 int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);BlockQueueTask *bqueue new BlockQueueTask();pthread_t c[2],p[2];pthread_create(p, nullptr, productor, bqueue);pthread_create(p 1, nullptr, productor, bqueue);sleep(1);pthread_create(c, nullptr, consumer, bqueue);pthread_create(c 1, nullptr, consumer, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);用于初始化随机数生成器的种子。BlockQueueTask *bqueue new BlockQueueTask();创建了一个 BlockQueue 类型的阻塞队列对象。pthread_create创建了两个消费者线程和两个生产者线程并分别传入相应的函数指针和参数。pthread_join等待所有线程的完成。delete bqueue;释放了动态分配的阻塞队列对象的内存空间。 2)定义任务函数 我们设计的任务函数是两个参数的类型,为了方便演示,这里我们就简单写了一个加法Add函数来实现(有兴趣可以自己DIY!)
int myAdd(int x, int y)
{return x y;
} 3)定义消费者函数 consumer,生产者函数 producer void* consumer(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){Task t;bqueue-pop(t);std::cout pthread_self() consumer: t.x_ t.y_ t() std::endl;}return nullptr;
}
void* productor(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){int x rand()%10 1;usleep(rand()%1000);int y rand()%5 1;Task t(x, y, myAdd);bqueue-push(t);std::cout pthread_self() productor: t.x_ t.y_ ? std::endl;sleep(1);}return nullptr;
}
void* consumer(void *args)消费者线程的入口函数。它接收一个 BlockQueueTask 类型的参数并不断地从阻塞队列中取出任务对象并执行任务函数。执行完毕后打印出任务的计算结果。void* productor(void *args)生产者线程的入口函数。它接收一个 BlockQueueTask 类型的参数并不断地生成随机的任务对象并将其推送到阻塞队列中。每个任务对象都包含两个随机生成的整数参数和任务函数的指针。生产者线程每次生成任务后都会打印出任务的描述。 4)完整代码 #include BlockQueue.hpp
#include pthread.h
#include unistd.h
#include ctime
#include Task.hpp
int myAdd(int x, int y)
{return x y;
}
void* consumer(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){Task t;bqueue-pop(t);std::cout pthread_self() consumer: t.x_ t.y_ t() std::endl;}return nullptr;
}
void* productor(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){int x rand()%10 1;usleep(rand()%1000);int y rand()%5 1;Task t(x, y, myAdd);bqueue-push(t);std::cout pthread_self() productor: t.x_ t.y_ ? std::endl;sleep(1);}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);BlockQueueTask *bqueue new BlockQueueTask();pthread_t c[2],p[2];pthread_create(p, nullptr, productor, bqueue);pthread_create(p 1, nullptr, productor, bqueue);sleep(1);pthread_create(c, nullptr, consumer, bqueue);pthread_create(c 1, nullptr, consumer, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}
22.4.5 makefile编译
pro-con:pro-con.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f pro-con
22.4.6 效果展示