网站建设图片改不了,博客网站哪个权重高,五原网站建设,在网站设计公司上班好吗Linux线程概念 线程在进程内部执行,是OS调度的基本单位OS是可以做到让进程进行资源的细粒度划分的物理内存是以4kb为单位的我们的.exe可执行程序本来就是按照地址空间的方式进行编译的页表映射 - 详细图 理解线程 线程在进程的地址空间内运行, 进程内部具有多个执行流的,而线程…Linux线程概念 线程在进程内部执行,是OS调度的基本单位OS是可以做到让进程进行资源的细粒度划分的物理内存是以4kb为单位的我们的.exe可执行程序本来就是按照地址空间的方式进行编译的页表映射 - 详细图 理解线程 线程在进程的地址空间内运行, 进程内部具有多个执行流的,而线程只有一个执行流,线程独有的一组寄存器,栈等等
CPU的视角
Linux下,PCB 其他OS内的PCB的,所以Linux下的进程:统一称之为轻量级进程CPU其实不关心执行流是进程还是线程,只关心PCB
Linux没有真正意义上的线性结构,Linux是用进程PCB模拟线程的,
Linux并不能直接给我们提供线程的接口,只能提供轻量级进程的接口! 它也在用户层实现了一套用户层多线程方案,以库的方式提供给用户进行使用pthread线程库 -- 原生线程库
为什么线程切换的成本更低
CPU内部是有L1~L3cache 对应内存的代码和数据,根据局部性原理,预读CPU内部!!! 线程是不用切换页表 改变地址空间,进程需要切换cache就会立即失效,新进程过来,只能重新缓存,
创建线程
pthread_create 这段代码也可以证明CPU是用LWP来区分线程
线程ID及进程地址空间布局 pthread_ create函数会产生一个线程ID存放在第一个参数指向的地址中。该线程ID和前面说的线程ID 不是一回事。 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程是操作系统调度器的最小单位所以需要一个数值来唯一表示该线程。 pthread_ create函数第一个参数指向一个虚拟内存单元该内存单元的地址即为新创建线程的线程ID pthread_self 可以获得线程自身的IDpthread_t类型的理解 对于Linux目前实现的NPTL实现而言pthread_t类型的线程ID本质就是一个进程地址空间上的一个地址。线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法: 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。 线程可以调用pthread_ exit终止自己。 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。 pthread_cancel 线程等待
为什么需要线程等待已经退出的线程其空间没有被释放仍然在进程的地址空间内。 创建新的线程不会复用刚才退出线程的地址空间。
pthread_join 分离线程 默认情况下新创建的线程是joinable的线程退出后需要对其进行pthread_join操作否则无法释放资源从而造成系统泄漏。 如果不关心线程的返回值join是一种负担这个时候我们可以告诉系统当线程退出时自动释放线程资源。
pthread_detach Linux线程互斥 临界资源多线程执行流共享的资源就叫做临界资源 临界区每个线程内部访问临界资源的代码就叫做临界区 互斥任何时刻互斥保证有且只有一个执行流进入临界区访问临界资源通常对临界资源起保护作用原子性 不会被任何调度机制打断的操作该操作只有两态要么完成要么未完成
#include stdio.h
#include stdlib.h
#include string.h
#include unistd.h
#include pthread.h
int ticket 100;
void *route(void *arg)
{char *id (char*)arg;while ( 1 ) {if ( ticket 0 ) {usleep(1000);printf(%s sells ticket:%d\n, id, ticket);ticket--;} else {break;}}
}int main( void )
{pthread_t t1, t2, t3, t4;pthread_create(t1, NULL, route, (void*)thread 1);pthread_create(t2, NULL, route, (void*)thread 2);pthread_create(t3, NULL, route, (void*)thread 3);pthread_create(t4, NULL, route, (void*)thread 4);pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
} 多个线程并发的操作共享变量会带来一些问题。
为什么可能无法获得争取结果 if 语句判断条件为真以后代码可以并发的切换到其他线程 usleep 这个模拟漫长业务的过程在这个漫长的业务过程中可能有很多个线程会进入该代码段 --ticket 操作本身就不是一个原子操作 要解决以上问题需要做到三点
代码必须要有互斥行为当代码进入临界区执行时不允许其他线程进入该临界区。 如果多个线程同时要求执行临界区的代码并且临界区没有线程在执行那么只能允许一个线程进入该临界区。 如果线程不在临界区中执行那么该线程不能阻止其他线程进入临界区。
要做到这三点本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
pthread_mutex_lock pthread_mutex_unlock #include iostream
#include thread
#include cerrno
#include cstring
#include unistd.h
#include pthread.h
#include time.h
#include cassert
#include cstdiousing namespace std;int tickets 10000; // 在并发访问的时候导致了我们数据不一致的问题临界资源#define THREAD_NUM 10// 线程数量class ThreadData
{
public:ThreadData(const std::string n, pthread_mutex_t* pm) :tname(n), pmtx(pm){}
public:std::string tname;pthread_mutex_t* pmtx;
};void* getTickets(void* args)
{ThreadData* td (ThreadData*)args;while (true){// 抢票逻辑int n pthread_mutex_lock(td-pmtx);// 加锁assert(n 0);// 临界区if (tickets 0) // 1. 判断的本质也是计算的一种{usleep(rand() % 1500);printf(%s: %d\n, td-tname.c_str(), tickets);tickets--; // 2. 也可能出现问题n pthread_mutex_unlock(td-pmtx);// 解锁assert(n 0);}else {n pthread_mutex_unlock(td-pmtx);// 解锁assert(n 0);break;}// 抢完票其实还需要后续的动作usleep(rand() % 2000);}delete td;return nullptr;
}int main()
{time_t start time(nullptr);pthread_mutex_t mtx;// 局部变量 pthread_mutex_init初始化pthread_mutex_init(mtx, nullptr);srand((unsigned long)time(nullptr) ^ getpid() ^ 0x147);// 让随机数更随机pthread_t t[THREAD_NUM];// 多线程抢票的逻辑for (int i 0; i THREAD_NUM; i){std::string name thread ;name std::to_string(i 1);ThreadData* td new ThreadData(name, mtx);pthread_create(t i, nullptr, getTickets, (void*)td);}for (int i 0; i THREAD_NUM; i){pthread_join(t[i], nullptr);}pthread_mutex_destroy(mtx);time_t end time(nullptr);cout cast: (int)(end - start) S endl;
} pthread_mutex_t 就是原生线程库提供的一个数据类型 如果多线程访问同一个全局变量并对它进行数据计算多线程不会互相影响 可以定义一个全局变量 pthread_mutex_t mtx PTHREAD_MUTEX_INITIALIZER;加锁保护加锁的时候一定要保证加锁的力度越小越好加锁之后就是串行执行的了,线程在临界区,切换也不会有什么问题, 锁只有一把,线程切换走了,持有锁也会被带走,而其他抢票的线程要执行临界区的代码又要申请锁,这时锁是无法申请成功的,也就不会让其他线程进入临界区,保证了临界区中数据的一致性
对锁的理解 在上那段代码中,锁只有一把,拿1的汇编只有1条一个线程被切换走,它会将自己的数据和对应执行到的汇编
补充一点: 可重入函数就是安全的
常见锁概念
死锁 一把锁也可能发生死锁问题
死锁四个必要条件
互斥条件一个资源每次只能被一个执行流使用 请求与保持条件一个执行流因请求资源而阻塞时对已获得的资源保持不放 不剥夺条件:一个执行流已获得的资源在末使用完之前不能强行剥夺 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁 破坏死锁的四个必要条件 加锁顺序一致 避免锁未释放的场景 资源一次性分配
Linux线程同步
条件变量 当一个线程互斥地访问某个变量时它可能发现在其它线程改变状态之前它什么也做不了。 例如一个线程访问队列时发现队列为空它只能等待直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
同步在保证数据安全的前提下让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题叫做同步 竞态条件因为时序问题而导致程序异常我们称之为竞态条件。在线程场景下这种问题也不难理解 引入同步: 主要是为了解决访问临界资源合理性问题的,使线程按照一定顺序,进行临界资源的访问,线程同步
方案一: 条件变量 当我们申请临界资源前,先要做临界资源是否存在的检测,而检测的本质: 也是访问临界资源所以对临界资源的检测,也一定需要在加锁和解锁之间的#include iostream
#include string
#include unistd.h
#include pthread.h#define TNUM 4
typedef void (*func_t)(const std::string name,pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit false;class ThreadData
{
public:ThreadData(const std::string name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond):name_(name), func_(func), pmtx_(pmtx), pcond_(pcond){}
public:std::string name_;func_t func_;pthread_mutex_t *pmtx_;pthread_cond_t *pcond_;
};void func1(const std::string name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){// wait一定要在加锁和解锁之间进行wait// v2: pthread_mutex_lock(pmtx);// if(临界资源是否就绪-- 否) pthread_cond_waitpthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候wait代码被执行当前线程会被立即被阻塞std::cout name running -- 播放 std::endl;pthread_mutex_unlock(pmtx);}
}void func2(const std::string name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候wait代码被执行当前线程会被立即被阻塞if(!quit) std::cout name running -- 下载 std::endl;pthread_mutex_unlock(pmtx);}
}
void func3(const std::string name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候wait代码被执行当前线程会被立即被阻塞std::cout name running -- 刷新 std::endl;pthread_mutex_unlock(pmtx);}
}
void func4(const std::string name,pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while(!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); //默认该线程在执行的时候wait代码被执行当前线程会被立即被阻塞std::cout name running -- 扫码用户信息 std::endl;pthread_mutex_unlock(pmtx);}
}void *Entry(void *args)
{ThreadData *td (ThreadData*)args; //td在每一个线程自己私有的栈空间中保存td-func_(td-name_, td-pmtx_, td-pcond_); // 它是一个函数调用完成就要返回delete td;return nullptr;
}int main()
{pthread_mutex_t mtx;pthread_cond_t cond;// 条件变量pthread_mutex_init(mtx, nullptr);pthread_cond_init(cond, nullptr);// 初始化条件变量pthread_t tids[TNUM];func_t funcs[TNUM] {func1, func2, func3, func4};// 函数指针数组for (int i 0; i TNUM; i){std::string name Thread ;name std::to_string(i 1);ThreadData *td new ThreadData(name, funcs[i], mtx, cond);pthread_create(tids i, nullptr, Entry, (void*)td);}sleep(5);// ctrl new threadint cnt 10;while(cnt){std::cout resume thread run code .... cnt-- std::endl;pthread_cond_signal(cond);// 唤醒等待sleep(1);}std::cout ctrl done std::endl;quit true;pthread_cond_broadcast(cond);// 唤醒等待for(int i 0; i TNUM; i){pthread_join(tids[i], nullptr);std::cout thread: tids[i] quit std::endl;}pthread_mutex_destroy(mtx);pthread_cond_destroy(cond);return 0;
} 生产者消费者模型 321原则
3种关系: 生产者和生产者(竞争,互斥)消费者和消费者(竞争,互斥)生产者和消费者(互斥,同步)2种角色: 生产者和消费者1个交易场所 超市BlockQueue.hpp
#pragma once#include iostream
#include queue
#include mutex
#include pthread.h
#include lockGuard.hppconst int gDefaultCap 5;// 默认容量template class T
class BlockQueue
{
private:bool isQueueEmpty(){return bq_.size() 0;}bool isQueueFull(){return bq_.size() capacity_;}
public:// 构造BlockQueue(int capacity gDefaultCap) : capacity_(capacity){pthread_mutex_init(mtx_, nullptr);pthread_cond_init(Empty_, nullptr);pthread_cond_init(Full_, nullptr);}void push(const T in) // 生产者{// 1. 先检测当前的临界资源是否能够满足访问条件lockGuard lockgrard(mtx_); // 自动调用构造函数while (isQueueFull()){pthread_cond_wait(Full_, mtx_);}// 2. 访问临界资源100%确定资源是就绪的bq_.push(in);pthread_cond_signal(Empty_);// 局部变量 出了作用域会自动调用lockgrard 析构函数} void pop(T *out){lockGuard lockguard(mtx_);while (isQueueEmpty())pthread_cond_wait(Empty_, mtx_);*out bq_.front();bq_.pop();pthread_cond_signal(Full_);}~BlockQueue(){pthread_mutex_destroy(mtx_);pthread_cond_destroy(Empty_);pthread_cond_destroy(Full_);}
private:std::queueT bq_; // 阻塞队列int capacity_; // 容量上限pthread_mutex_t mtx_; // 通过互斥锁保证队列安全pthread_cond_t Empty_; // 用它来表示bq 是否空的条件pthread_cond_t Full_; // 用它来表示bq 是否满的条件
};
ConProd.cc
#include BlockQueue.hpp
#include Task.hpp#include pthread.h
#include unistd.h
#include ctimeint myAdd(int x, int y)
{return x y;
}void* consumer(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){// 获取任务Task t;bqueue-pop(t);// 完成任务std::cout pthread_self() consumer: t.x_ t.y_ t() std::endl;// sleep(1);}return nullptr;
}// 生产者
void* productor(void *args)
{BlockQueueTask *bqueue (BlockQueueTask *)args;while(true){// 制作任务int x rand()%10 1;usleep(rand()%1000);int y rand()%5 1;Task t(x, y, myAdd);// 生产任务bqueue-push(t);// 输出消息std::cout pthread_self() productor: t.x_ t.y_ ? std::endl;sleep(1);}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);BlockQueueTask *bqueue new BlockQueueTask();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);// 等待pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}
Task.hpp
#pragma once#include iostream
#include functionaltypedef std::functionint(int, int) func_t;class Task
{public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}int operator ()(){return func_(x_, y_);}
public:int x_;int y_;func_t func_;
};
lockGuard.hpp
#pragma once#include iostream
#include pthread.h
// 定义一把锁
class Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {std::cout 要进行加锁 std::endl;pthread_mutex_lock(pmtx_);}void unlock(){std::cout 要进行解锁 std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};
Makefile
cp:ConProd.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f cp
pthread_cond_wait第二个参数是一个锁当成功调用wait之后传入的锁会被自动释放从哪里阻塞挂起就从哪里唤醒, 被唤醒的时候我们还是在临界区被唤醒的啊RAII风格的加锁(封装了一个锁的类)
POSIX信号量
信号量的概念 共享资源-任何时候都只有一个执行流在进行访问,如果一个共享资源,不当做一个整体,而让不同的执行流访问不同的局域的话,那就可以并发我们用信号量来表示这段共享资源中还剩多少个资源,信号量本质:
是一个计数器,访问临界资源的时候,必须先申请信号量资源(sem--,预订资源,P),使用完毕信号量资源(sem,释放资源,V)环形队列实现生产者消费者模型 生产者不能将消费者套圈,消费者不能超过生产者,为空: 一定要让生产者先运行为满: 一定要让消费者先运行生产者: 最关心的是空间资源-spaceSem-N P(spaceSem),将会在特定位置生产,然后V(dataSem)
消费者: 最关心的是数据资源-dataSem-0
P(dataSem),消费特定的数据,然后V(spaceSem)Makefile
ring_queue:testMain.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f ring_queue ringQueue.hpp
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_#include iostream
#include vector
#include pthread.h
#include sem.hppconst int g_default_num 5;// 多线程
templateclass T
class RingQueue
{
public:RingQueue(int default_num g_default_num): ring_queue_(default_num), num_(default_num),c_step(0),p_step(0),space_sem_(default_num),data_sem_(0){// 初始化锁pthread_mutex_init(clock, nullptr);pthread_mutex_init(plock, nullptr);}~RingQueue(){// 销毁锁pthread_mutex_destroy(clock);pthread_mutex_destroy(plock);}// 生产者: 空间资源, 生产者们的临界资源是什么下标void push(const T in){// 先申请信号量(0)space_sem_.p();// --pthread_mutex_lock(plock); // ?// 一定是竞争成功的生产者线程 -- 就一个ring_queue_[p_step] in;p_step % num_;pthread_mutex_unlock(plock);data_sem_.v();// }// 消费者: 数据资源 消费者们的临界资源是什么下标void pop(T *out){data_sem_.p();pthread_mutex_lock(clock);// 一定是竞争成功的消费者线程 -- 就一个*out ring_queue_[c_step];c_step % num_;pthread_mutex_unlock(clock);space_sem_.v();}// void debug()// {// std::cerr size: ring_queue_.size() num: num std::endl;// }
private:std::vectorT ring_queue_;int num_;int c_step; // 消费下标int p_step; // 生产下标Sem space_sem_;// 记录空间的信号量Sem data_sem_;// 记录空间数据的信号量pthread_mutex_t clock;// 消费者的锁pthread_mutex_t plock;// 生产者的锁
};#endif
sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_#include iostream
#include semaphore.hclass Sem
{
public:Sem(int value){// 信号量初始化sem_init(sem_, 0, value);}void p(){// 等待信号量会将信号量的值减1sem_wait(sem_);}void v(){// 发布信号量,会将信号量的值加1sem_post(sem_);}~Sem(){// 销毁相互量sem_destroy(sem_);}
private:sem_t sem_;
};#endif testMain.cc
#include ringQueue.hpp
#include cstdlib
#include ctime
#include sys/types.h
#include unistd.h// 消费者
void *consumer(void *args)
{// 生产者和消费者看到的是同一个rqRingQueueint *rq (RingQueueint *)args;while(true){sleep(1);int x;// 1. 从环形队列中获取任务或者数据rq-pop(x);// 2. 进行一定的处理 -- 不要忽略它的时间消耗问题std::cout 生产: x [ pthread_self() ] std::endl;}
}// 生产者
void *productor(void *args)
{// 生产者和消费者看到的是同一个rqRingQueueint *rq (RingQueueint *)args;while(true){// sleep(1);// 1. 构建数据或者任务对象 -- 一般是可以从外部来 -- 不要忽略它的时间消耗问题int x rand() % 100 1;std::cout 消费: x [ pthread_self() ] std::endl;// 2. 推送到环形队列中rq-push(x); // 完成生产的过程}
}int main()
{srand((uint64_t)time(nullptr) ^ getpid());// 产生随机数RingQueueint *rq new RingQueueint();// 多线程模式pthread_t c[3],p[2];pthread_create(c, nullptr, consumer, (void*)rq);pthread_create(c1, nullptr, consumer, (void*)rq);pthread_create(c2, nullptr, consumer, (void*)rq);pthread_create(p, nullptr, productor, (void*)rq);pthread_create(p1, nullptr, productor, (void*)rq);for(int i 0; i 3; i) pthread_join(c[i], nullptr);for(int i 0; i 2; i) pthread_join(p[i], nullptr);return 0;
}多生产多消费的意义 将数据或者任务生产前和拿到之后处理才是最耗费时间的。 生产的本质私有的任务- 公共空间中 消费的本质公共空间中的任务- 私有化
信号量的意义
信号量本质是一把计数器, 可以不用进入临界区就可以得知资源情况甚至可以减少临界区内部的判断信号量可以提前预设资源的情况而且在pv变化过程中我们可以在外部就能知晓临界资源的情况
线程池
Makefile
thread_pool:testMain.ccg -o $ $^ -stdc11 -lpthread #-DDEBUG_SHOW
.PHONY:clean
clean:rm -f thread_pool
-DDEBUG_SHOW在命令中定义了一个宏,多和条件变量配合使用Task.hpp
#pragma once#include iostream
#include string
#include functional
#include log.hpptypedef std::functionint(int, int) func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func):x_(x), y_(y), func_(func){}void operator ()(const std::string name){// std::cout 线程 name 处理完成, 结果是: x_ y_ func_(x_, y_) std::endl;logMessage(WARNING, %s处理完成: %d%d%d | %s | %d,name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);}
public:int x_;int y_;// int type;func_t func_;
};
lockGuard.hpp
#pragma once#include iostream
#include pthread.hclass Mutex
{
public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}void lock() {// std::cout 要进行加锁 std::endl;pthread_mutex_lock(pmtx_);}void unlock(){// std::cout 要进行解锁 std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}
private:pthread_mutex_t *pmtx_;
};// RAII风格的加锁方式
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){mtx_.lock();}~lockGuard(){mtx_.unlock();}
private:Mutex mtx_;
};log.hpp
#pragma once#include iostream
#include cstdio
#include cstdarg
#include ctime
#include string// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *gLevelMap[] {DEBUG,NORMAL,WARNING,ERROR,FATAL
};#define LOGFILE ./threadpool.log// 完整的日志功能至少: 日志等级 时间 支持用户自定义(日志内容, 文件行文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOWif(level DEBUG) return;
#endif// va_list ap;// va_start(ap, format);// while()// int x va_arg(ap, int);// va_end(ap); //apnullptrchar stdBuffer[1024]; //标准部分time_t timestamp time(nullptr);// struct tm *localtime localtime(timestamp);snprintf(stdBuffer, sizeof stdBuffer, [%s] [%ld] , gLevelMap[level], timestamp);char logBuffer[1024]; //自定义部分va_list args;va_start(args, format);// vprintf(format, args);vsnprintf(logBuffer, sizeof logBuffer, format, args);va_end(args);FILE *fp fopen(LOGFILE, a);// printf(%s%s\n, stdBuffer, logBuffer);fprintf(fp, %s%s\n, stdBuffer, logBuffer);fclose(fp);
} thread.hpp
#pragma once
#include iostream
#include string
#include functional
#include cstdio// typedef std::functionvoid* (void*) fun_t;
typedef void* (*fun_t)(void*);class ThreadData
{
public:void* args_;std::string name_;
};class Thread
{
public:Thread(int num, fun_t callback, void* args) : func_(callback){char nameBuffer[64];snprintf(nameBuffer, sizeof nameBuffer, Thread-%d, num);name_ nameBuffer;tdata_.args_ args;tdata_.name_ name_;}void start(){pthread_create(tid_, nullptr, func_, (void*)tdata_);}void join(){pthread_join(tid_, nullptr);}std::string name(){return name_;}~Thread(){}private:std::string name_;fun_t func_;ThreadData tdata_;pthread_t tid_;
};
threadPool.hpp
#pragma once#include iostream
#include vector
#include string
#include queue
#include unistd.h
#include thread.hpp
#include lockGuard.hpp
#include log.hppconst int g_thread_num 3;
// 本质是: 生产消费模型
template class T
class ThreadPool
{
public:pthread_mutex_t* getMutex(){return lock;}bool isEmpty(){return task_queue_.empty();}void waitCond(){pthread_cond_wait(cond, lock);}T getTask(){T t task_queue_.front();task_queue_.pop();return t;}private:ThreadPool(int thread_num g_thread_num) : num_(thread_num){pthread_mutex_init(lock, nullptr);pthread_cond_init(cond, nullptr);for (int i 1; i num_; i){threads_.push_back(new Thread(i, routine, this));}}ThreadPool(const ThreadPoolT other) delete;const ThreadPoolT operator(const ThreadPoolT other) delete;public:// 考虑一下多线程使用单例的过程static ThreadPoolT* getThreadPool(int num g_thread_num){// 可以有效减少未来必定要进行加锁检测的问题// 拦截大量的在已经创建好单例的时候剩余线程请求单例的而直接访问锁的行为if (nullptr thread_ptr){lockGuard lockguard(mutex);// 但是未来任何一个线程想获取单例都必须调用getThreadPool接口// 但是一定会存在大量的申请和释放锁的行为这个是无用且浪费资源的// pthread_mutex_lock(mutex);if (nullptr thread_ptr){thread_ptr new ThreadPoolT(num);}// pthread_mutex_unlock(mutex);}return thread_ptr;}// 1. run()void run(){for (auto iter : threads_){iter-start();// std::cout iter-name() 启动成功 std::endl;logMessage(NORMAL, %s %s, iter-name().c_str(), 启动成功);}}// 线程池本质也是一个生产消费模型// void *routine(void *args)// 消费过程static void* routine(void* args){ThreadData* td (ThreadData*)args;ThreadPoolT* tp (ThreadPoolT *)td-args_;while (true){T task;{lockGuard lockguard(tp-getMutex());while (tp-isEmpty())tp-waitCond();// 读取任务task tp-getTask(); // 任务队列是共享的- 将任务从共享拿到自己的私有空间}task(td-name_);// lock// while(task_queue_.empty()) wait();// 获取任务// unlock// 处理任务}}// 2. pushTask()void pushTask(const T task){lockGuard lockguard(lock);task_queue_.push(task);pthread_cond_signal(cond);}// test func// void joins()// {// for (auto iter : threads_)// {// iter-join();// }// }~ThreadPool(){for (auto iter : threads_){iter-join();delete iter;}pthread_mutex_destroy(lock);pthread_cond_destroy(cond);}private:std::vectorThread* threads_;int num_;std::queueT task_queue_;static ThreadPoolT* thread_ptr;static pthread_mutex_t mutex;// 方案2:// queue1,queue2// std::queueT *p_queue, *c_queue// p_queue-queue1// c_queue-queue2// p_queue - 生产一批任务之后swap(p_queue,c_queue),唤醒所有线程/一个线程// 当消费者处理完毕的时候你也可以进行swap(p_queue,c_queue)// 因为我们生产和消费用的是不同的队列未来我们要进行资源的处理的时候仅仅是指针pthread_mutex_t lock;pthread_cond_t cond;
};
template typename T
ThreadPoolT* ThreadPoolT::thread_ptr nullptr;template typename T
pthread_mutex_t ThreadPoolT::mutex PTHREAD_MUTEX_INITIALIZER; 进入{}创建lockGuard对象,并调用构造函数出{}调用lockGuard对象的析构函数这里使用双重判断,主要为了拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为
testMain.cc
#include threadPool.hpp
#include Task.hpp
#include ctime
#include cstdlib
#include iostream
#include unistd.h// void *run(void *args)
// {
// while(true)
// {
// ThreadPoolTask::getThreadPool();
// }
// }int main()
{// logMessage(NORMAL, %s %d %c %f \n, 这是一条日志信息, 1234, c, 3.14);srand((unsigned long)time(nullptr) ^ getpid());// ThreadPoolTask *tp new ThreadPoolTask();// ThreadPoolTask *tp ThreadPoolTask::getThreadPool();// 那么如果单例本身也在被多线程申请使用呢ThreadPoolTask::getThreadPool()-run();//thread1,2,3,4while(true){//生产的过程制作任务的时候要花时间int x rand()%100 1;usleep(7721);int y rand()%30 1;Task t(x, y, [](int x, int y)-int{return x y;});// std::cout 制作任务完成: x y ? std::endl;logMessage(DEBUG, 制作任务完成: %d%d?, x, y);logMessage(DEBUG, 制作任务完成: %d%d?, x, y);logMessage(DEBUG, 制作任务完成: %d%d?, x, y);logMessage(DEBUG, 制作任务完成: %d%d?, x, y);// 推送任务到线程池中ThreadPoolTask::getThreadPool()-pushTask(t);sleep(1);}return 0;
}threadpool.log
[NORMAL] [1675916038] Thread-1 启动成功
[NORMAL] [1675916038] Thread-2 启动成功
[NORMAL] [1675916038] Thread-3 启动成功
[WARNING] [1675916038] Thread-1处理完成: 212041 | Task.hpp | 20
[WARNING] [1675916039] Thread-2处理完成: 693099 | Task.hpp | 20
[WARNING] [1675916040] Thread-3处理完成: 9721118 | Task.hpp | 20
[WARNING] [1675916041] Thread-1处理完成: 742195 | Task.hpp | 20
[WARNING] [1675916042] Thread-2处理完成: 302454 | Task.hpp | 20
[WARNING] [1675916043] Thread-3处理完成: 9517112 | Task.hpp | 20
[WARNING] [1675916044] Thread-1处理完成: 7627103 | Task.hpp | 20
[WARNING] [1675916045] Thread-2处理完成: 67774 | Task.hpp | 20
[WARNING] [1675916046] Thread-3处理完成: 38442 | Task.hpp | 20
[WARNING] [1675916047] Thread-1处理完成: 621981 | Task.hpp | 20
[WARNING] [1675916048] Thread-2处理完成: 9321114 | Task.hpp | 20
[WARNING] [1675916049] Thread-3处理完成: 641377 | Task.hpp | 20
[WARNING] [1675916050] Thread-1处理完成: 81384 | Task.hpp | 20
[WARNING] [1675916051] Thread-2处理完成: 8626112 | Task.hpp | 20
[WARNING] [1675916052] Thread-3处理完成: 242145 | Task.hpp | 20
[WARNING] [1675916053] Thread-1处理完成: 69675 | Task.hpp | 20