建站之星好不,帝国网站管理系统前台,网站建设必要性,创意设计广告文章目录 前言一、线程池的相关接口和实现类1.Executor接口2.ExecutorService接口3.AbstractExecutorService接口4.ThreadPoolExecutor 实现类 二、ThreadPoolExecutor源码解析1.Worker内部类2.execute()方法3.addWorker()方法 总结 前言
线程池内部维护了若干个线程#xff… 文章目录 前言一、线程池的相关接口和实现类1.Executor接口2.ExecutorService接口3.AbstractExecutorService接口4.ThreadPoolExecutor 实现类 二、ThreadPoolExecutor源码解析1.Worker内部类2.execute()方法3.addWorker()方法 总结 前言
线程池内部维护了若干个线程没有任务的时候这些线程都处于等待空闲状态。如果有新的线程任务就分配一个空闲线程执行。如果所有线程都处于忙碌状态线程池会创建一个新线程进行处理或者放入队列工作队列中等待。 线程池在多线程编程中扮演着重要角色它能够管理和复用线程提高并发执行效率。 在之前的学习中我们知道了线程池的基本流程如下而今天我们则用这个流程配合着源码的来重新分析线程池的执行流程。 一、线程池的相关接口和实现类
1.Executor接口
public interface Executor {/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {code Executor} implementation.** param command the runnable task* throws RejectedExecutionException if this task cannot be* accepted for execution* throws NullPointerException if command is null*/void execute(Runnable command);
}Executor接口作为线程池技术中的顶层接口它的作用是用来定义线程池中用于提交并执行线程任务的核心方法exuecte()方法。未来线程池中所有的线程任务都将由exuecte()方法来执行。
2.ExecutorService接口
public interface ExecutorService extends Executor {//.....
}ExecutorService接口继承了Executor接口扩展了awaitTermination()、submit()、shutdown()等专门用于管理线程任务的方法。
3.AbstractExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {//....
}ExecutorService接口的抽象实现类AbstractExecutorService为不同的线程池实现类提供submit()、invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同所以在AbstractExecutorService并未提供该方法的具体实现。
4.ThreadPoolExecutor 实现类
public class ThreadPoolExecutor extends AbstractExecutorService {//...
}ThreadPoolExecutor实现类是AbstractExecutorService接口的的两个重要实现类之一ForkJoinPool是另一个也是要掌握的关于线程池的重点区域 ThreadPoolExecutor线程池通过Woker工作线程、BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池 二、ThreadPoolExecutor源码解析
在对源码进行解析之前我们先看看官方给我们关于ThreadPoolExecutor的解析是什么 The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * … 这句话什么意思呢 主池控制状态ctl是一个原子整数封装
private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));再结合ctl的实例化我们这个发现这个ctl是一个具有原子性的整数再往后就是告诉我们这个具有原子性的整数是由两个概念组成的workerCount工作线程数和runState运行状态。 他是一个32位的整数具体表示形式位 所以说它的作用就是通过位运算来存储线程池的状态和活动线程数信息。
1.Worker内部类
每个Woker类的对象都代表线程池中的一个工作线程。 Worker类是ThreadPoolExecutor类中定义的一个私有内部类保存了每个Worker工作线程要执行的Runnable线程任务和Thread线程对象。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID 6138294804551838833L;//用于存储当前工作线程的引用。final Thread thread;//用于存储该工作线程要执行的第一个任务。Runnable firstTask;//用于记录该工作线程已经完成的任务数量volatile long completedTasks;//初始化工作线程的状态设置第一个任务并创建一个线程对象Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this);}//Runnable 接口的 run 方法调用了 runWorker(this)将工作线程自身作为参数传递给 runWorker方法public void run() {runWorker(this);}//因为继承了AbstractQueuedSynchronizer 类一下方法都是基于线程安全的方法//..... }当ThreadPoolExecutor线程池通过exeute()方法执行1个线程任务时会调用addWorker()方法创建一个Woker工作线程对象。并且创建好的Worker工作线程对象会被添加到一个HashSet workders工作线程集合统一由线程池进行管理。 当Worker工作线程在第一次执行完成线程任务后这个Worker工作线程并不会销毁而是会以循环的方式通过线程池的getTask()方法获取阻塞工作队列中新的Runnable线程任务并通过当前Worker工作线程中所绑定Thread线程完成新线程任务的执行从而实现了线程池的中Thread线程的重复使用。
2.execute()方法
ThreadPoolExecutor线程池中会通过execute(Runnable command)方法执行Runnable类型的线程任务。 在分析execute方法之前也来看看官方是怎么解释这个方法的 /** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldnt, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/简单来说就是将execute方法分成三个步骤 1.如果工作线程的数量小于核心线程数则通过addWorker()方法创建新的Worker工作线程并添加至workers工作线程集合 2.如果一个任务可以成功排队(工作线程的数量大于核心线程数)并且线程池处于RUNNING状态那么线程池会将Runnable类型的线程任务缓存至workQueue阻塞工作队列等待某个空闲工作线程获取并执行该任务 3.如果我们不能排队任务那么我们尝试添加一个新的线程。如果它失败了我们知道我们被关闭或饱和了,因此拒绝该任务。
源码:
/* *param 要提交给线程池的任务
*/
public void execute(Runnable command) {//如果传入的任务为空抛出空指针异常。if (command null)throw new NullPointerException();//获取当前线程池的状态和活动线程数。int c ctl.get();//如果当前线程数小于核心线程数if (workerCountOf(c) corePoolSize) {//如果可以通过addWorker方法创建一个新的工作线程来执行任务//因为当前线程数小于核心线程数所以第二个参数穿入true代表创建的是核心线程if (addWorker(command, true))return;//创建成功直接返回。//创建失败获取更新后的线程池状态c ctl.get();}//如果线程池状态是运行中且工作队列能够接受新任务if (isRunning(c) workQueue.offer(command)) {//任务进入工作队列//重新获取线程池的状态和工作线程数int recheck ctl.get();//如果线程池不是运行状态则删除任务if (! isRunning(recheck) remove(command))、//执行拒绝策略reject(command);// 如果工作线程数等于零通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) 0)addWorker(null, false);}//如果无法将任务添加到队列中也无法创建新的工作线程那么拒绝任务的执行else if (!addWorker(command, false))reject(command);}3.addWorker()方法
在分析execute方法中发现execute方法多次调用了addWorker()创建一个工作线程用于执行当前线程任务。 addWorker可以分为两个执行部分检查线程池的状态和工作线程数量和创建并执行工作线程。 第1部分检查线程池的状态和工作线程数量
//参数1.传入的任务2.是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// 循环检查线程池的状态直到符合创建工作线程的条件通过retry标签break退出retry:for (;;) {//获取线程池运行状态int c ctl.get();int rs runStateOf(c);//如果线程池处于开始关闭的状态获取线程任务为空同时工作队列不等于空if (rs SHUTDOWN ! (rs SHUTDOWN firstTask null ! workQueue.isEmpty()))return false;//检查工作线程数量for (;;) {//获取当前工作线程数int wc workerCountOf(c);//如果工作线程数量如果超出线程池的最大容量或者核心线程数(最大线程数)//三元运算符表示的是当前要的是核心线程还是非核心线程if (wc CAPACITY ||wc (core ? corePoolSize : maximumPoolSize))return false;//不再创建新的线程//通过ctl对象将当前工作线程数量1并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;//再次获取线程池状态,检查是否发生变化c ctl.get(); if (runStateOf(c) ! rs)continue retry;//...}}}第二部分:创建并执行线程工程 //....//用于判断工作线程是否启动和保存boolean workerStarted false;boolean workerAdded false;Worker w null;try {//创建新工作线程,并通过线程工厂创建Thread线程w new Worker(firstTask);//获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t w.thread; if (t ! null) {//获取线程池的reentrantLock主锁保证线程安全final ReentrantLock mainLock this.mainLock;mainLock.lock();try {//检查线程池运行状态int rs runStateOf(ctl.get());//如果线程池状态小于关闭状态或者线程池状态为关闭且没有初始任务if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) {//如果工作线程已经在运行存活抛出非法线程状态异常。if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//保存工作线程workers.add(w);//记录线程池的最大工作线程数int s workers.size();if (s largestPoolSize)largestPoolSize s;workerAdded true;}} finally {mainLock.unlock();}//正式启动线程if (workerAdded) {t.start();workerStarted true;}}} finally {//如果工作线程没有成功启动,则调用添加失败的方法if (! workerStarted)addWorkerFailed(w);}//返回线程启动状态return workerStarted;总结
execute方法是ThreadPoolExecutor线程池执行的开始它完整实现了Executor接口定义execute()方法这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是