杨庄网站建设,软件开发模型案例,网站建设吉金手指排名11,中国能源建设集团有限公司招聘网Spring Boot 自定义线程池实现异步开发相信大家都了解#xff0c;但是在实际开发中需要在父子线程之间传递一些数据#xff0c;比如用户信息#xff0c;链路信息等等
比如用户登录信息使用ThreadLocal存放保证线程隔离#xff0c;代码如下#xff1a;
/*** author 公众号…Spring Boot 自定义线程池实现异步开发相信大家都了解但是在实际开发中需要在父子线程之间传递一些数据比如用户信息链路信息等等
比如用户登录信息使用ThreadLocal存放保证线程隔离代码如下
/*** author 公众号码猿技术专栏* description 用户上下文信息*/
public class OauthContext {private static final ThreadLocalLoginVal loginValThreadLocalnew ThreadLocal();public static LoginVal get(){return loginValThreadLocal.get();}public static void set(LoginVal loginVal){loginValThreadLocal.set(loginVal);}public static void clear(){loginValThreadLocal.remove();}
}那么子线程想要获取这个LoginVal如何做呢
今天就来介绍几种优雅的方式实现Spring Boot 内部的父子线程的数据传递。 1. 手动设置
每执行一次异步线程都要分为两步
获取父线程的LoginVal将LoginVal设置到子线程达到复用
代码如下
public void handlerAsync() {//1\. 获取父线程的loginValLoginVal loginVal OauthContext.get();log.info(父线程的值{},OauthContext.get());CompletableFuture.runAsync(()-{//2\. 设置子线程的值复用OauthContext.set(loginVal);log.info(子线程的值{},OauthContext.get());});}虽然能够实现目的但是每次开异步线程都需要手动设置重复代码太多看了头疼你认为优雅吗
2. 线程池设置TaskDecorator
TaskDecorator是什么官方api的大致意思这是一个执行回调方法的装饰器主要应用于传递上下文或者提供任务的监控/统计信息。
知道有这么一个东西如何去使用
TaskDecorator是一个接口首先需要去实现它代码如下
/*** author 公众号码猿技术专栏* description 上下文装饰器*/
public class ContextTaskDecorator implements TaskDecorator {Overridepublic Runnable decorate(Runnable runnable) {//获取父线程的loginValLoginVal loginVal OauthContext.get();return () - {try {// 将主线程的请求信息设置到子线程中OauthContext.set(loginVal);// 执行子线程这一步不要忘了runnable.run();} finally {// 线程结束清空这些信息否则可能造成内存泄漏OauthContext.clear();}};}
}这里我只是设置了LoginVal实际开发中其他的共享数据比如SecurityContextRequestAttributes....
TaskDecorator需要结合线程池使用实际开发中异步线程建议使用线程池只需要在对应的线程池配置一下代码如下
Bean(taskExecutor)
public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor poolTaskExecutor new ThreadPoolTaskExecutor();poolTaskExecutor.setCorePoolSize(xx);poolTaskExecutor.setMaxPoolSize(xx);// 设置线程活跃时间秒poolTaskExecutor.setKeepAliveSeconds(xx);// 设置队列容量poolTaskExecutor.setQueueCapacity(xx);//设置TaskDecorator用于解决父子线程间的数据复用poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator());poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);return poolTaskExecutor;}此时业务代码就不需要去设置子线程的值直接使用即可代码如下
public void handlerAsync() {log.info(父线程的用户信息{}, OauthContext.get());//执行异步任务需要指定的线程池CompletableFuture.runAsync(()- log.info(子线程的用户信息{}, OauthContext.get()),taskExecutor);}来看一下结果如下图 这里使用的是CompletableFuture执行异步任务使用Async这个注解同样是可行的。 注意无论使用何种方式都需要指定线程池 3. InheritableThreadLocal
这种方案不建议使用InheritableThreadLocal虽然能够实现父子线程间的复用但是在线程池中使用会存在复用的问题具体的可以看陈某之前的文章微服务中使用阿里开源的TTL优雅的实现身份信息的线程间复用
这种方案使用也是非常简单直接用InheritableThreadLocal替换ThreadLocal即可代码如下
/*** author 公众号码猿技术专栏* description 用户上下文信息*/
public class OauthContext {private static final InheritableThreadLocalLoginVal loginValThreadLocalnew InheritableThreadLocal();public static LoginVal get(){return loginValThreadLocal.get();}public static void set(LoginVal loginVal){loginValThreadLocal.set(loginVal);}public static void clear(){loginValThreadLocal.remove();}
}4. TransmittableThreadLocal
TransmittableThreadLocal是阿里开源的工具弥补了InheritableThreadLocal的缺陷在使用线程池等会池化复用线程的执行组件情况下提供ThreadLocal值的传递功能解决异步执行时上下文传递的问题。
使用起来也是非常简单添加依赖如下
dependencygroupIdcom.alibaba/groupIdartifactIdtransmittable-thread-local/artifactIdversion2.14.2/version
/dependencyOauthContext改造代码如下
/*** author 公众号码猿技术专栏* description 用户上下文信息*/
public class OauthContext {private static final TransmittableThreadLocalLoginVal loginValThreadLocalnew TransmittableThreadLocal();public static LoginVal get(){return loginValThreadLocal.get();}public static void set(LoginVal loginVal){loginValThreadLocal.set(loginVal);}public static void clear(){loginValThreadLocal.remove();}
}关于TransmittableThreadLocal想深入了解其原理可以看陈某之前的文章微服务中使用阿里开源的TTL优雅的实现身份信息的线程间复用应用还是非常广泛的
TransmittableThreadLocal原理
从定义来看TransimittableThreadLocal继承于InheritableThreadLocal并实现TtlCopier接口它里面只有一个copy方法。所以主要是对InheritableThreadLocal的扩展。
public class TransmittableThreadLocalT extends InheritableThreadLocalT implements TtlCopierT 在TransimittableThreadLocal中添加holder属性。这个属性的作用就是被标记为具备线程传递资格的对象都会被添加到这个对象中。
要标记一个类比较容易想到的方式就是给这个类新增一个Type字段还有一个方法就是将具备这种类型的的对象都添加到一个静态全局集合中。之后使用时这个集合里的所有值都具备这个标记。
// 1\. holder本身是一个InheritableThreadLocal对象
// 2\. 这个holder对象的value是WeakHashMapTransmittableThreadLocalObject, ?
// 2.1 WeekHashMap的value总是null,且不可能被使用。
// 2.2 WeekHasshMap支持valuenull
private static InheritableThreadLocalWeakHashMapTransmittableThreadLocalObject, ? holder new InheritableThreadLocalWeakHashMapTransmittableThreadLocalObject, ?() {Overrideprotected WeakHashMapTransmittableThreadLocalObject, ? initialValue() {return new WeakHashMapTransmittableThreadLocalObject, Object();}/*** 重写了childValue方法实现上直接将父线程的属性作为子线程的本地变量对象。*/Overrideprotected WeakHashMapTransmittableThreadLocalObject, ? childValue(WeakHashMapTransmittableThreadLocalObject, ? parentValue) {return new WeakHashMapTransmittableThreadLocalObject, Object(parentValue);}
};应用代码是通过TtlExecutors工具类对线程池对象进行包装。工具类只是简单的判断输入的线程池是否已经被包装过、非空校验等然后返回包装类ExecutorServiceTtlWrapper。根据不同的线程池类型有不同和的包装类。
Nullable
public static ExecutorService getTtlExecutorService(Nullable ExecutorService executorService) {if (TtlAgent.isTtlAgentLoaded() || executorService null || executorService instanceof TtlEnhanced) {return executorService;}return new ExecutorServiceTtlWrapper(executorService);
}进入包装类ExecutorServiceTtlWrapper。可以注意到不论是通过ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法都会将线程对象包装成TtlCallable或者TtlRunnable用于在真正执行run方法前做一些业务逻辑。
/*** 在ExecutorServiceTtlWrapper实现submit方法*/
NonNull
Override
public T FutureT submit(NonNull CallableT task) {return executorService.submit(TtlCallable.get(task));
}/*** 在ExecutorTtlWrapper实现execute方法*/
Override
public void execute(NonNull Runnable command) {executor.execute(TtlRunnable.get(command));
}所以重点的核心逻辑应该是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable为例TtlRunnable同理类似。在分析call()方法之前先看一个类Transmitter
public static class Transmitter {/*** 捕获当前线程中的是所有TransimittableThreadLocal和注册ThreadLocal的值。*/NonNullpublic static Object capture() {return new Snapshot(captureTtlValues(), captureThreadLocalValues());}/*** 捕获TransimittableThreadLocal的值,将holder中的所有值都添加到HashMap后返回。*/private static HashMapTransmittableThreadLocalObject, Object captureTtlValues() {HashMapTransmittableThreadLocalObject, Object ttl2Value new HashMapTransmittableThreadLocalObject, Object();for (TransmittableThreadLocalObject threadLocal : holder.get().keySet()) {ttl2Value.put(threadLocal, threadLocal.copyValue());}return ttl2Value;}/*** 捕获注册的ThreadLocal的值,也就是原本线程中的ThreadLocal,可以注册到TTL中在* 进行线程池本地变量传递时也会被传递。*/private static HashMapThreadLocalObject, Object captureThreadLocalValues() {final HashMapThreadLocalObject, Object threadLocal2Value new HashMapThreadLocalObject, Object();for(Map.EntryThreadLocalObject,TtlCopierObjectentry:threadLocalHolder.entrySet()){final ThreadLocalObject threadLocal entry.getKey();final TtlCopierObject copier entry.getValue();threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));}return threadLocal2Value;}/*** 将捕获到的本地变量进行替换子线程的本地变量并且返回子线程现有的本地变量副本backup。* 用于在执行run/call方法之后将本地变量副本恢复。*/NonNullpublic static Object replay(NonNull Object captured) {final Snapshot capturedSnapshot (Snapshot) captured;return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));}/*** 替换TransmittableThreadLocal*/NonNullprivate static HashMapTransmittableThreadLocalObject, Object replayTtlValues(NonNull HashMapTransmittableThreadLocalObject, Object captured) {// 创建副本backupHashMapTransmittableThreadLocalObject, Object backup new HashMapTransmittableThreadLocalObject, Object();for (final IteratorTransmittableThreadLocalObject iterator holder.get().keySet().iterator(); iterator.hasNext(); ) {TransmittableThreadLocalObject threadLocal iterator.next();// 对当前线程的本地变量进行副本拷贝backup.put(threadLocal, threadLocal.get());// 若出现调用线程中不存在某个线程变量而线程池中线程有则删除线程池中对应的本地变量if (!captured.containsKey(threadLocal)) {iterator.remove();threadLocal.superRemove();}}// 将捕获的TTL值打入线程池获取到的线程TTL中。setTtlValuesTo(captured);// 是一个扩展点调用TTL的beforeExecute方法。默认实现为空doExecuteCallback(true);return backup;}private static HashMapThreadLocalObject, Object replayThreadLocalValues(NonNull HashMapThreadLocalObject, Object captured) {final HashMapThreadLocalObject, Object backup new HashMapThreadLocalObject, Object();for (Map.EntryThreadLocalObject, Object entry : captured.entrySet()) {final ThreadLocalObject threadLocal entry.getKey();backup.put(threadLocal, threadLocal.get());final Object value entry.getValue();if (value threadLocalClearMark) threadLocal.remove();else threadLocal.set(value);}return backup;}/*** 清除单线线程的所有TTL和TL并返回清除之气的backup*/NonNullpublic static Object clear() {final HashMapTransmittableThreadLocalObject, Object ttl2Value new HashMapTransmittableThreadLocalObject, Object();final HashMapThreadLocalObject, Object threadLocal2Value new HashMapThreadLocalObject, Object();for(Map.EntryThreadLocalObject,TtlCopierObjectentry:threadLocalHolder.entrySet()){final ThreadLocalObject threadLocal entry.getKey();threadLocal2Value.put(threadLocal, threadLocalClearMark);}return replay(new Snapshot(ttl2Value, threadLocal2Value));}/*** 还原*/public static void restore(NonNull Object backup) {final Snapshot backupSnapshot (Snapshot) backup;restoreTtlValues(backupSnapshot.ttl2Value);restoreThreadLocalValues(backupSnapshot.threadLocal2Value);}private static void restoreTtlValues(NonNull HashMapTransmittableThreadLocalObject, Object backup) {// 扩展点调用TTL的afterExecutedoExecuteCallback(false);for (final IteratorTransmittableThreadLocalObject iterator holder.get().keySet().iterator(); iterator.hasNext(); ) {TransmittableThreadLocalObject threadLocal iterator.next();if (!backup.containsKey(threadLocal)) {iterator.remove();threadLocal.superRemove();}}// 将本地变量恢复成备份版本setTtlValuesTo(backup);}private static void setTtlValuesTo(NonNull HashMapTransmittableThreadLocalObject, Object ttlValues) {for (Map.EntryTransmittableThreadLocalObject, Object entry : ttlValues.entrySet()) {TransmittableThreadLocalObject threadLocal entry.getKey();threadLocal.set(entry.getValue());}}private static void restoreThreadLocalValues(NonNull HashMapThreadLocalObject, Object backup) {for (Map.EntryThreadLocalObject, Object entry : backup.entrySet()) {final ThreadLocalObject threadLocal entry.getKey();threadLocal.set(entry.getValue());}}/*** 快照类保存TTL和TL*/private static class Snapshot {final HashMapTransmittableThreadLocalObject, Object ttl2Value;final HashMapThreadLocalObject, Object threadLocal2Value;private Snapshot(HashMapTransmittableThreadLocalObject, Object ttl2Value,HashMapThreadLocalObject, Object threadLocal2Value) {this.ttl2Value ttl2Value;this.threadLocal2Value threadLocal2Value;}}进入TtlCallable#call()方法。
Override
public V call() throws Exception {Object captured capturedRef.get();if (captured null || releaseTtlValueReferenceAfterCall !capturedRef.compareAndSet(captured, null)) {throw new IllegalStateException(TTL value reference is released after call!);}// 调用replay方法将捕获到的当前线程的本地变量传递给线程池线程的本地变量// 并且获取到线程池线程覆盖之前的本地变量副本。Object backup replay(captured);try {// 线程方法调用return callable.call();} finally {// 使用副本进行恢复。restore(backup);}
}到这基本上线程池方式传递本地变量的核心代码已经大概看完了。总的来说在创建TtlCallable对象是调用capture()方法捕获调用方的本地线程变量在call()执行时将捕获到的线程变量替换到线程池所对应获取到的线程的本地变量中并且在执行完成之后将其本地变量恢复到调用之前。
总结
上述列举了4种方案这里推荐方案2和方案4其中两种方案的缺点非常明显实际开发中也是采用的方案2或者方案4