o2o网站建设技术,最好的seo优化公司,界面设计的软件,桂林两江四湖夜景图片线程池概述
线程池#xff08;Thread Pool#xff09;是一种基于池化思想管理线程的工具#xff0c;经常出现在多线程服务器中#xff0c;如MySQL。
线程过多会带来额外的开销#xff0c;其中包括创建销毁线程的开销、调度线程的开销等等#xff0c;同时也降低了计算机…线程池概述
线程池Thread Pool是一种基于池化思想管理线程的工具经常出现在多线程服务器中如MySQL。
线程过多会带来额外的开销其中包括创建销毁线程的开销、调度线程的开销等等同时也降低了计算机的整体性能。线程池维护多个线程等待监督管理者分配可并发执行的任务。这种做法一方面避免了处理任务时创建销毁线程开销的代价另一方面避免了线程数量膨胀导致的过分调度问题保证了对内核的充分利用。
使用线程池可以带来的一系列好处
降低资源消耗通过池化技术重复利用已创建的线程降低线程创建和销毁造成的损耗。提高响应速度任务到达时无需等待线程创建即可立即执行。提高线程的可管理性线程是稀缺资源如果无限制创建不仅会消耗系统资源还会因为线程的不合理分布导致资源调度失衡降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。提供更多更强大的功能线程池具备可拓展性允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor就允许任务延期执行或定期执行。
线程池解决的问题
线程池解决的核心问题就是资源管理问题。在并发环境下系统不能够确定在任意时刻中有多少任务需要执行有多少资源需要投入。这种不确定性将带来以下若干问题
频繁申请/销毁资源和调度资源将带来额外的消耗可能会非常巨大。对资源无限申请缺少抑制手段易引发系统资源耗尽的风险。系统无法合理管理内部的资源分布会降低系统的稳定性。
为解决资源分配这个问题线程池采用了“池化”Pooling思想。为了最大化收益并最小化风险而将资源统一在一起管理的一种思想。
“池化”思想不仅仅能应用在计算机领域在金融、设备、人员管理、工作管理等领域也有相关的应用。
在计算机领域中的表现为统一管理IT资源包括服务器、存储、和网络资源等等。通过共享资源使用户在低投入中获益。除去线程池还有其他比较典型的几种使用策略包括
内存池(Memory Pooling)预先申请内存提升申请内存速度减少内存碎片。连接池(Connection Pooling)预先申请数据库连接提升申请连接的速度降低系统的开销。对象实例池(Object Pooling)循环使用对象减少资源在初始化和释放时的昂贵损耗。
线程池核心设计与实现 JDK 1.8线程池ThreadPoolExecutor的UML类图 顶层接口Executor提供了一种思想将任务提交和任务执行进行解耦。用户无需关注如何创建线程如何调度线程来执行任务用户只需提供Runnable对象将任务的运行逻辑提交到执行器(Executor)中由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力
扩充执行任务的能力补充可以为一个或一批异步任务生成Future的方法提供了管控线程池的方法比如停止线程池的运行。
AbstractExecutorService则是上层的抽象类将执行任务的流程串联了起来保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现了顶层接口Executor。ThreadPoolExecutor实现最复杂的运行部分ThreadPoolExecutor将会一方面维护自身的生命周期另一方面同时管理线程和任务使两者良好的结合从而执行并行任务。
ThreadPoolExecutor运行机制 ThreadPoolExecutor运行流程 线程池在内部实际上构建了一个生产者消费者模型将线程和任务两者解耦并不直接关联从而良好的缓冲任务复用线程。
线程池的运行主要分成两部分任务管理、线程管理。
任务管理部分充当生产者的角色当任务提交后线程池会判断该任务后续的流转
直接申请线程执行该任务缓冲到队列中等待线程执行拒绝该任务。
线程管理部分是消费者它们被统一维护在线程池内根据任务请求进行线程的分配当线程执行完任务后则会继续获取新的任务去执行最终当线程获取不到任务的时候线程就会被回收。
生命周期管理
线程池运行的状态并不是用户显式设置的而是伴随着线程池的运行由内部来维护。线程池内部使用一个变量维护两个值运行状态(runState)和线程数量 (workerCount)。在具体实现中线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起如下代码所示
private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS Integer.SIZE - 3;
private static final int CAPACITY (1 COUNT_BITS) - 1;ctl这个AtomicInteger类型变量是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段 它同时包含两部分的信息线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)高3位保存runState低29位保存workerCount两个变量之间互不干扰。用一个变量去存储两个值可避免在做相关决策时出现不一致的情况不必为了维护两者的一致而占用锁资源。通过阅读线程池源代码也可以发现经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式相比于基本运算速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示
private static int runStateOf(int c) { return c ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl五种运行状态
ThreadPoolExecutor的运行状态有5种分别为
private static final int RUNNING -1 COUNT_BITS;
private static final int SHUTDOWN 0 COUNT_BITS;
private static final int STOP 1 COUNT_BITS;
private static final int TIDYING 2 COUNT_BITS;
private static final int TERMINATED 3 COUNT_BITS;
运行状态状态描述RUNNING 能接受新提交的任务并且也能处理阻塞队列中的任务。 SHUTDOWN 关闭状态。不再接受新提交的任务但却可以继续处理阻塞队列中已保存的任务。 STOP 不能接受新任务也不处理队列中的惹怒我会中断正在处理的线程。 TIDYING 所有的任务都已终止workerCount有效线程数为0 TERMINATED 在terminated()方法执行完后进入该状态
原子锁
private final ReentrantLock mainLock new ReentrantLock();
private final Condition termination mainLock.newCondition();
生命周期转换 线程池生命周期 Executor 框架的使用 主线程首先创建实现 Runnable或Callable 接口的任务对象。把创建完成的事项 Runnable/Callable 接口的对象直接交给ExecutorService执行。ExecutorService.execute(Runnable command) 或者把 Runnable对象或Callable对象提交给ExecutorService执行 ExecutorService cachedThreadPool Executors.newCachedThreadPool();
cachedThreadPool.execute(Runnable command);
cachedThreadPool.submit(Runnable task);
cachedThreadPool.submit(CallableT task); 如果执行ExecutorService execute(...)ExecutorService 将返回一个事项Future接口的对象。由于FutureTask实现了Runnable也可以在使用时创建FutureTask。然后交给ExecutorService执行。最后主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。 核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {//...private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));// 存放任务的阻塞队列private final BlockingQueueRunnable workQueue;// 对线程池内部各种变量进行互斥访问控制private final ReentrantLock mainLock new ReentrantLock();// 线程集合private final HashSetWorker workers new HashSetWorker();//...
}
每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类核心数据结构如下
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// ...// Worker封装的线程final Thread thread;// Worker接收到的第1个任务Runnable firstTask;// Worker执行完毕的任务个数volatile long completedTasks;// ...
} Worker继承于AQS也就是说Worker本身就是一把锁。 这把锁用于线程池的关闭、线程执行任务的过程中。
七大核心参数
参数说明corePoolSize核心线程数量线程池维护线程的最少数量maximumPoolSize线程池维护线程的最大数量keepAliveTime线程池除核心线程外的其他线程的最长空闲时间超过该时间的空闲线程会被销毁unitkeepAliveTime的单位TimeUnit中的几个静态属性NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDSworkQueue线程池所使用的任务缓冲队列threadFactory线程工厂用于创建线程一般用默认的即可handler线程池对拒绝任务的处理策略
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 当线程数大于核心线程数时多余的空闲线程存活最长时间TimeUnit unit, // 空闲线程存活时间的时间单位BlockingQueueRunnable workQueue, // 任务队列ThreadFactory threadFactory, // 线程工厂用来创建线程RejectedExecutionHandler handler) { // 拒绝策略this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
线程池的处理流程 线程池的处理流程 如果此时线程池中的数量小于corePoolSize即使线程池中的线程都处于空闲状态也要创建新的线程来处理被添加的任务。如果此时线程池中的数量等于corePoolSize但是缓冲队列workQueue未满那么任务被放入缓冲队列。如果此时线程池中的数量大于等于corePoolSize缓冲队列workQueue满并且线程池中的数量小于maximumPoolSize建新的线程来处理被添加的任务。如果此时线程池中的数量大于corePoolSize缓冲队列workQueue满并且线程池中的数量等于maximumPoolSize那么通过 handler所指定的策略来处理此任务。当线程池中的线程数量大于 corePoolSize时如果某线程空闲时间超过keepAliveTime线程将被终止。这样线程池可以动态的调整池中的线程数。
总结处理任务判断的优先级为 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize如果三者都满了使用handler处理被拒绝的任务。
注意
当workQueue使用的是无界限队列时maximumPoolSize参数就变的无意义了比如new LinkedBlockingQueue(),或者new ArrayBlockingQueue(Integer.MAX_VALUE)。使用SynchronousQueue队列时由于该队列没有容量的特性所以不会对任务进行排队如果线程池中没有空闲线程会立即创建一个新线程来接收这个任务。maximumPoolSize要设置大一点。核心线程和最大线程数量相等时keepAliveTime无作用。
线程池关闭
shutdown() 不接收新任务,会处理已添加任务shutdownNow() 不接受新任务,不处理已添加任务,中断正在处理的任务
常用线程池 Executors .newCachedThreadPool(); 创建一个可缓存线程池如果线程池长度超过处理需要可灵活回收空闲线程若无可回收则新建线程。 内部实现new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue()); Executors .newFixedThreadPool(int); 创建一个定长线程池可控制线程最大并发数超出的线程会在队列中等待。 内部实现new ThreadPoolExecutor(nThreads, nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); Executors .newSingleThreadExecutor(); 创建一个单线程化的线程池它只会用唯一的工作线程来执行任务保证所有任务按照顺序执行。 内部实现new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()) Executors .newScheduledThreadPool(int) 创建一个定长线程池支持定时及周期性任务执行。 内部实现new ScheduledThreadPoolExecutor(corePoolSize) Executors
.newSingleThreadScheduledExecutor() 创建一个单线程的任务调度池(定时任务/延时任务)
队列 名称描述ArrayBlockingQueue一个用数组实现的有界阻塞队列此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。LinkedBlockingQueue一个由链表结构组成的有界队列此队列按照先进先出FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAXVALUE所以默认创建的该队列有容量危险。PriorityBlockingQueue一个支持线程优先级排序的无界队列默认自然序进行排序也可以自定义实现compareTo(方法来指定元素排字规则不能保证同优先级元素的顺序。DelayQueue一个实现PriorityBlockingQueue实现延迟获取的无界队列在创建元素时可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。SynchronousQueue一个不存储元素的阻塞队列每一个put操作必须等待take操作否则不能添加元素。支持公平锁和非公平锁SynchronousQueue的-个使用场景是在线程池里。Executors.newCachedThreadPool0就使用了SynchronousQueue这个线程池根据需要(新任务到来时)创建新的线程如果有空闲线程则会重复使用线程空闲了60秒后会被回收。LinkedTransferQueue一个由链表结构组成的无界阳塞队列相当于其它队列LinkedTransferQueue队列多了transfer和trvTransfer方法。LinkedBlockingDeque一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素多线程并发时可以将锁的竞争最多降到一半。 拒绝策略
任务拒绝模块是线程池的保护部分线程池有一个最大的容量当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时就需要拒绝掉该任务采取任务拒绝策略保护线程池。
拒绝策略是一个接口其设计如下
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
} 四种拒绝策略
名称描述ThreadPoolExecutor.AbortPolicy丢弃任务并抛出RejectedExecutionException异常。 这是线程池默认的拒绝策略在任务不能再提交的时候抛出异常及时反馈程序运行状态。如果是比较关键的业务推荐使用此拒绝策略这样子在系统不能承载更大的并发量的时候能够及时的通过异常发现。ThreadPoolExecutor.DiscardPolicy丢弃任务但是不抛出异常。 使用此策略可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。ThreadPoolExecutor.DiscardOldestPolicy丢弃队列最前面的任务然后重新提交被拒绝的任务。是否要采用此种拒绝策略还得根据实际业务是否允许丢弃老任务来认真衡量。ThreadPoolExecutor.CallerRunsPolicy由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕那么就适合大量计算的任务类型去执行多线程仅仅是增大吞吐量的手段最终必须要让每个任务都执行完毕。
Worker线程管理
Worker线程
线程池为了掌握线程的状态并维护线程的生命周期设计了线程池内的工作线程Worker。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;//Worker持有的线程Runnable firstTask;//初始化的任务可以为null
}
Worker执行任务的模型 Worker执行任务 线程池需要管理线程的生命周期需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock而是使用AQS为的就是实现不可重入的特性去反应线程现在的执行状态。
lock方法一旦获取了独占锁表示当前线程正在执行任务中。如果正在执行任务则不应该中断线程。如果该线程现在不是独占锁的状态也就是空闲的状态说明它没有在处理任务这时可以对该线程进行中断。线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性回收过程如下图所示 线程池回收过程 线程池使用场景
场景1快速响应用户请求
场景2快速处理批量任务
【附】阿里巴巴Java开发手册中对线程池的使用规范
【强制】创建线程或线程池时请指定有意义的线程名称方便出错时回溯【强制】线程资源必须通过线程池提供不允许在应用中自行显式创建线程。 说明 使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销解决资源不足的问题。如果不使用线程池有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。【强制】线程池不允许使用 Executors 去创建而是通过 ThreadPoolExecutor 的方式创建这样的处理方式让写的同学更加明确线程池的运行规则规避资源耗尽的风险。 说明 Executors 返回的线程池对象的弊端如下 1 FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE可能会堆积大量的请求从而导致 OOM。 2 CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE 可能会创建大量的线程从而导致 OOM。