Spring Boot 自定义线程池实现异步开发相信看过陈某的文章都了解,但是在实际开发中需要在父子线程之间传递一些数据,比如用户信息,链路信息等等
比如用户登录信息使用ThreadLocal存放保证线程隔离,代码如下:
/** *@description用户上下文信息 */ publicclassOauthContext{ privatestaticfinalThreadLocalloginValThreadLocal=newThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
那么子线程想要获取这个LoginVal如何做呢?
今天就来介绍几种优雅的方式实现Spring Boot 内部的父子线程的数据传递。

1. 手动设置
每执行一次异步线程都要分为两步:
获取父线程的LoginVal
将LoginVal设置到子线程,达到复用
代码如下:
publicvoidhandlerAsync(){
//1.获取父线程的loginVal
LoginValloginVal=OauthContext.get();
log.info("父线程的值:{}",OauthContext.get());
CompletableFuture.runAsync(()->{
//2.设置子线程的值,复用
OauthContext.set(loginVal);
log.info("子线程的值:{}",OauthContext.get());
});
}
虽然能够实现目的,但是每次开异步线程都需要手动设置,重复代码太多,看了头疼,你认为优雅吗?
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro
2. 线程池设置TaskDecorator
TaskDecorator是什么?官方api的大致意思:这是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。
知道有这么一个东西,如何去使用?
TaskDecorator是一个接口,首先需要去实现它,代码如下:
/**
*@description上下文装饰器
*/
publicclassContextTaskDecoratorimplementsTaskDecorator{
@Override
publicRunnabledecorate(Runnablerunnable){
//获取父线程的loginVal
LoginValloginVal=OauthContext.get();
return()->{
try{
//将主线程的请求信息,设置到子线程中
OauthContext.set(loginVal);
//执行子线程,这一步不要忘了
runnable.run();
}finally{
//线程结束,清空这些信息,否则可能造成内存泄漏
OauthContext.clear();
}
};
}
}
这里我只是设置了LoginVal,实际开发中其他的共享数据,比如SecurityContext,RequestAttributes....
TaskDecorator需要结合线程池使用,实际开发中异步线程建议使用线程池,只需要在对应的线程池配置一下,代码如下:
@Bean("taskExecutor")
publicThreadPoolTaskExecutortaskExecutor(){
ThreadPoolTaskExecutorpoolTaskExecutor=newThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(xx);
poolTaskExecutor.setMaxPoolSize(xx);
//设置线程活跃时间(秒)
poolTaskExecutor.setKeepAliveSeconds(xx);
//设置队列容量
poolTaskExecutor.setQueueCapacity(xx);
//设置TaskDecorator,用于解决父子线程间的数据复用
poolTaskExecutor.setTaskDecorator(newContextTaskDecorator());
poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());
//等待所有任务结束后再关闭线程池
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
returnpoolTaskExecutor;
}
此时业务代码就不需要去设置子线程的值,直接使用即可,代码如下:
publicvoidhandlerAsync(){
log.info("父线程的用户信息:{}",OauthContext.get());
//执行异步任务,需要指定的线程池
CompletableFuture.runAsync(()->log.info("子线程的用户信息:{}",OauthContext.get()),taskExecutor);
}
来看一下结果,如下图:

这里使用的是CompletableFuture执行异步任务,使用@Async这个注解同样是可行的。
注意 :无论使用何种方式,都需要指定线程池
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud
3. InheritableThreadLocal
这种方案不建议使用,InheritableThreadLocal虽然能够实现父子线程间的复用,但是在线程池中使用会存在复用的问题。
这种方案使用也是非常简单,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:
/**
*@description用户上下文信息
*/
publicclassOauthContext{
privatestaticfinalInheritableThreadLocalloginValThreadLocal=newInheritableThreadLocal<>();
publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}
4. TransmittableThreadLocal
TransmittableThreadLocal是阿里开源的工具,弥补了InheritableThreadLocal的缺陷,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。
使用起来也是非常简单,添加依赖如下:
com.alibaba transmittable-thread-local 2.14.2
OauthContext改造代码如下:
/**
*@description用户上下文信息
*/
publicclassOauthContext{
privatestaticfinalTransmittableThreadLocalloginValThreadLocal=newTransmittableThreadLocal<>();
publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}
TransmittableThreadLocal原理
从定义来看,TransimittableThreadLocal继承于InheritableThreadLocal,并实现TtlCopier接口,它里面只有一个copy方法。所以主要是对InheritableThreadLocal的扩展。
publicclassTransmittableThreadLocalextendsInheritableThreadLocal implementsTtlCopier
在TransimittableThreadLocal中添加holder属性。这个属性的作用就是被标记为具备线程传递资格的对象都会被添加到这个对象中。
要标记一个类,比较容易想到的方式,就是给这个类新增一个Type字段,还有一个方法就是将具备这种类型的的对象都添加到一个静态全局集合中。之后使用时,这个集合里的所有值都具备这个标记。
//1.holder本身是一个InheritableThreadLocal对象 //2.这个holder对象的value是WeakHashMap,?> //2.1WeekHashMap的value总是null,且不可能被使用。 //2.2WeekHasshMap支持value=null privatestaticInheritableThreadLocal ,?>>holder=newInheritableThreadLocal ,?>>(){ @Override protectedWeakHashMap ,?>initialValue(){ returnnewWeakHashMap ,Object>(); } /** *重写了childValue方法,实现上直接将父线程的属性作为子线程的本地变量对象。 */ @Override protectedWeakHashMap ,?>childValue(WeakHashMap ,?>parentValue){ returnnewWeakHashMap ,Object>(parentValue); } };
应用代码是通过TtlExecutors工具类对线程池对象进行包装。工具类只是简单的判断,输入的线程池是否已经被包装过、非空校验等,然后返回包装类ExecutorServiceTtlWrapper。根据不同的线程池类型,有不同和的包装类。
@Nullable
publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){
if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){
returnexecutorService;
}
returnnewExecutorServiceTtlWrapper(executorService);
}
进入包装类ExecutorServiceTtlWrapper。可以注意到不论是通过ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都会将线程对象包装成TtlCallable或者TtlRunnable,用于在真正执行run方法前做一些业务逻辑。
/** *在ExecutorServiceTtlWrapper实现submit方法 */ @NonNull @Override publicFuture submit(@NonNullCallable task){ returnexecutorService.submit(TtlCallable.get(task)); } /** *在ExecutorTtlWrapper实现execute方法 */ @Override publicvoidexecute(@NonNullRunnablecommand){ executor.execute(TtlRunnable.get(command)); }
所以,重点的核心逻辑应该是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable为例,TtlRunnable同理类似。在分析call()方法之前,先看一个类Transmitter
publicstaticclassTransmitter{
/**
*捕获当前线程中的是所有TransimittableThreadLocal和注册ThreadLocal的值。
*/
@NonNull
publicstaticObjectcapture(){
returnnewSnapshot(captureTtlValues(),captureThreadLocalValues());
}
/**
*捕获TransimittableThreadLocal的值,将holder中的所有值都添加到HashMap后返回。
*/
privatestaticHashMap,Object>captureTtlValues(){
HashMap,Object>ttl2Value=
newHashMap,Object>();
for(TransmittableThreadLocal
进入TtlCallable#call()方法。
@Override
publicVcall()throwsException{
Objectcaptured=capturedRef.get();
if(captured==null||releaseTtlValueReferenceAfterCall&&
!capturedRef.compareAndSet(captured,null)){
thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!");
}
//调用replay方法将捕获到的当前线程的本地变量,传递给线程池线程的本地变量,
//并且获取到线程池线程覆盖之前的本地变量副本。
Objectbackup=replay(captured);
try{
//线程方法调用
returncallable.call();
}finally{
//使用副本进行恢复。
restore(backup);
}
}
到这基本上线程池方式传递本地变量的核心代码已经大概看完了。总的来说在创建TtlCallable对象是,调用capture()方法捕获调用方的本地线程变量,在call()执行时,将捕获到的线程变量,替换到线程池所对应获取到的线程的本地变量中,并且在执行完成之后,将其本地变量恢复到调用之前。
总结
上述列举了4种方案,陈某这里推荐方案2和方案4,其中两种方案的缺点非常明显,实际开发中也是采用的方案2或者方案4。
-
接口
+关注
关注
33文章
9444浏览量
156143 -
spring
+关注
关注
0文章
341浏览量
15776 -
Boot
+关注
关注
0文章
154浏览量
37495 -
线程
+关注
关注
0文章
508浏览量
20759 -
数据传递
+关注
关注
1文章
3浏览量
1838
原文标题:用这4招 优雅的实现Spring Boot 异步线程间数据传递
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
Spring Boot如何实现异步任务
Spring Boot虚拟线程和Webflux性能对比
LabVIEW多线程编程数据传递教程
请问C6678核间数据传递方式是什么?为什么是这样?
启动Spring Boot项目应用的三种方法
基于Spring Cloud和Euraka的优雅下线以及灰度发布
Spring Boot Web相关的基础知识
简述Spring Boot数据校验
Spring Boot如何优雅实现数据加密存储、模糊匹配和脱敏
Spring Boot Actuator快速入门
Spring Boot启动 Eureka流程
Spring Boot的启动原理

用这4招 优雅的实现Spring Boot异步线程间数据传递
评论