义乌市网站制作,制作自己盈利的网站,网站积分规则设计,餐馆效果图网站文章目录 线程池介绍线程池核心参数核心线程数#xff08;Core Pool Size#xff09;最大线程数#xff08;Maximum Pool Size#xff09;队列#xff08;Queue#xff09;线程空闲超时时间#xff08;KeepAliveTime#xff09;拒绝策略#xff08;RejectedExecutionH… 文章目录 线程池介绍线程池核心参数核心线程数Core Pool Size最大线程数Maximum Pool Size队列Queue线程空闲超时时间KeepAliveTime拒绝策略RejectedExecutionHandler 线程池执行流程 快速消费线程池快速消费线程池组件相关依赖快速消费队列快速消费线程池获取配置文件的配置配置线程池Bean到容器中 说明 线程池介绍
线程池作为多线程编程中的重要工具旨在通过复用已创建的线程来减少线程创建与销毁的开销提升系统资源利用率和并发性能。要有效地使用线程池理解和配置其核心参数至关重要。
线程池核心参数
创建一个线程池的代码如下可以看到构造方法需要传递几个参数下文会详细展示每个参数的含义
// 导包
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;// 创建线程池
return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),poolConfigProperties.getMaxSize(),poolConfigProperties.getKeepAliveTime(),TimeUnit.SECONDS,//队列的最大容量new LinkedBlockingDeque(600),//使用默认的工程Executors.defaultThreadFactory(),//使用拒绝新来的拒绝策略new ThreadPoolExecutor.CallerRunsPolicy()
);核心线程数Core Pool Size
核心线程数是指线程池在初始化时创建并保持活动状态的线程数量。即使这些线程当前没有任务执行它们也不会被回收。核心线程数通常根据系统资源、预期并发负载和任务特性来设定。核心线程在池中长期存在能够快速响应新提交的任务减少任务提交后的等待时间。
最大线程数Maximum Pool Size
最大线程数限制了线程池能同时容纳的线程总数。当核心线程数无法满足当前任务需求时线程池会创建额外的线程直至达到最大线程数。超过这个阈值后线程池将采取拒绝策略处理新提交的任务。合理设置最大线程数既能防止资源过度消耗导致系统过载又能确保在高并发场景下有足够的线程处理任务。
队列Queue
线程池通常配合任务队列使用用于暂存待处理的任务。当所有核心线程都处于忙碌状态且未达到最大线程数时新提交的任务会被放入队列中等待。常见的队列类型包括无界队列如 LinkedBlockingQueue、有界队列如 ArrayBlockingQueue和优先级队列如 PriorityBlockingQueue。队列的选择和容量大小直接影响线程池的阻塞策略和任务调度效率。
线程空闲超时时间KeepAliveTime
当线程池中存在超出核心线程数的非核心线程并且这些线程在一段时间内即 KeepAliveTime没有执行任何任务则会自动终止。这个参数有助于释放闲置资源避免资源浪费。对于长期存在大量任务的系统可以适当增大或关闭这个超时时间。
拒绝策略RejectedExecutionHandler
当线程池和队列都无法接纳新任务时需要采用拒绝策略来处理。常见的拒绝策略有
AbortPolicy默认策略直接抛出 RejectedExecutionException。CallerRunsPolicy由提交任务的线程自行执行任务。DiscardPolicy默默地丢弃任务不抛出异常也不执行。DiscardOldestPolicy丢弃队列中最旧的任务尝试提交新任务。 线程池执行流程 初始阶段线程池创建并启动核心线程数指定数量的线程。此时如果有任务提交直接由这些核心线程执行。 核心线程饱和当所有核心线程都在执行任务且任务队列尚未满时新提交的任务被放入队列等待。 队列满载若任务提交速率持续高于线程处理速度队列达到其容量上限。此时线程池开始创建新的线程不超过最大线程数直接执行新提交的任务。 达到最大线程数若任务增长仍然无法遏制线程池达到最大线程数。此时新提交的任务将触发拒绝策略。 任务减少与线程收缩当任务提交速率降低线程池中的线程开始完成任务并变得空闲。对于非核心线程若在 KeepAliveTime 时间内未获得新任务将被终止。系统逐渐回归到更低的线程数直至仅保留核心线程。
在任务量增长的过程中线程池通过动态调整线程数量和利用任务队列既保证了系统的响应能力又防止了资源过度消耗。
快速消费线程池
快速消费线程池通过对上述线程池进行改造当核心线程饱和时再提交的任务不是先加入到队列中而是直接创建非核心线程来执行新提交任务。快速消费线程池可以加快任务的执行减少任务的堆积。
快速消费线程池组件 相关依赖
dependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactIdscopeprovided/scope/dependency
/dependencies快速消费队列
该类继承自LinkedBlockingQueue并对其offer方法进行定制以配合EagerThreadPoolExecutor实现更灵活的任务调度策略。主要目的是在满足特定条件时促使线程池创建非核心线程以快速处理任务而非直接将任务放入队列等待处理。
import lombok.Data;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;/*** 快速消费任务队列*/
Data
public class EagerTaskQueueR extends Runnable extends LinkedBlockingQueueRunnable {private EagerThreadPoolExecutor executor;/*** 构造函数传入队列容量参数用于初始化LinkedBlockingQueue。** param capacity 队列的最大容量*/public EagerTaskQueue(int capacity) {super(capacity);}/*** 重写父类LinkedBlockingQueue的offer方法实现自定义的任务入队逻辑* 当没有到达最大线程时返回false让其创建非核心线程** param runnable 待添加的任务对象* return 如果任务成功加入队列或触发线程池创建非核心线程则返回true否则返回false*/Overridepublic boolean offer(Runnable runnable) {// 获取当前线程池的线程数量int currentPoolThreadSize executor.getPoolSize();// 检查是否有核心线程处于空闲状态已提交任务数小于当前线程数if (executor.getSubmittedTaskCount() currentPoolThreadSize) {// 如果有核心线程正在空闲将任务加入阻塞队列由核心线程进行处理任务return super.offer(runnable);}// 检查当前线程池线程数量是否小于最大线程数if (currentPoolThreadSize executor.getMaximumPoolSize()) {
// System.out.println(线程池线程数量小于最大线程数返回 False线程池会创建非核心线程);// 当前线程池线程数量小于最大线程数返回false触发线程池创建非核心线程处理任务return false;}// 如果当前线程池数量大于最大线程数任务加入阻塞队列等待线程池中的已有线程处理return super.offer(runnable);}/**** param runnable 待添加的任务对象* param timeout 等待加入队列的超时时间* param timeUnit 超时时间单位* return 如果任务成功加入队列或触发线程池创建非核心线程则返回true否则返回false* throws InterruptedException 如果在等待过程中线程被中断* throws RejectedExecutionException 如果线程池已关闭*/public boolean retryOffer(Runnable runnable, long timeout, TimeUnit timeUnit) throws InterruptedException {// 如果线程池已关闭则抛出RejectedExecutionException异常。if (executor.isShutdown()) {throw new RejectedExecutionException(Executor is shutdown!);}return super.offer(runnable, timeout, timeUnit);}
}快速消费线程池
该类继承自ThreadPoolExecutor并对其进行定制以实现更灵活的任务调度策略。主要特点包括
使用自定义的EagerTaskQueue作为工作队列支持根据线程池状态动态调整任务入队逻辑。维护正在处理的任务数量计数器submittedTaskCount以便EagerTaskQueue判断是否有核心线程处于空闲状态。在execute方法中处理任务提交失败的情况尝试将任务重新投递到队列或使用拒绝策略。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 快速消费线程池*/
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {/*** 使用AtomicInteger记录当前正在处理的任务数量提供线程安全的计数操作。*/private final AtomicInteger submittedTaskCount new AtomicInteger(0);/*** 构造函数接受线程池相关的配置参数包括核心线程数、最大线程数、线程存活时间、时间单位、工作队列、线程工厂和拒绝策略。* 工作队列类型为自定义的EagerTaskQueue用于实现特殊的任务入队逻辑。** param corePoolSize 核心线程数* param maximumPoolSize 最大线程数* param keepAliveTime 线程空闲后的存活时间* param unit 时间单位* param workQueue 工作队列类型为EagerTaskQueue* param threadFactory 线程工厂用于创建新线程* param handler 拒绝策略当线程池和队列无法接受新任务时的处理方式*/public EagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,EagerTaskQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}/*** 创建一个EagerThreadPoolExecutor实例的便捷方法* 包括创建EagerTaskQueue并设置其与线程池的关联** param corePoolSize 核心线程数* param maximumPoolSize 最大线程数* param keepAliveTime 线程空闲后的存活时间* param unit 时间单位* param queueCapacity 队列容量* param threadFactory 线程工厂用于创建新线程* param handler 拒绝策略当线程池和队列无法接受新任务时的处理方式* return 创建的EagerThreadPoolExecutor实例*/public static EagerThreadPoolExecutor createEagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,int queueCapacity,ThreadFactory threadFactory,RejectedExecutionHandler handler) {EagerTaskQueue eagerTaskQueue new EagerTaskQueue(queueCapacity);EagerThreadPoolExecutor eagerThreadPoolExecutor new EagerThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, eagerTaskQueue, threadFactory, handler);eagerTaskQueue.setExecutor(eagerThreadPoolExecutor);return eagerThreadPoolExecutor;}/*** 获取当前正在处理的任务数量。** return 当前正在处理的任务数量*/public int getSubmittedTaskCount() {return submittedTaskCount.get();}/*** 重写父类的afterExecute方法当任务执行完成后将正在执行的任务数量减一。* 这是ThreadPoolExecutor提供的钩子方法用于在任务执行结束后进行清理或其他操作。** param r 执行完毕的任务* param t 执行过程中抛出的异常如果有的话*/Overrideprotected void afterExecute(Runnable r, Throwable t) {// 任务执行完成将正在执行数量-1submittedTaskCount.decrementAndGet();}/*** 重写父类的execute方法用于提交任务到线程池。* 在提交任务之前先将正在执行的任务数量加一。若提交失败根据具体情况尝试重新投递任务或使用拒绝策略。** param command 待提交的任务* throws RejectedExecutionException 如果任务无法被接受且无法重新投递到队列*/Overridepublic void execute(Runnable command) {
// System.out.println(使用快速消费线程池执行任务);// 将正在执行任务数量 1submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException ex) {// 任务被拒绝间隔一定时间将任务重新投递到队列EagerTaskQueue eagerTaskQueue (EagerTaskQueue) super.getQueue();try {// 将任务重新投递到队列if (!eagerTaskQueue.retryOffer(command, 10, TimeUnit.MILLISECONDS)) {// 队列已满使用拒绝策略并减少计数submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(Queue capacity is full., ex);}} catch (InterruptedException iex) {// 重试失败将正在执行任务数量 - 1submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(iex);}} catch (Exception ex) {// 执行失败将正在执行任务数量 - 1submittedTaskCount.decrementAndGet();throw ex;}}
}获取配置文件的配置 import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;ConfigurationProperties(prefix sss.thread)
Component//将该配置放到容器中
Data
public class ThreadPoolConfigProperties {private Integer coreSize;private Integer maxSize;private Integer keepAliveTime;}配置线程池Bean到容器中
import com.dam.eager.EagerThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;Configuration
public class MyThreadConfig {/*** param poolConfigProperties 如果需要使用到ThreadPoolConfigProperties一定要使用Component将其加入到容器中* return*/Beanpublic ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties poolConfigProperties) {// 普通线程池
// return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),
// poolConfigProperties.getMaxSize(),
// poolConfigProperties.getKeepAliveTime(),
// TimeUnit.SECONDS,
// //队列的最大容量
// new LinkedBlockingDeque(600),
// //使用默认的工程
// Executors.defaultThreadFactory(),
// //使用拒绝新来的拒绝策略
// new ThreadPoolExecutor.CallerRunsPolicy()
// );// 快速消费线程池return EagerThreadPoolExecutor.createEagerThreadPoolExecutor(poolConfigProperties.getCoreSize(),poolConfigProperties.getMaxSize(),poolConfigProperties.getKeepAliveTime(),TimeUnit.SECONDS,// 队列的最大容量600,// 使用默认的工程Executors.defaultThreadFactory(),// 使用拒绝新来的拒绝策略new ThreadPoolExecutor.CallerRunsPolicy());}
}说明
快速线程池的实现参考马哥 12306 的代码代码仓库为12306该项目含金量较高有兴趣的同学可以去学习一下。