网站建设资金,简述网站的推广策略,电话营销网站推广,公司网站怎么做实名认证ThreadPoolExecutor 是开发中最常用的线程池#xff0c;今天来简单学习一下它的用法以及内部构造。 1、线程池存在的意义#xff1f; 一般在jvm上#xff0c;用户线程和操作系统内核线程是1#xff1a;1的关系#xff0c;也就是说#xff0c;每次创建、销毁线程的时候今天来简单学习一下它的用法以及内部构造。 1、线程池存在的意义 一般在jvm上用户线程和操作系统内核线程是11的关系也就是说每次创建、销毁线程的时候都会进行内核调用产生比较大的开销有了线程池就可以重复的利用线程资源大幅度降低创建和回收的效率。
此外线程池还可以帮助我们维护线程ID线程状态等信息。 2、ThreadPoolExecutor的作用 开发者将任务提交给ThreadPoolExecutorThreadPoolExecutor负责分配工作线程Worker来执行任务任务完成后工作线程不进行回收而是继续等待后续任务。 3、如何使用ThreadPoolExecutor 直接new一个ThreadPoolExecutor使用的时候调用其excute方法传入一个任务即可。
ThreadPoolExecutor包含了7个入参
* corePoolSize : 核心线程数* maximumPoolSize 线程池中允许的最大线程数* keepAliveTime线程数目大于核心线程数时多余空闲线程在终止前等待新任务的最长时间* unit保存时间的单位* workQueue 用于任务执行前保存任务的队列* threadFactory 执行器创建新线程的工厂 Executors.defaultThreadFactory() 默认工厂* handler 拒绝策略由于达到线程边界和队列容量而阻止执行时使用的处理程序 new ThreadPoolExecutor.AbortPolicy()抛出异常
public class TestThreadPool {public static void main(String[] args) {/*** 共有七个参数* corePoolSize : 核心线程数* maximumPoolSize 线程池中允许的最大线程数* keepAliveTime线程数目大于核心线程数时多余空闲线程在终止前等待新任务的最长时间* unit保存时间的单位* workQueue 用于任务执行前保存任务的队列* threadFactory 执行器创建新线程的工厂 Executors.defaultThreadFactory() 默认工厂* handler 由于达到线程边界和队列容量而阻止执行时使用的处理程序 new ThreadPoolExecutor.AbortPolicy()抛出异常*/ExecutorService executorService new ThreadPoolExecutor(3, 5, 1L, TimeUnit.SECONDS,new ArrayBlockingQueue(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());for (int i 0; i 7; i) {int finalI i;executorService.execute(() - {System.out.println(Thread.currentThread().getName() 办理业务 finalI);});}executorService.shutdown();}
} 学会了如何使用接下来看看ThreadPoolExecutor的源码吧 1、ThreadPoolExecutor定义的一些属性 除了构造器中需要传入的7个参数外还要重点关注一下以下属性
ctl原子类int, ThreadPoolExecutor用这样一个32位的int类型维护了两个核心属性 线程池状态高三位表示工作线程的个数低29位表示因此一个ThreadPoolExecutor最多运行2^29个工作线程workers存放工作线程的集合是一个HashSet因此在添加工作线程到workers里面的时候要加锁保证线程安全mainLock一个ReentrantLock可重入锁 /*** ctl基于一个int类型维护了线程池的两个核心属性* 1 线程池的状态 高三位* 2 工作线程的个数低二十九位 -- 因此线程池中最多允许 2^29 个工作线程个数*/private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));/*** 下面的参数就是为了方便运算定义的常量*/// Integer.SIZE32 因此COUNT_BITS29private static final int COUNT_BITS Integer.SIZE - 3;// 左移29位减一也就是2^29private static final int CAPACITY (1 COUNT_BITS) - 1;// 默认状态可以正常接收执行处理任务private static final int RUNNING -1 COUNT_BITS;// 执行shutdown()方法可以变成SHUTDOWN状态 优雅的关闭线程池// 不接受新任务但是可以处理完已经提交的任务private static final int SHUTDOWN 0 COUNT_BITS; // 执行shutdownNow()方法变成stop状态// 不接受新任务也不会处理阻塞队列中未执行的任务并设置正在执行的线程 中断标志位private static final int STOP 1 COUNT_BITS;// 所有任务执行完毕池子中的工作线程数为0等待执行 terminated()钩子方法private static final int TIDYING 2 COUNT_BITS;// terminated()钩子方法继承的时候可以实现一些自己的业务执行完毕private static final int TERMINATED 3 COUNT_BITS;
线程池状态之间的转换关系图如下 2、内部类Worker 这里只介绍一个主要的内部类Worker这个就是工作线程
继承AQS说明内部存在同步的需求为了使用AQS中的state状态实现了Runnable接口说明worker本身就是一个异步的任务调度者
当某个Worker获取到一个任务时便持有锁直到将任务在当前线程内执行完成后再释放锁然后在获取新的任务。 其实这里加锁其实分成了两个部分 Worker初始化的时候进行了一次加锁。state状态初始化为-1然后在初始化成功后去执行任务之前将state置为0,真正开始执行任务时先进行一次加锁执行完任务的时候进行解锁。 // Worker构造器
Worker(Runnable firstTask) {// AQS的方法将state设置为-1setState(-1); // inhibit interrupts until runWorker// 设置任务并使用 用户传入的线程工厂 创建线程this.firstTask firstTask;this.thread getThreadFactory().newThread(this);
} 2.1为什么要将state的状态设置为-1 答就是为了保证worker在“从初始化到开始执行任务”这个期间不接受中断信号以保证当前的worker能够正常初始化完成。 详细解释 1、首先看一下 Worker 内部类中定义了一个interruptIfStarted方法如果state为-1就不允许接收中断信号 void interruptIfStarted() {Thread t;if (getState() 0 (t thread) ! null !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}
} 2、worker开始执行任务的时候也就是调用run方法的时候会把state重新设置为0此时就可以接收中断信号了看代码。 2.2 线程池如何执行任务如何拉取工作队列中的任务 主要代码逻辑在runWorker方法中工作线程worker开始执行任务的时候会调用自己的run方法而runWorker的入参就是worker自身。
执行初始化的任务worker对象初始化的时候是携带了任务的那么就优先执行自身携带的任务 每次执行任务前先判断线程池状态执行从工作队列中拉取的任务从工作队列中拉取任务是通过getTask方法这个方法里面是一个死循环 首先还是判断线程池状态如果当前是核心线程那就执行take方法这个方法就是一直等待从工作队列中获取任务直到成功或者当前线程中断如果不是核心线程那就执行poll方法在指定时间内从工作队列中拉取任务超时就退出 --- 初始化线程池传递的keepAliveTime如果当前没有任务就要将工作线程关闭了getTask方法里面通过CAS修改了线程池的工作线程数目。---- 一个线程的关闭就是在run方法结束之后结合2中介绍如果是核心线程就会一直等待下去非核心线程在超时之后就会执行完runWorker方法然后关闭该工作线程。
// runWorker方法传入的是一个Worker对象
final void runWorker(Worker w) {// 获取当前线程并取出第一个任务Thread wt Thread.currentThread();Runnable task w.firstTask;w.firstTask null;w.unlock(); // allow interruptsboolean completedAbruptly true;try {// Worker对象启动的时候是携带了任务的优先执行携带的任务// 第一个循环结束task null, 然后就通过getTask()从工作队列中拉取任务while (task ! null || (task getTask()) ! null) {w.lock();// 判断当前线程池状态是否是stop如果是就强制中断当前线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {// 钩子函数beforeExecute(wt, task);Throwable thrown null;try {// 执行任务task.run();} catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {// 钩子函数afterExecute(task, thrown);}} finally {task null;w.completedTasks;w.unlock();}}completedAbruptly false;} finally {processWorkerExit(w, completedAbruptly);}
}// 从工作队列中拉取任务是一个死循环
// 如果该线程是 核心线程基于take方法从工作队列中拉取任务死等下去直到线程中断
// 如果非核心线程基于poll方法拉取指定时间的任务
private Runnable getTask() {boolean timedOut false; // Did the last poll() time out?for (; ; ) {int c ctl.get();int rs runStateOf(c);// 还是先判断线程池状态如果是stop 或者SHUTDOWN且工作队列中 没有任务了就可以干掉当前任务if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {// CAS工作线程个数减一 // 结束一个线程就是run方法结束。// 外层是在while循环里面调用的run方法这里取不到就不能继续进入循环 上次循环的run方法执行完成也就是说明当前工作线程结束了。decrementWorkerCount();return null;}int wc workerCountOf(c);// Are workers subject to culling?boolean timed allowCoreThreadTimeOut || wc corePoolSize;// 省略一堆判断try {// 这里的逻辑就是如果是核心线程就用take方法如果不是核心线程就用poll方法// poll方法拉取阻塞队列中的任务指定等待时间// take方法死等下去直到线程中断Runnable r timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();if (r ! null) {return r;}timedOut true;} catch (InterruptedException retry) {timedOut false;}}
} 3、线程池的主要执行方法excute() 3.1 excute方法主要是当用户提交一个任务给线程池之后线程池的处理逻辑
如果当前线程数小于核心线程数就新创建一个worker执行任务因为并发的原因可能会创建失败这时就尝试将任务放到任务队列中如果此时工作队列也满了加入失败就尝试创建非核心工作线程处理任务如果此时线程个数也已经达到了允许的最大线程数的限制那么就执行拒绝策略。
/*** 任务交给线程池处理的时候执行excute方法* 1、首先判断当前工作线程的个数是否小于定义的核心线程个数如果小于就创建核心工作线程执行当前任务返回* 2、如果不小于核心工作线程个数那么就尝试把任务放到工作队列中如放入成功就返回了* 3、如果工作队列已经满了就判断当前工作线程数是否超过允许的最大线程数如果小于就创建非核心工作线程执行该任务返回* 4. 如果当前工作线程数目也等于允许的最大线程数了就要执行拒绝策略了。* * * param command 要传递的任务*/
public void execute(Runnable command) {// 非空校验if (command null)throw new NullPointerException();// 拿到线程池的状态以及工作线程的个数int c ctl.get();// workerCountOf(c)就是从int类型的熟悉中拿到工作线程的个数// 如果工作线程的个数小于核心线程数那就创建工作线程if (workerCountOf(c) corePoolSize) {// 创建工作线程创建成功就返回if (addWorker(command, true))return;// 创建失败可能是并发环境下有其他线程创建成功了重新取一下ctlc ctl.get();}// 核心线程数已经达到了预期的数量就先尝试把任务放到工作队列中// 首先判断线程池状态是running然后将任务放进队列中if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();// 任务入队前后线程池的状态可能会改变如果此时不是running状态就要对新加入的任务从队列中删除并且执行拒绝策略if (! isRunning(recheck) remove(command))reject(command);// 核心线程数可能设置为0如果为0就要创建非核心工作线程else if (workerCountOf(recheck) 0)addWorker(null, false);}// 任务放到队列中失败尝试构建非核心线程去处理当前任务// 创建成功就结束了else if (!addWorker(command, false))// 非核心线程创建失败就执行拒绝策略。reject(command);
} 3.2 线程池中创建worker的方法, addWorker: addWorker方法就是用来创建线程然后启动线程执行任务的。下面来看看他的具体逻辑
首先是两层的for循环 外层for循环用于判断线程池的状态 如果不是running状态那么就不能添加新任务了此外经过一系列判断确保在shutdown状态下有工作线程能够处理阻塞队列中的任务因此核心工作线程数目是可以为0的内层for循环用于判断线程池中工作线程的个数 如果超过了要求的数目就返回否则通过cas来使当前线程个数加1然后就是创建worker并执行任务 创建出来的worker要加入 HashSet类型的 worker中因此要加锁处理添加成功就执行该任务并返回true否则返回false
// 创建工作线程包括核心线程和非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// 外层for循环判断线程池的状态// 内存for循环判断线程池的个数retry:for (; ; ) {int c ctl.get();int rs runStateOf(c); // rs就是高三位的线程池状态// rs SHUTDOWN 说明线程池不是running状态不能接收新任务 -- 添加失败返回false// SHUTDOWN可以正常处理工作队列的任务后面的判断为了解决在SHUTDOWN状态下没有工作线程处理工作队列中的任务的情况if (rs SHUTDOWN !(rs SHUTDOWN firstTask null !workQueue.isEmpty())) {return false;}for (; ; ) {int wc workerCountOf(c);// 工作线程数目大于等于最大数目2^29或者大于等于 核心线程数/最大线程数 返回false添加失败--- 根据是否创建核心线程确定的 if (wc CAPACITY || wc (core ? corePoolSize : maximumPoolSize)) {return false;}// cas修改工作线程数目成功就跳出循环if (compareAndIncrementWorkerCount(c)) {break retry;}// CAS修改失败了重新获取新的ctlc ctl.get(); // 重新判断线程池状态如果和原来不一样就重新外层循环因为外层循环是判断线程池状态的// 如果一样说明线程池状态没变继续内存循环就行了if (runStateOf(c) ! rs) {continue retry;}}}// 定义两个标记工作线程是否启动工作线程是否创建boolean workerStarted false;boolean workerAdded false;// w就是要创建的工作线程Worker w null;try {/*** 创建工作线程并且把任务交给这个线程new里面调用了线程工厂* Worker(Runnable firstTask) {* setState(-1); // inhibit interrupts until runWorker* this.firstTask firstTask;* this.thread getThreadFactory().newThread(this);* }*/w new Worker(firstTask);// 获取new worker时得到的线程对象final Thread t w.thread;// 因为线程工厂是用户传进来的所以thread可能为null这里判断一下增加代码的健壮性if (t ! null) {// 这里用锁对workers进行操作将创建出来的工作线程加到workers 中// 这个workers是一个hashSetfinal ReentrantLock mainLock this.mainLock;mainLock.lock();try {int rs runStateOf(ctl.get());if (rs SHUTDOWN || (rs SHUTDOWN firstTask null)) {if (t.isAlive()) {throw new IllegalThreadStateException();}// 在此处添加workers.add(w);int s workers.size();if (s largestPoolSize) {largestPoolSize s;}workerAdded true;}} finally {mainLock.unlock();}if (workerAdded) {// 工作线程添加成功了就启动线程t.start();workerStarted true;}}} finally {if (!workerStarted) {addWorkerFailed(w);}}return workerStarted;
} 4 线程池的关闭流程 4.1 shutdown() 方法
这个方法被称为温柔的终止线程池不接受新任务但是会处理完正在运行的任务和阻塞队列中的任务。--- 这个方法会将线程池状态修改为 SHUTDOWN
怎么判断哪些任务是空闲的哪些任务是正在运行的呢其实还是根据worker中的state的值来判断的在循环workers的过程中会尝试通过CAS将当前worker的状态从0修改到1只有空闲状态状态的工作线程的state为0修改成功然后执行interrupt命令工作状态则不会。
// 温柔的终止线程池不接受新任务但是会处理完正在运行的和阻塞队列中的任务
public void shutdown() {final ReentrantLock mainLock this.mainLock;// 可重入锁加锁因为要操作workers了这是一个hashsetmainLock.lock();try {// 权限校验checkShutdownAccess();// 将线程池状态设置为shutdownadvanceRunState(SHUTDOWN);// 这个方法里面中断所有的空闲线程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();
}// 中断所有的空闲线程
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers) {Thread t w.thread;// w.tryLock() --- 这里最终会调用CAS尝试把state从0修改到1// 但是我们知道工作中的线程是加了锁的state的值不为0因此工作线程CAS失败不会进入判断// 只有空闲线程才会修改成功然后执行interrupt方法if (!t.isInterrupted() w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {// tryLock里面cas成功了state要减一w.unlock();}}// onlyOne如果为true最多interrupt一个worker// 只有当终止流程已经开始但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用// 在这种情况下最多有一个worker被中断为了传播shutdown信号以免所有的线程都在等待// 为保证线程池最终能终止这个操作总是中断一个空闲workerif (onlyOne) {break;}}} finally {mainLock.unlock();}
} 4.2 shutdownNow()方法
shutdownNow方法就很暴力直接中断所有线程即便当前线程正在执行任务也会执行interrupt方法然后将工作队列中的任务返回。
// 中断所有线程
public ListRunnable shutdownNow() {ListRunnable tasks;final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 权限校验checkShutdownAccess();// 修改状态为stopadvanceRunState(STOP);// 简单粗暴终止所有线程interruptWorkers();// 将工作线程中的任务都放到一个list中返回tasks drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}// 简单粗暴除了初始化状态的线程全部中断
private void interruptWorkers() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
} 4.3 tryTerminate() 方法
我们看到在shutdown和shutdownNow方法中都调用了tryTerminate() 方法事实上所有可能导致线程池产终止的地方都调用了tryTerminate() 方法这个方法中在工作线程不为0的时候会去中断线程池中的一个空闲的线程这样做的目的是当满足终结线程池的条件但是工作线程数不为0 这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。
想象这样一种场景调用shutdown时多个worker正在运行且此时工作队列也不为空当所有的任务都执行完毕时核心线程会被 queue.take()阻塞无法终止线程但是因为调用了showdown后续也无法接收新任务了这是不合理的因此需要在showdown之后还可以发出中断信号。事实上所有可能导致线程池产终止的地方都调用了tryTerminate() 方法如果线程池进入了终止流程但是还有空闲线程就中断一个空闲线程。
// 每个工作线程结束的时候都会调用tryTerminate方法
final void tryTerminate() {for (; ; ) {int c ctl.get();// 还是判断线程池的状态if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) SHUTDOWN !workQueue.isEmpty())) {return;}// 工作线程不为0则会中断工作线程集合中的第一个空闲的线程// ONLY_ONE为 true表示只中断一个线程会在遍历的时候跳出循环// 当满足终结线程池的条件但是工作线程数不为0这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。if (workerCountOf(c) ! 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// CAS设置线程池状态为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {// 最后更新线程池状态为TERMINATEDctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
} 介绍一下Thread中的几个方法。
stop()方法立即停止当前线程并释放当前线程的锁已经弃用了。interrupt()方法给当前线程设置一个中断标志位当前任务还会继续执行isInterrupted()方法返回当前线程的中断标志位。interrupted()方法这个是一个静态方法也是查询当前线程的中断标志位但是查询之后会把当前线程的中断标记位清除。