一、功能说明
SpringBoot的定时任务的加强工具,实现对SpringBoot原生的定时任务进行动态管理,完全兼容原生@Scheduled注解,无需对原本的定时任务进行修改
二、快速使用
具体的功能已经封装成SpringBoot-starter即插即用
com.github.guoyixing spring-boot-starter-super-scheduled 0.3.1
三、实现原理
1、动态管理实现
(1) 配置管理介绍
@Component("superScheduledConfig") publicclassSuperScheduledConfig{ /** *执行定时任务的线程池 */ privateThreadPoolTaskSchedulertaskScheduler; /** *定时任务名称与定时任务回调钩子的关联关系容器 */ privateMapnameToScheduledFuture=newConcurrentHashMap<>(); /** *定时任务名称与定时任务需要执行的逻辑的关联关系容器 */ privateMap nameToRunnable=newConcurrentHashMap<>(); /** *定时任务名称与定时任务的源信息的关联关系容器 */ privateMap nameToScheduledSource=newConcurrentHashMap<>(); /*普通的get/sets省略*/ }
(2) 使用后处理器拦截SpringBoot原本的定时任务
实现ApplicationContextAware接口拿到SpringBoot的上下文
实现BeanPostProcessor接口,将这个类标记为后处理器,后处理器会在每个bean实例化之后执行
使用@DependsOn注解强制依赖SuperScheduledConfig类,让SpringBoot实例化SuperScheduledPostProcessor类之前先实例化SuperScheduledConfig类
主要实现逻辑在postProcessAfterInitialization()方法中
@DependsOn({"superScheduledConfig"}) @Component @Order publicclassSuperScheduledPostProcessorimplementsBeanPostProcessor,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateApplicationContextapplicationContext; /** *实例化bean之前的操作 *@parambeanbean实例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessBeforeInitialization(Objectbean,StringbeanName)throwsBeansException{ returnbean; } /** *实例化bean之后的操作 *@parambeanbean实例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName)throwsBeansException{ //1.获取配置管理器 SuperScheduledConfigsuperScheduledConfig=applicationContext.getBean(SuperScheduledConfig.class); //2.获取当前实例化完成的bean的所有方法 Method[]methods=bean.getClass().getDeclaredMethods(); //循环处理对每个方法逐一处理 if(methods.length>0){ for(Methodmethod:methods){ //3.尝试在该方法上获取@Scheduled注解(SpringBoot的定时任务注解) Scheduledannotation=method.getAnnotation(Scheduled.class); //如果无法获取到@Scheduled注解,就跳过这个方法 if(annotation==null){ continue; } //4.创建定时任务的源属性 //创建定时任务的源属性(用来记录定时任务的配置,初始化的时候记录的是注解上原本的属性) ScheduledSourcescheduledSource=newScheduledSource(annotation,method,bean); //对注解上获取到源属性中的属性进行检测 if(!scheduledSource.check()){ thrownewSuperScheduledException("在"+beanName+"Bean中"+method.getName()+"方法的注解参数错误"); } //生成定时任务的名称(id),使用beanName+“.”+方法名 Stringname=beanName+"."+method.getName(); //将以key-value的形式,将源数据存入配置管理器中,key:定时任务的名称value:源数据 superScheduledConfig.addScheduledSource(name,scheduledSource); try{ //5.将原本SpringBoot的定时任务取消掉 clearOriginalScheduled(annotation); }catch(Exceptione){ thrownewSuperScheduledException("在关闭原始方法"+beanName+method.getName()+"时出现错误"); } } } //最后bean保持原有返回 returnbean; } /** *修改注解原先的属性 *@paramannotation注解实例对象 *@throwsException */ privatevoidclearOriginalScheduled(Scheduledannotation)throwsException{ changeAnnotationValue(annotation,"cron",Scheduled.CRON_DISABLED); changeAnnotationValue(annotation,"fixedDelay",-1L); changeAnnotationValue(annotation,"fixedDelayString",""); changeAnnotationValue(annotation,"fixedRate",-1L); changeAnnotationValue(annotation,"fixedRateString",""); changeAnnotationValue(annotation,"initialDelay",-1L); changeAnnotationValue(annotation,"initialDelayString",""); } /** *获取SpringBoot的上下文 *@paramapplicationContextSpringBoot的上下文 */ @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(3) 使用ApplicationRunner初始化自定义的定时任务运行器
实现ApplicationContextAware接口拿到SpringBoot的上下文
使用@DependsOn注解强制依赖threadPoolTaskScheduler类
实现ApplicationRunner接口,在所有bean初始化结束之后,运行自定义逻辑
主要实现逻辑在run()方法中
@DependsOn("threadPoolTaskScheduler") @Component publicclassSuperScheduledApplicationRunnerimplementsApplicationRunner,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); privateApplicationContextapplicationContext; /** *定时任务配置管理器 */ @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *定时任务执行线程 */ @Autowired privateThreadPoolTaskSchedulerthreadPoolTaskScheduler; @Override publicvoidrun(ApplicationArgumentsargs){ //1.定时任务配置管理器中缓存定时任务执行线程 superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler); //2.获取所有定时任务源数据 MapnameToScheduledSource=superScheduledConfig.getNameToScheduledSource(); //逐一处理定时任务 for(Stringname:nameToScheduledSource.keySet()){ //3.获取定时任务源数据 ScheduledSourcescheduledSource=nameToScheduledSource.get(name); //4.获取所有增强类 String[]baseStrengthenBeanNames=applicationContext.getBeanNamesForType(BaseStrengthen.class); //5.创建执行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); //配置执行控制器 runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //6.逐一处理增强类(增强器实现原理后面具体分析) List points=newArrayList<>(baseStrengthenBeanNames.length); for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //7.将增强器代理成point ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //创建代理 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //8.所有的points连成起来 points.add(proxy); } //将point形成调用链 runnable.setChain(newChain(points)); //将执行逻辑封装并缓存到定时任务配置管理器中 superScheduledConfig.addRunnable(name,runnable::invoke); try{ //8.启动定时任务 ScheduledFuture>schedule=ScheduledFutureFactory.create(threadPoolTaskScheduler ,scheduledSource,runnable::invoke); //将线程回调钩子存到任务配置管理器中 superScheduledConfig.addScheduledFuture(name,schedule); logger.info(df.format(LocalDateTime.now())+"任务"+name+"已经启动..."); }catch(Exceptione){ thrownewSuperScheduledException("任务"+name+"启动失败,错误信息:"+e.getLocalizedMessage()); } } } @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(4) 进行动态管理
@Component publicclassSuperScheduledManager{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *修改Scheduled的执行周期 * *@paramnamescheduled的名称 *@paramcroncron表达式 */ publicvoidsetScheduledCron(Stringname,Stringcron){ //终止原先的任务 cancelScheduled(name); //创建新的任务 ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedDelay * *@paramnamescheduled的名称 *@paramfixedDelay上一次执行完毕时间点之后多长时间再执行 */ publicvoidsetScheduledFixedDelay(Stringname,LongfixedDelay){ //终止原先的任务 cancelScheduled(name); //创建新的任务 ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedDelay(fixedDelay); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedRate * *@paramnamescheduled的名称 *@paramfixedRate上一次开始执行之后多长时间再执行 */ publicvoidsetScheduledFixedRate(Stringname,LongfixedRate){ //终止原先的任务 cancelScheduled(name); //创建新的任务 ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedRate(fixedRate); addScheduled(name,scheduledSource); } /** *查询所有启动的Scheduled */ publicListgetRunScheduledName(){ Set names=superScheduledConfig.getNameToScheduledFuture().keySet(); returnnewArrayList<>(names); } /** *查询所有的Scheduled */ publicList getAllSuperScheduledName(){ Set names=superScheduledConfig.getNameToRunnable().keySet(); returnnewArrayList<>(names); } /** *终止Scheduled * *@paramnamescheduled的名称 */ publicvoidcancelScheduled(Stringname){ ScheduledFuturescheduledFuture=superScheduledConfig.getScheduledFuture(name); scheduledFuture.cancel(true); superScheduledConfig.removeScheduledFuture(name); logger.info(df.format(LocalDateTime.now())+"任务"+name+"已经终止..."); } /** *启动Scheduled * *@paramnamescheduled的名称 *@paramscheduledSource定时任务的源信息 */ publicvoidaddScheduled(Stringname,ScheduledSourcescheduledSource){ if(getRunScheduledName().contains(name)){ thrownewSuperScheduledException("定时任务"+name+"已经被启动过了"); } if(!scheduledSource.check()){ thrownewSuperScheduledException("定时任务"+name+"源数据内容错误"); } scheduledSource.refreshType(); Runnablerunnable=superScheduledConfig.getRunnable(name); ThreadPoolTaskSchedulertaskScheduler=superScheduledConfig.getTaskScheduler(); ScheduledFuture>schedule=ScheduledFutureFactory.create(taskScheduler,scheduledSource,runnable); logger.info(df.format(LocalDateTime.now())+"任务"+name+"已经启动..."); superScheduledConfig.addScheduledSource(name,scheduledSource); superScheduledConfig.addScheduledFuture(name,schedule); } /** *以cron类型启动Scheduled * *@paramnamescheduled的名称 *@paramcroncron表达式 */ publicvoidaddCronScheduled(Stringname,Stringcron){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *以fixedDelay类型启动Scheduled * *@paramnamescheduled的名称 *@paramfixedDelay上一次执行完毕时间点之后多长时间再执行 *@paraminitialDelay第一次执行的延迟时间 */ publicvoidaddFixedDelayScheduled(Stringname,LongfixedDelay,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedDelay(fixedDelay); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次执行的延迟时间只能传入一个参数"); } addScheduled(name,scheduledSource); } /** *以fixedRate类型启动Scheduled * *@paramnamescheduled的名称 *@paramfixedRate上一次开始执行之后多长时间再执行 *@paraminitialDelay第一次执行的延迟时间 */ publicvoidaddFixedRateScheduled(Stringname,LongfixedRate,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedRate(fixedRate); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次执行的延迟时间只能传入一个参数"); } addScheduled(name,scheduledSource); } /** *手动执行一次任务 * *@paramnamescheduled的名称 */ publicvoidrunScheduled(Stringname){ Runnablerunnable=superScheduledConfig.getRunnable(name); runnable.run(); } }
2、增强接口实现
增强器实现的整体思路与SpringAop的思路一致,实现没有Aop复杂
(1) 增强接口
@Order(Ordered.HIGHEST_PRECEDENCE) publicinterfaceBaseStrengthen{ /** *前置强化方法 * *@parambeanbean实例(或者是被代理的bean) *@parammethod执行的方法对象 *@paramargs方法参数 */ voidbefore(Objectbean,Methodmethod,Object[]args); /** *后置强化方法 *出现异常不会执行 *如果未出现异常,在afterFinally方法之后执行 * *@parambeanbean实例(或者是被代理的bean) *@parammethod执行的方法对象 *@paramargs方法参数 */ voidafter(Objectbean,Methodmethod,Object[]args); /** *异常强化方法 * *@parambeanbean实例(或者是被代理的bean) *@parammethod执行的方法对象 *@paramargs方法参数 */ voidexception(Objectbean,Methodmethod,Object[]args); /** *Finally强化方法,出现异常也会执行 * *@parambeanbean实例(或者是被代理的bean) *@parammethod执行的方法对象 *@paramargs方法参数 */ voidafterFinally(Objectbean,Methodmethod,Object[]args); }
(2) 代理抽象类
publicabstractclassPoint{ /** *定时任务名 */ privateStringsuperScheduledName; /** *抽象的执行方法,使用代理实现 *@paramrunnable定时任务执行器 */ publicabstractObjectinvoke(SuperScheduledRunnablerunnable); /*普通的get/sets省略*/ }
(3) 调用链类
publicclassChain{ privateListlist; privateintindex=-1; /** *索引自增1 */ publicintincIndex(){ return++index; } /** *索引还原 */ publicvoidresetIndex(){ this.index=-1; } }
(4) cglib动态代理实现
使用cglib代理增强器,将增强器全部代理成调用链节点Point
publicclassRunnableBaseInterceptorimplementsMethodInterceptor{ /** *定时任务执行器 */ privateSuperScheduledRunnablerunnable; /** *定时任务增强类 */ privateBaseStrengthenstrengthen; @Override publicObjectintercept(Objectobj,Methodmethod,Object[]args,MethodProxymethodProxy)throwsThrowable{ Objectresult; //如果执行的是invoke()方法 if("invoke".equals(method.getName())){ //前置强化方法 strengthen.before(obj,method,args); try{ //调用执行器中的invoke()方法 result=runnable.invoke(); }catch(Exceptione){ //异常强化方法 strengthen.exception(obj,method,args); thrownewSuperScheduledException(strengthen.getClass()+"中强化执行时发生错误",e); }finally{ //Finally强化方法,出现异常也会执行 strengthen.afterFinally(obj,method,args); } //后置强化方法 strengthen.after(obj,method,args); }else{ //直接执行方法 result=methodProxy.invokeSuper(obj,args); } returnresult; } publicRunnableBaseInterceptor(Objectobject,SuperScheduledRunnablerunnable){ this.runnable=runnable; if(BaseStrengthen.class.isAssignableFrom(object.getClass())){ this.strengthen=(BaseStrengthen)object; }else{ thrownewSuperScheduledException(object.getClass()+"对象不是BaseStrengthen类型"); } } publicRunnableBaseInterceptor(){ } }
(5) 定时任务执行器实现
publicclassSuperScheduledRunnable{ /** *原始的方法 */ privateMethodmethod; /** *方法所在的bean */ privateObjectbean; /** *增强器的调用链 */ privateChainchain; publicObjectinvoke(){ Objectresult; //索引自增1 if(chain.incIndex()==chain.getList().size()){ //调用链中的增强方法已经全部执行结束 try{ //调用链索引初始化 chain.resetIndex(); //增强器全部执行完毕,执行原本的方法 result=method.invoke(bean); }catch(IllegalAccessException|InvocationTargetExceptione){ thrownewSuperScheduledException(e.getLocalizedMessage()); } }else{ //获取被代理后的方法增强器 Pointpoint=chain.getList().get(chain.getIndex()); //执行增强器代理 //增强器代理中,会回调方法执行器,形成调用链,逐一运行调用链中的增强器 result=point.invoke(this); } returnresult; } /*普通的get/sets省略*/ }
(6) 增强器代理逻辑
com.gyx.superscheduled.core.SuperScheduledApplicationRunner类中的代码片段
//创建执行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //用来存放增强器的代理对象 Listpoints=newArrayList<>(baseStrengthenBeanNames.length); //循环所有的增强器的beanName for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //获取增强器的bean对象 ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //将增强器代理成Point节点 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //增强器的代理对象缓存到list中 points.add(proxy); } //将增强器代理实例的集合生成调用链 //执行控制器中设置调用链 runnable.setChain(newChain(points));
审核编辑:刘清
-
处理器
+关注
关注
68文章
18240浏览量
222015 -
控制器
+关注
关注
112文章
15191浏览量
171085 -
增强器
+关注
关注
1文章
43浏览量
8149 -
SpringBoot
+关注
关注
0文章
172浏览量
105
原文标题:SpringBoot 定时任务动态管理通用解决方案
文章出处:【微信号:AndroidPush,微信公众号:Android编程精选】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论