一、功能说明
SpringBoot的定时任务的加强工具,实现对SpringBoot原生的定时任务进行动态管理,完全兼容原生@Scheduled注解,无需对原本的定时任务进行修改
二、快速使用
具体的功能已经封装成SpringBoot-starter即插即用
com.github.guoyixing spring-boot-starter-super-scheduled 0.3.1
三、实现原理
1、动态管理实现
(1) 配置管理介绍
@Component("superScheduledConfig")
publicclassSuperScheduledConfig{
/**
*执行定时任务的线程池
*/
privateThreadPoolTaskSchedulertaskScheduler;
/**
*定时任务名称与定时任务回调钩子的关联关系容器
*/
privateMapnameToScheduledFuture=newConcurrentHashMap<>();
/**
*定时任务名称与定时任务需要执行的逻辑的关联关系容器
*/
privateMapnameToRunnable=newConcurrentHashMap<>();
/**
*定时任务名称与定时任务的源信息的关联关系容器
*/
privateMapnameToScheduledSource=newConcurrentHashMap<>();
/*普通的get/sets省略*/
}
(2) 使用后处理器拦截SpringBoot原本的定时任务
实现ApplicationContextAware接口拿到SpringBoot的上下文
实现BeanPostProcessor接口,将这个类标记为后处理器,后处理器会在每个bean实例化之后执行
使用@DependsOn注解强制依赖SuperScheduledConfig类,让SpringBoot实例化SuperScheduledPostProcessor类之前先实例化SuperScheduledConfig类
主要实现逻辑在postProcessAfterInitialization()方法中

@DependsOn({"superScheduledConfig"})
@Component
@Order
publicclassSuperScheduledPostProcessorimplementsBeanPostProcessor,ApplicationContextAware{
protectedfinalLoglogger=LogFactory.getLog(getClass());
privateApplicationContextapplicationContext;
/**
*实例化bean之前的操作
*@parambeanbean实例
*@parambeanNamebean的Name
*/
@Override
publicObjectpostProcessBeforeInitialization(Objectbean,StringbeanName)throwsBeansException{
returnbean;
}
/**
*实例化bean之后的操作
*@parambeanbean实例
*@parambeanNamebean的Name
*/
@Override
publicObjectpostProcessAfterInitialization(Objectbean,
StringbeanName)throwsBeansException{
//1.获取配置管理器
SuperScheduledConfigsuperScheduledConfig=applicationContext.getBean(SuperScheduledConfig.class);
//2.获取当前实例化完成的bean的所有方法
Method[]methods=bean.getClass().getDeclaredMethods();
//循环处理对每个方法逐一处理
if(methods.length>0){
for(Methodmethod:methods){
//3.尝试在该方法上获取@Scheduled注解(SpringBoot的定时任务注解)
Scheduledannotation=method.getAnnotation(Scheduled.class);
//如果无法获取到@Scheduled注解,就跳过这个方法
if(annotation==null){
continue;
}
//4.创建定时任务的源属性
//创建定时任务的源属性(用来记录定时任务的配置,初始化的时候记录的是注解上原本的属性)
ScheduledSourcescheduledSource=newScheduledSource(annotation,method,bean);
//对注解上获取到源属性中的属性进行检测
if(!scheduledSource.check()){
thrownewSuperScheduledException("在"+beanName+"Bean中"+method.getName()+"方法的注解参数错误");
}
//生成定时任务的名称(id),使用beanName+“.”+方法名
Stringname=beanName+"."+method.getName();
//将以key-value的形式,将源数据存入配置管理器中,key:定时任务的名称value:源数据
superScheduledConfig.addScheduledSource(name,scheduledSource);
try{
//5.将原本SpringBoot的定时任务取消掉
clearOriginalScheduled(annotation);
}catch(Exceptione){
thrownewSuperScheduledException("在关闭原始方法"+beanName+method.getName()+"时出现错误");
}
}
}
//最后bean保持原有返回
returnbean;
}
/**
*修改注解原先的属性
*@paramannotation注解实例对象
*@throwsException
*/
privatevoidclearOriginalScheduled(Scheduledannotation)throwsException{
changeAnnotationValue(annotation,"cron",Scheduled.CRON_DISABLED);
changeAnnotationValue(annotation,"fixedDelay",-1L);
changeAnnotationValue(annotation,"fixedDelayString","");
changeAnnotationValue(annotation,"fixedRate",-1L);
changeAnnotationValue(annotation,"fixedRateString","");
changeAnnotationValue(annotation,"initialDelay",-1L);
changeAnnotationValue(annotation,"initialDelayString","");
}
/**
*获取SpringBoot的上下文
*@paramapplicationContextSpringBoot的上下文
*/
@Override
publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{
this.applicationContext=applicationContext;
}
}
(3) 使用ApplicationRunner初始化自定义的定时任务运行器
实现ApplicationContextAware接口拿到SpringBoot的上下文
使用@DependsOn注解强制依赖threadPoolTaskScheduler类
实现ApplicationRunner接口,在所有bean初始化结束之后,运行自定义逻辑
主要实现逻辑在run()方法中

@DependsOn("threadPoolTaskScheduler")
@Component
publicclassSuperScheduledApplicationRunnerimplementsApplicationRunner,ApplicationContextAware{
protectedfinalLoglogger=LogFactory.getLog(getClass());
privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss");
privateApplicationContextapplicationContext;
/**
*定时任务配置管理器
*/
@Autowired
privateSuperScheduledConfigsuperScheduledConfig;
/**
*定时任务执行线程
*/
@Autowired
privateThreadPoolTaskSchedulerthreadPoolTaskScheduler;
@Override
publicvoidrun(ApplicationArgumentsargs){
//1.定时任务配置管理器中缓存定时任务执行线程
superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
//2.获取所有定时任务源数据
MapnameToScheduledSource=superScheduledConfig.getNameToScheduledSource();
//逐一处理定时任务
for(Stringname:nameToScheduledSource.keySet()){
//3.获取定时任务源数据
ScheduledSourcescheduledSource=nameToScheduledSource.get(name);
//4.获取所有增强类
String[]baseStrengthenBeanNames=applicationContext.getBeanNamesForType(BaseStrengthen.class);
//5.创建执行控制器
SuperScheduledRunnablerunnable=newSuperScheduledRunnable();
//配置执行控制器
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//6.逐一处理增强类(增强器实现原理后面具体分析)
Listpoints=newArrayList<>(baseStrengthenBeanNames.length);
for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){
//7.将增强器代理成point
ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName);
//创建代理
Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable));
proxy.setSuperScheduledName(name);
//8.所有的points连成起来
points.add(proxy);
}
//将point形成调用链
runnable.setChain(newChain(points));
//将执行逻辑封装并缓存到定时任务配置管理器中
superScheduledConfig.addRunnable(name,runnable::invoke);
try{
//8.启动定时任务
ScheduledFuture>schedule=ScheduledFutureFactory.create(threadPoolTaskScheduler
,scheduledSource,runnable::invoke);
//将线程回调钩子存到任务配置管理器中
superScheduledConfig.addScheduledFuture(name,schedule);
logger.info(df.format(LocalDateTime.now())+"任务"+name+"已经启动...");
}catch(Exceptione){
thrownewSuperScheduledException("任务"+name+"启动失败,错误信息:"+e.getLocalizedMessage());
}
}
}
@Override
publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{
this.applicationContext=applicationContext;
}
}
(4) 进行动态管理
@Component
publicclassSuperScheduledManager{
protectedfinalLoglogger=LogFactory.getLog(getClass());
privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss");
@Autowired
privateSuperScheduledConfigsuperScheduledConfig;
/**
*修改Scheduled的执行周期
*
*@paramnamescheduled的名称
*@paramcroncron表达式
*/
publicvoidsetScheduledCron(Stringname,Stringcron){
//终止原先的任务
cancelScheduled(name);
//创建新的任务
ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setCron(cron);
addScheduled(name,scheduledSource);
}
/**
*修改Scheduled的fixedDelay
*
*@paramnamescheduled的名称
*@paramfixedDelay上一次执行完毕时间点之后多长时间再执行
*/
publicvoidsetScheduledFixedDelay(Stringname,LongfixedDelay){
//终止原先的任务
cancelScheduled(name);
//创建新的任务
ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedDelay(fixedDelay);
addScheduled(name,scheduledSource);
}
/**
*修改Scheduled的fixedRate
*
*@paramnamescheduled的名称
*@paramfixedRate上一次开始执行之后多长时间再执行
*/
publicvoidsetScheduledFixedRate(Stringname,LongfixedRate){
//终止原先的任务
cancelScheduled(name);
//创建新的任务
ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name);
scheduledSource.clear();
scheduledSource.setFixedRate(fixedRate);
addScheduled(name,scheduledSource);
}
/**
*查询所有启动的Scheduled
*/
publicListgetRunScheduledName(){
Setnames=superScheduledConfig.getNameToScheduledFuture().keySet();
returnnewArrayList<>(names);
}
/**
*查询所有的Scheduled
*/
publicListgetAllSuperScheduledName(){
Setnames=superScheduledConfig.getNameToRunnable().keySet();
returnnewArrayList<>(names);
}
/**
*终止Scheduled
*
*@paramnamescheduled的名称
*/
publicvoidcancelScheduled(Stringname){
ScheduledFuturescheduledFuture=superScheduledConfig.getScheduledFuture(name);
scheduledFuture.cancel(true);
superScheduledConfig.removeScheduledFuture(name);
logger.info(df.format(LocalDateTime.now())+"任务"+name+"已经终止...");
}
/**
*启动Scheduled
*
*@paramnamescheduled的名称
*@paramscheduledSource定时任务的源信息
*/
publicvoidaddScheduled(Stringname,ScheduledSourcescheduledSource){
if(getRunScheduledName().contains(name)){
thrownewSuperScheduledException("定时任务"+name+"已经被启动过了");
}
if(!scheduledSource.check()){
thrownewSuperScheduledException("定时任务"+name+"源数据内容错误");
}
scheduledSource.refreshType();
Runnablerunnable=superScheduledConfig.getRunnable(name);
ThreadPoolTaskSchedulertaskScheduler=superScheduledConfig.getTaskScheduler();
ScheduledFuture>schedule=ScheduledFutureFactory.create(taskScheduler,scheduledSource,runnable);
logger.info(df.format(LocalDateTime.now())+"任务"+name+"已经启动...");
superScheduledConfig.addScheduledSource(name,scheduledSource);
superScheduledConfig.addScheduledFuture(name,schedule);
}
/**
*以cron类型启动Scheduled
*
*@paramnamescheduled的名称
*@paramcroncron表达式
*/
publicvoidaddCronScheduled(Stringname,Stringcron){
ScheduledSourcescheduledSource=newScheduledSource();
scheduledSource.setCron(cron);
addScheduled(name,scheduledSource);
}
/**
*以fixedDelay类型启动Scheduled
*
*@paramnamescheduled的名称
*@paramfixedDelay上一次执行完毕时间点之后多长时间再执行
*@paraminitialDelay第一次执行的延迟时间
*/
publicvoidaddFixedDelayScheduled(Stringname,LongfixedDelay,Long...initialDelay){
ScheduledSourcescheduledSource=newScheduledSource();
scheduledSource.setFixedDelay(fixedDelay);
if(initialDelay!=null&&initialDelay.length==1){
scheduledSource.setInitialDelay(initialDelay[0]);
}elseif(initialDelay!=null&&initialDelay.length>1){
thrownewSuperScheduledException("第一次执行的延迟时间只能传入一个参数");
}
addScheduled(name,scheduledSource);
}
/**
*以fixedRate类型启动Scheduled
*
*@paramnamescheduled的名称
*@paramfixedRate上一次开始执行之后多长时间再执行
*@paraminitialDelay第一次执行的延迟时间
*/
publicvoidaddFixedRateScheduled(Stringname,LongfixedRate,Long...initialDelay){
ScheduledSourcescheduledSource=newScheduledSource();
scheduledSource.setFixedRate(fixedRate);
if(initialDelay!=null&&initialDelay.length==1){
scheduledSource.setInitialDelay(initialDelay[0]);
}elseif(initialDelay!=null&&initialDelay.length>1){
thrownewSuperScheduledException("第一次执行的延迟时间只能传入一个参数");
}
addScheduled(name,scheduledSource);
}
/**
*手动执行一次任务
*
*@paramnamescheduled的名称
*/
publicvoidrunScheduled(Stringname){
Runnablerunnable=superScheduledConfig.getRunnable(name);
runnable.run();
}
}
2、增强接口实现
增强器实现的整体思路与SpringAop的思路一致,实现没有Aop复杂
(1) 增强接口
@Order(Ordered.HIGHEST_PRECEDENCE)
publicinterfaceBaseStrengthen{
/**
*前置强化方法
*
*@parambeanbean实例(或者是被代理的bean)
*@parammethod执行的方法对象
*@paramargs方法参数
*/
voidbefore(Objectbean,Methodmethod,Object[]args);
/**
*后置强化方法
*出现异常不会执行
*如果未出现异常,在afterFinally方法之后执行
*
*@parambeanbean实例(或者是被代理的bean)
*@parammethod执行的方法对象
*@paramargs方法参数
*/
voidafter(Objectbean,Methodmethod,Object[]args);
/**
*异常强化方法
*
*@parambeanbean实例(或者是被代理的bean)
*@parammethod执行的方法对象
*@paramargs方法参数
*/
voidexception(Objectbean,Methodmethod,Object[]args);
/**
*Finally强化方法,出现异常也会执行
*
*@parambeanbean实例(或者是被代理的bean)
*@parammethod执行的方法对象
*@paramargs方法参数
*/
voidafterFinally(Objectbean,Methodmethod,Object[]args);
}
(2) 代理抽象类
publicabstractclassPoint{
/**
*定时任务名
*/
privateStringsuperScheduledName;
/**
*抽象的执行方法,使用代理实现
*@paramrunnable定时任务执行器
*/
publicabstractObjectinvoke(SuperScheduledRunnablerunnable);
/*普通的get/sets省略*/
}
(3) 调用链类
publicclassChain{
privateListlist;
privateintindex=-1;
/**
*索引自增1
*/
publicintincIndex(){
return++index;
}
/**
*索引还原
*/
publicvoidresetIndex(){
this.index=-1;
}
}
(4) cglib动态代理实现
使用cglib代理增强器,将增强器全部代理成调用链节点Point
publicclassRunnableBaseInterceptorimplementsMethodInterceptor{
/**
*定时任务执行器
*/
privateSuperScheduledRunnablerunnable;
/**
*定时任务增强类
*/
privateBaseStrengthenstrengthen;
@Override
publicObjectintercept(Objectobj,Methodmethod,Object[]args,MethodProxymethodProxy)throwsThrowable{
Objectresult;
//如果执行的是invoke()方法
if("invoke".equals(method.getName())){
//前置强化方法
strengthen.before(obj,method,args);
try{
//调用执行器中的invoke()方法
result=runnable.invoke();
}catch(Exceptione){
//异常强化方法
strengthen.exception(obj,method,args);
thrownewSuperScheduledException(strengthen.getClass()+"中强化执行时发生错误",e);
}finally{
//Finally强化方法,出现异常也会执行
strengthen.afterFinally(obj,method,args);
}
//后置强化方法
strengthen.after(obj,method,args);
}else{
//直接执行方法
result=methodProxy.invokeSuper(obj,args);
}
returnresult;
}
publicRunnableBaseInterceptor(Objectobject,SuperScheduledRunnablerunnable){
this.runnable=runnable;
if(BaseStrengthen.class.isAssignableFrom(object.getClass())){
this.strengthen=(BaseStrengthen)object;
}else{
thrownewSuperScheduledException(object.getClass()+"对象不是BaseStrengthen类型");
}
}
publicRunnableBaseInterceptor(){
}
}
(5) 定时任务执行器实现
publicclassSuperScheduledRunnable{
/**
*原始的方法
*/
privateMethodmethod;
/**
*方法所在的bean
*/
privateObjectbean;
/**
*增强器的调用链
*/
privateChainchain;
publicObjectinvoke(){
Objectresult;
//索引自增1
if(chain.incIndex()==chain.getList().size()){
//调用链中的增强方法已经全部执行结束
try{
//调用链索引初始化
chain.resetIndex();
//增强器全部执行完毕,执行原本的方法
result=method.invoke(bean);
}catch(IllegalAccessException|InvocationTargetExceptione){
thrownewSuperScheduledException(e.getLocalizedMessage());
}
}else{
//获取被代理后的方法增强器
Pointpoint=chain.getList().get(chain.getIndex());
//执行增强器代理
//增强器代理中,会回调方法执行器,形成调用链,逐一运行调用链中的增强器
result=point.invoke(this);
}
returnresult;
}
/*普通的get/sets省略*/
}
(6) 增强器代理逻辑
com.gyx.superscheduled.core.SuperScheduledApplicationRunner类中的代码片段
//创建执行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //用来存放增强器的代理对象 Listpoints=newArrayList<>(baseStrengthenBeanNames.length); //循环所有的增强器的beanName for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //获取增强器的bean对象 ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //将增强器代理成Point节点 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //增强器的代理对象缓存到list中 points.add(proxy); } //将增强器代理实例的集合生成调用链 //执行控制器中设置调用链 runnable.setChain(newChain(points));
审核编辑:刘清
-
处理器
+关注
关注
68文章
20148浏览量
247035 -
控制器
+关注
关注
114文章
17638浏览量
190205 -
增强器
+关注
关注
1文章
49浏览量
8700 -
SpringBoot
+关注
关注
0文章
177浏览量
628
原文标题:SpringBoot 定时任务动态管理通用解决方案
文章出处:【微信号:AndroidPush,微信公众号:Android编程精选】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录

求一种SpringBoot定时任务动态管理通用解决方案
评论