xampp可以做网站吗,响应式网站开发asp,网站建设哪些会影响价格,网站开发费用如何入账文章目录 前言CodeThreadPool.hppmain.cpp 简单讲解所需头文件using成员变量构造析构添加任务PS测试效果 END 前言 线程池_百度百科 (baidu.com) 线程池是一种多线程处理形式#xff0c;处理过程中将任务添加到队列#xff0c;然后在创建线程后自动启动这些任务。线程池线程都… 文章目录 前言CodeThreadPool.hppmain.cpp 简单讲解所需头文件using成员变量构造析构添加任务PS测试效果 END 前言 线程池_百度百科 (baidu.com) 线程池是一种多线程处理形式处理过程中将任务添加到队列然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小以默认的优先级运行并处于多线程单元中。如果某个线程在托管代码中空闲如正在等待某个事件,则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙但队列中包含挂起的工作则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队但他们要等到其他线程完成后才启动。 线程池主要有fixed模式和cached模式。
其中fixed模式实现起来比较简单。本文就是以此来编写。 多线程基础请看(C) 多线程之生产者消费者问题_c 多线程 生产者消费者_天赐细莲的博客-CSDN博客 cached模式的线程池可以参考程序喵达人的代码C线程池的实现 - 掘金 (juejin.cn) Code
ThreadPool.hpp
#include atomic
#include condition_variable
#include functional
#include mutex
#include queue
#include sstream
#include string
#include thread
#include vectornamespace lotus {
std::string get_threadID() {std::stringstream ss;ss std::this_thread::get_id();return ss.str();
}class ThreadPool {
public:// 统一将任务bind为 void(*)() 的函数形式using Task_Type std::functionvoid();private: // config// 后期根据这个将 fixed 模式改为 cached模式const size_t TASK_COUNT;private: // taskstd::vectorstd::unique_ptrstd::thread m_threadControlList;std::queueTask_Type m_taskQueue;private: // thread helperstd::atomicbool m_canRun;std::condition_variable m_condVar;std::mutex m_mutex;public:/*** brief Construct a new Thread Pool object* 尽量不要让线程数 cpu内核数* param taskCnt*/ThreadPool(const size_t taskCnt) : TASK_COUNT(taskCnt) {m_canRun (TASK_COUNT std::thread::hardware_concurrency());open_pool();}// copy prohibitedThreadPool(const ThreadPool) delete;ThreadPool operator(const ThreadPool) delete;/*** brief Destroy the Thread Pool object*/~ThreadPool() {close_pool();}public:/*** brief* 添加任务并让条件变量通知一次* 目前不处理返回值* tparam Fun* tparam Args* param fun* param args*/template typename Fun, typename... Argsvoid Add_task(Fun fun, Args... args) {std::lock_guardstd::mutex lock(m_mutex);auto task std::bind(std::forwardFun(fun), std::forwardArgs(args)...);m_taskQueue.push(std::move(task));m_condVar.notify_one();}private:/*** brief Create a thread object*/void create_thread() {std::lock_guardstd::mutex lock(m_mutex);auto createTaskThread [this]() {for (;;) {std::unique_lockstd::mutex lock(m_mutex);while (m_taskQueue.empty() m_canRun) {m_condVar.wait(lock);}if (false m_canRun) {break;}auto task std::move(m_taskQueue.front());m_taskQueue.pop();lock.unlock();task();} // while 1};auto thPtr std::make_uniquestd::thread(createTaskThread);m_threadControlList.emplace_back(std::move(thPtr));}private:/*** brief* 创建一定数量的线程* param taskCnt*/void open_pool() {for (size_t i 0; i TASK_COUNT; i 1) {create_thread();}}/*** brief* 运行标志改为 false* 并通知所有线程* 确保所有线程都join完*/void close_pool() {m_canRun false;m_condVar.notify_all();for (auto thPtr : m_threadControlList) {if (thPtr-joinable()) {thPtr-join();}}}
}; // class} // namespace lotumain.cpp
#include iostream#include ThreadPool.hpp
namespace my lotus;#ifdef _MSC_VER
#define __FUNC_NAME__ __FUNCSIG__
#elif defined(__GNUC__) || defined(__clang__)
#define __FUNC_NAME__ __PRETTY_FUNCTION__
#else
#define __FUNC_NAME__ __func__
#endif/*** brief* test fun* param waitTime* param str*/
void fun(int waitTime, const char* str) {const int N 5;printf([%s]time{%d}start\n, __FUNC_NAME__, waitTime);for (int i 0; i N; i 1) {std::this_thread::sleep_for(std::chrono::seconds(waitTime));auto threadId my::get_threadID();const char* idStr threadId.c_str();printf([%s]%s\n, idStr, str);}printf([%s]time{%d}end\n, __FUNC_NAME__, waitTime);
}/*** brief* main fun* param argc* param argv* return int*/
int main(int argc, const char** argv) {srand(time(0));printf([%s] start\n, __FUNC_NAME__);{my::ThreadPool pool(2);pool.Add_task(fun, 1, 11111111111111111111);pool.Add_task(fun, 2, 22222222222222222222);pool.Add_task(fun, 3, 33333333333333333333);system(pause);}printf([%s] end\n, __FUNC_NAME__);// getchar();
}简单讲解
所需头文件
#include atomic
#include condition_variable
#include functional
#include mutex
#include queue
#include sstream
#include string
#include thread
#include vectorusing
// 将所有任务bind成void(*)()的形式
using Task_Type std::functionvoid();成员变量
private: // config// 后期根据这个将 fixed 模式改为 cached模式const size_t TASK_COUNT;private: // taskstd::vectorstd::unique_ptrstd::thread m_threadControlList;// 任务队列std::queueTask_Type m_taskQueue;private: // thread helper// 终止标志std::atomicbool m_canRun;// 条件变量std::condition_variable m_condVar;// 互斥量std::mutex m_mutex;构造
传入fixed的线程数量 尽量不要 cpu核数。禁止拷贝操作。
public:// 传入fixed的线程数量ThreadPool(const size_t taskCnt) : TASK_COUNT(taskCnt) {// 尽量不要 cpu核数m_canRun (TASK_COUNT std::thread::hardware_concurrency());open_pool();}// 禁止拷贝ThreadPool(const ThreadPool) delete;ThreadPool operator(const ThreadPool) delete;初始化TASK_COUNT数量的线程。
每个线程写死循环不断等待和执行任务队列的任务。
基于条件变量std::condition_variable来进行阻塞。
private: void create_thread() {std::lock_guardstd::mutex lock(m_mutex);auto createTaskThread [this]() {for (;;) {std::unique_lockstd::mutex lock(m_mutex);while (m_taskQueue.empty() m_canRun) {m_condVar.wait(lock);}if (false m_canRun) {break;}auto task std::move(m_taskQueue.front());m_taskQueue.pop();lock.unlock();task();} // while 1};auto thPtr std::make_uniquestd::thread(createTaskThread);m_threadControlList.emplace_back(std::move(thPtr));}private:void open_pool() {for (size_t i 0; i TASK_COUNT; i 1) {create_thread();}}析构
public:~ThreadPool() {close_pool();}将运行标志改为false由于这里的标志是std::atomic。并且其余变量不涉及资源竞争因此无需加锁。
注意这里使用join()来保证每个线程是正常退出的。
private:void close_pool() {m_canRun false;m_condVar.notify_all();for (auto thPtr : m_threadControlList) {if (thPtr-joinable()) {thPtr-join();}}}添加任务
这里使用变参模板技术将任务包装成std::functionvoid()的一个可调用对象。
注意这里传入的是万能引用要使用std::forward。
每加入一个任务就让条件变量进行一次通知notify_one()。
public:template typename Fun, typename... Argsvoid Add_task(Fun fun, Args... args) {std::lock_guardstd::mutex lock(m_mutex);auto task std::bind(std::forwardFun(fun), std::forwardArgs(args)...);m_taskQueue.push(std::move(task));m_condVar.notify_one();}PS
由于std::this_thread::get_id()返回的是thread::id。对于cpp可以使用std::cout来进行输出。但是并没有对应的类型转化操作。且id的大小和数值类型在不同平台不同编译器中不一致。更不能用于printf虽然有的编译器支持。
因此用std::stringstream流进行一次间接的转化。
std::string get_threadID() {std::stringstream ss;ss std::this_thread::get_id();return ss.str();
}测试效果 g (x86_64-posix-seh-rev0, Built by MinGW-W64 project) 7.3.0 [int main(int, const char**)] start
[void fun(int, const char*)]time{1}start
[void fun(int, const char*)]time{2}start
请按任意键继续. . . [2]11111111111111111111
[3]22222222222222222222
[2]11111111111111111111
[2]11111111111111111111
[3]22222222222222222222
[2]11111111111111111111
[2]11111111111111111111
[void fun(int, const char*)]time{1}end
[void fun(int, const char*)]time{3}start
[3]22222222222222222222
[3]22222222222222222222
[2]33333333333333333333
[3]22222222222222222222
[void fun(int, const char*)]time{2}end
[2]33333333333333333333
[2]33333333333333333333
[2]33333333333333333333
[2]33333333333333333333
[void fun(int, const char*)]time{3}end[int main(int, const char**)] end最后的空行是对应system(pause)。由于在线程池的析构中使用了join()因此提前键入也可以。 END