0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

什么是动态线程池?动态线程池的简单实现思路

jf_ro2CN3Fa 来源:稀土掘金 2024-02-28 10:42 次阅读

什么是动态线程池?

在线程池日常实践中我们常常会遇到以下问题:

代码中创建了一个线程池却不知道核心参数设置多少比较合适。

参数设置好后,上线发现需要调整,改代码重启服务非常麻烦。

线程池相对于开发人员来说是个黑箱,运行情况在出现问题 前很难被感知。

因此,动态可监控线程池一种针对以上痛点开发的线程池管理工具。主要可实现功能有:提供对 Spring 应用内线程池实例的全局管控、应用运行时动态变更线程池参数以及线程池数据采集和监控阈值报警。

已经实现的优秀开源动态线程池

hippo4j、dynamic-tp.....

实现思路

核心管理类

需要能实现对线程池的

服务注册

获取已经注册好的线程池

以及对注册号线程池参数的刷新。

对于每一个线程池,我们使用一个线程池名字作为标识每个线程池的唯一ID。

伪代码实现

publicclassDtpRegistry{
/**
*储存线程池
*/
privatestaticfinalMapEXECUTOR_MAP=newConcurrentHashMap<>();

/**
*获取线程池
*@paramexecutorName线程池名字
*/
publicstaticExecutorgetExecutor(StringexecutorName){
returnEXECUTOR_MAP.get(executorName);
}

/**
*线程池注册
*@paramexecutorName线程池名字
*/
publicstaticvoidregistry(StringexecutorName,Executorexecutor){
//注册
EXECUTOR_MAP.put(executorName,executorWrapper);
}


/**
*刷新线程池参数
*@paramexecutorName线程池名字
*@paramproperties线程池参数
*/
publicstaticvoidrefresh(StringexecutorName,ThreadPoolPropertiesproperties){
Executorexecutor=EXECUTOR_MAP.get(executorName)
//刷新参数
//.......
}

}

如何创建线程池?

STEP 1. 我们可以使用yml配置文件的方式配置一个线程池,将线程池实例的创建交由Spring容器。

相关配置

publicclassDtpProperties{

privateListexecutors;

}

publicclassThreadPoolProperties{
/**
*标识每个线程池的唯一名字
*/
privateStringpoolName;
privateStringpoolType="common";

/**
*是否为守护线程
*/
privatebooleanisDaemon=false;

/**
*以下都是核心参数
*/
privateintcorePoolSize=1;
privateintmaximumPoolSize=1;
privatelongkeepAliveTime;
privateTimeUnittimeUnit=TimeUnit.SECONDS;
privateStringqueueType="arrayBlockingQueue";
privateintqueueSize=5;
privateStringthreadFactoryPrefix="-td-";
privateStringRejectedExecutionHandler;
}

yml example:

spring:
dtp:
executors:
#线程池1
-poolName:dtpExecutor1
corePoolSize:5
maximumPoolSize:10
#线程池2
-poolName:dtpExecutor2
corePoolSize:2
maximumPoolSize:15

STEP 2 根据配置信息添加线程池的BeanDefinition

关键类

@Slf4j
publicclassDtpImportBeanDefinitionRegistrarimplementsImportBeanDefinitionRegistrar,EnvironmentAware{
privateEnvironmentenvironment;

@Override
publicvoidregisterBeanDefinitions(AnnotationMetadataimportingClassMetadata,BeanDefinitionRegistryregistry){
log.info("注册");
//绑定资源
DtpPropertiesdtpProperties=newDtpProperties();
ResourceBundlerUtil.bind(environment,dtpProperties);
Listexecutors=dtpProperties.getExecutors();
if(Objects.isNull(executors)){
log.info("未检测本地到配置文件线程池");
return;
}
//注册beanDefinition
executors.forEach((executorProp)->{
BeanUtil.registerIfAbsent(registry,executorProp);
});
}


@Override
publicvoidsetEnvironment(Environmentenvironment){
this.environment=environment;
}
}


/**
*
*工具类
*
*/
publicclassBeanUtil{
publicstaticvoidregisterIfAbsent(BeanDefinitionRegistryregistry,ThreadPoolPropertiesexecutorProp){
register(registry,executorProp.getPoolName(),executorProp);
}

publicstaticvoidregister(BeanDefinitionRegistryregistry,StringbeanName,ThreadPoolPropertiesexecutorProp){
ClassexecutorType=ExecutorType.getClazz(executorProp.getPoolType());
Object[]args=assembleArgs(executorProp);
register(registry,beanName,executorType,args);
}

publicstaticvoidregister(BeanDefinitionRegistryregistry,StringbeanName,Classclazz,Object[]args){
BeanDefinitionBuilderbuilder=BeanDefinitionBuilder.genericBeanDefinition(clazz);
for(Objectarg:args){
builder.addConstructorArgValue(arg);
}
registry.registerBeanDefinition(beanName,builder.getBeanDefinition());
}

privatestaticObject[]assembleArgs(ThreadPoolPropertiesexecutorProp){
returnnewObject[]{
executorProp.getCorePoolSize(),
executorProp.getMaximumPoolSize(),
executorProp.getKeepAliveTime(),
executorProp.getTimeUnit(),
QueueType.getInstance(executorProp.getQueueType(),executorProp.getQueueSize()),
newNamedThreadFactory(
executorProp.getPoolName()+executorProp.getThreadFactoryPrefix(),
executorProp.isDaemon()
),
//先默认不做设置
RejectPolicy.ABORT.getValue()
};
}
}

下面解释一下这个类的作用,environment实例中储存着spring启动时解析的yml配置,所以我们spring提供的Binder将配置绑定到我们前面定义的DtpProperties类中,方便后续使用。接下来的比较简单,就是将线程池的BeanDefinition注册到IOC容器中,让spring去帮我们实例化这个bean。

STEP 3. 将已经实例化的线程池注册到核心类 DtpRegistry 中

我们注册了 beanDefinition 后,spring会帮我们实例化出来, 在这之后我们可以根据需要将这个bean进行进一步的处理,spring也提供了很多机制让我们对bean的生命周期管理进行更多的扩展。对应到这里我们就是将实例化出来的线程池注册到核心类 DtpRegistry 中进行管理。

这里我们使用 BeanPostProcessor 进行处理。

@Slf4j
publicclassDtpBeanPostProcessorimplementsBeanPostProcessor{
privateDefaultListableBeanFactorybeanFactory;

@Override
publicObjectpostProcessAfterInitialization(Objectbean,StringbeanName)throwsBeansException{
if(beaninstanceofDtpExecutor){
//直接纳入管理
DtpRegistry.registry(beanName,(DtpExecutor)bean);
}
returnbean;
}
}

这里的逻辑很简单, 就是判断一下这个bean是不是线程池,是就统一管理起来。

STEP 4. 启用 BeanDefinitionRegistrar 和 BeanPostProcessor

在springboot程序中,只要加一个@MapperScan注解就能启用mybatis的功能,我们可以学习其在spring中的启用方式,自定义一个注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DtpImportSelector.class)
public@interfaceEnableDynamicThreadPool{
}

其中,比较关键的是@Import注解,spring会导入注解中的类DtpImportSelector

而DtpImportSelector这个类实现了:

publicclassDtpImportSelectorimplementsDeferredImportSelector{
@Override
publicString[]selectImports(AnnotationMetadataimportingClassMetadata){
returnnewString[]{
DtpImportBeanDefinitionRegistrar.class.getName(),
DtpBeanPostProcessor.class.getName()
};
}
}

这样,只要我们再启动类或者配置类上加上@EnableDynamicThreadPool这个注解,spring就会将DtpImportBeanDefinitionRegistrar和DtpBeanPostProcessor这两个类加入spring容器管理,从而实现我们的线程池的注册。

@SpringBootApplication
@EnableDynamicThreadPool
publicclassApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(Application.class,args);
}
}

如何实现线程池配置的动态刷新

首先明确一点,对于线程池的实现类,例如:ThreadPoolExecutor等,都有提供核心参数对应的 set 方法,让我们实现参数修改。因此,在核心类DtpRegistry中的refresh方法,我们可以这样写:

publicclassDtpRegistry{
/**
*储存线程池
*/
privatestaticfinalMapEXECUTOR_MAP=newConcurrentHashMap<>();
/**
*刷新线程池参数
*@paramexecutorName线程池名字
*@paramproperties线程池参数
*/
publicstaticvoidrefresh(StringexecutorName,ThreadPoolPropertiesproperties){
ThreadPoolExecutorexecutor=EXECUTOR_MAP.get(executorName)

//设置参数
executor.setCorePoolSize(...);
executor.setMaximumPoolSize(...);
......
}

}

而这些新参数怎么来呢?我们可以引入Nacos、Apollo等配置中心,实现他们的监听器方法,在监听器方法里调用DtpRegistry的refresh方法刷新即可。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 线程池
    +关注

    关注

    0

    文章

    53

    浏览量

    6768

原文标题:动态线程池的简单实现思路

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    面试现场被要求写一个线程,150行代码搞定 - 第3节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:27:54

    面试现场被要求写一个线程,150行代码搞定 - 第4节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:28:44

    面试现场被要求写一个线程,150行代码搞定 - 第7节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:31:15

    面试现场被要求写一个线程,150行代码搞定 - 第10节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:33:55

    面试现场被要求写一个线程,150行代码搞定 - 第13节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:36:25

    面试现场被要求写一个线程,150行代码搞定 - 第16节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:38:56

    面试现场被要求写一个线程,150行代码搞定 - 第21节 #硬声创作季

    程序代码线程
    充八万
    发布于 :2023年08月17日 16:43:07

    是什么 短信猫使用方法

    ◇ 整理信息:可通过设定整理规则,自动整理接收到的信息 ◇ 收发记录:可查看待发、已发、接收、失败的信息 ◇ 查看日志:可动态记录系统工作情况 ◇ 自动回复:设定各种短信、彩信的自动回复规则,实现短信
    发表于 04-23 15:38

    初学RT-thread线程动态创建

    RT-thread初学线程动态创建线程静态创建线程钩子函数定时器获取系统时间动态创建定时器静态创建定时器信号量静态创建与
    发表于 02-24 07:32

    线程是如何实现

    线程的概念是什么?线程是如何实现的?
    发表于 02-28 06:20

    线程创建的两种方法

    ,让过来的任务立刻能够使用,就形成了线程。在Python3中,创建线程是通过concurrent.futures函数库中的Thread
    发表于 03-16 16:15

    关于RT-Thread内存管理的内存简析

    :支持线程挂起。内存无空闲内存块时,申请线程会被挂起,直到有可用内存块。简单理解,就是将相同大小的内存块通过某种方式放在一起,就好比将各个内存块放在类似于水池的容器里,需要用的时候,
    发表于 04-06 17:02

    RT-Thread内存管理之内存实现分析

    能尽量避免内存碎片化。此外,RT-Thread 的内存支持线程挂起功能,当内存池中无空闲内存块时,申请线程会被挂起,直到内存池中有新的可用内存块,再将挂起的申请线程唤醒。内存堆管理相
    发表于 10-17 15:06

    动态图和线程关系的混合软件水印算法分析

    嵌入到线程关系矩阵的水印信息,最后对算法性能进行仿真测试。结果表明,本文算法充分利用了动态图水印和线程关系的优点,实现了优势互补,不仅提高了水印的数据率,而且增强了水印的抗攻击性。
    发表于 11-03 10:09 0次下载
    <b class='flag-5'>动态</b>图和<b class='flag-5'>线程</b>关系的混合软件水印算法分析

    基于Nacos的简单动态线程实现

    本文以Nacos作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单动态
    发表于 01-06 14:14 639次阅读