李锋镝的博客

  • 首页
  • 时间轴
  • 留言
  • 插件
  • 左邻右舍
  • 我的日常
  • 关于我
    • 关于我
    • 另一个网站
  • 知识库
  • 赞助
Destiny
自是人生长恨水长东
  1. 首页
  2. 原创
  3. 正文

动态线程池框架DynamicTp使用以及架构设计

2025年4月29日 95点热度 0人点赞 0条评论

DynamicTp 是什么

DynamicTp 是一个基于 Java 的动态线程池框架,特性如下:

  • 代码零侵入:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入

  • 轻量简单:使用起来极其简单,引入相应依赖,接入只需简单 4 步就可完成,顺利 3 分钟搞定,相当丝滑

  • 通知告警:提供多种通知告警维度(配置变更通知、活性报警、队列容量阈值报警、拒绝触发报警、任务执行或等待超时报警),触发配置阈值实时推送告警信息,已支持企微、钉钉、飞书、邮件、云之家报警,同时提供 SPI 接口可自定义扩展实现

  • 运行监控:定时采集线程池指标数据(20 多种指标,包含线程池维度、队列维度、任务维度、tps、tpxx 等),支持通过 MicroMeter、JsonLog、JMX 三种方式定时获取,也可以通过 SpringBoot Endpoint 端点实时获取最新指标数据,同时提供 SPI 接口可自定义扩展实现

  • 任务增强:提供任务包装功能(比 Spring 线程池任务包装更强大),实现 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper、OpenTelemetryWrapper,可以支持线程池上下文信息传递

  • 多配置中心支持:支持多种主流配置中心,包括 Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris、ServiceComb,同时也提供 SPI 接口可自定义扩展实现

  • 中间件线程池管理:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars、SofaRpc、RabbitMq、Liteflow 等组件的线程池管理(动态调参、监控、报警)

  • 多模式:提供了增强线程池 DtpExecutor,IO 密集型场景使用的线程池 EagerDtpExecutor,调度线程池 ScheduledDtpExecutor,有序线程池 OrderedDtpExecutor,可以根据业务场景选择合适的线程池

  • 兼容性:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架管理,只需@Bean 定义时加 @DynamicTp 注解即可

  • 可靠性:依靠 Spring 生命周期管理,可以做到优雅关闭线程池,在 Spring 容器关闭前尽可能多的处理队列中的任务

  • 高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装、拒绝策略等等)

  • 线上大规模应用:参考美团线程池实践,美团内部已经有该理论成熟的应用经验

示例代码

以下是一个使用 DynamicTp 实现动态调整线程池线程数量的完整实例代码,本示例基于 Spring Boot 框架,使用 Nacos 作为配置中心。

1. 添加依赖

在 pom.xml 中添加 DynamicTp 和 Nacos 相关依赖:

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- DynamicTp -->
    <dependency>
        <groupId>cn.dynamic-tp</groupId>
        <artifactId>dynamic-tp-spring-boot-starter</artifactId>
        <version>1.1.4</version>
    </dependency>
    <!-- Nacos Config -->
    <dependency>
        <groupId>com.alibaba.nacos</groupId>
        <artifactId>nacos-client</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>

2. 配置 Nacos

在 application.yml 中配置 Nacos 信息:

spring:
  application:
    name: dynamic-tp-demo
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848
        file-extension: yaml

3. 配置线程池

在 Nacos 配置中心创建一个 dynamic-tp-demo.yaml 文件,添加线程池配置:

spring:
  dynamic:
    tp:
      enabled: true
      executors:
        - threadPoolName: myDynamicTp
          corePoolSize: 5
          maxPoolSize: 10
          queueCapacity: 200
          keepAliveTime: 60
          timeUnit: SECONDS

4. 创建线程池 Bean

在 Spring Boot 项目中创建线程池 Bean:

import com.dtp.core.annotation.DynamicTp;
import com.dtp.core.support.ThreadPoolBuilder;
import com.dtp.core.thread.DtpExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;

@Configuration
public class ThreadPoolConfig {

    @Bean
    @DynamicTp
    public ExecutorService myDynamicTp() {
        return ThreadPoolBuilder.newBuilder()
               .threadPoolName("myDynamicTp")
               .buildDynamic();
    }
}

5. 创建测试服务

创建一个服务类,使用线程池执行任务:

import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

@Service
public class TaskService {

    private final ExecutorService executorService;

    public TaskService(ExecutorService myDynamicTp) {
        this.executorService = myDynamicTp;
    }

    public void submitTask() {
        executorService.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Task executed by thread: " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

6. 创建控制器

创建一个控制器来触发任务提交:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TaskController {

    @Autowired
    private TaskService taskService;

    @GetMapping("/submitTask")
    public String submitTask() {
        taskService.submitTask();
        return "Task submitted.";
    }
}

7. 运行项目并测试

启动 Spring Boot 项目,访问 http://localhost:8080/submitTask 来提交任务。

8. 动态调整线程池参数

在 Nacos 配置中心修改 dynamic-tp-demo.yaml 文件中的线程池参数,例如将 corePoolSize 改为 8,maxPoolSize 改为 15,保存配置后,DynamicTp 会自动监听配置变更并动态调整线程池的线程数量。

架构设计

框架功能大体可以分为以下几个模块

  1. 配置变更监听模块

  2. 线程池管理模块

  3. 监控模块

  4. 通知告警模块

  5. 三方组件线程池管理模块

更新线程池逻辑

具体逻辑在 DtpRegistry 类中的 doRefreshPoolSize 方法,其功能是根据新的线程池属性配置(DtpExecutorProps)来更新线程池的核心线程数(corePoolSize)和最大线程数(maximumPoolSize)。

代码如下:

    private static void doRefreshPoolSize(ExecutorAdapter<?> executor, DtpExecutorProps props) {
        if (props.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
            if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {
                executor.setCorePoolSize(props.getCorePoolSize());
            }
            if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {
                executor.setMaximumPoolSize(props.getMaximumPoolSize());
            }
            return;
        }
        if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {
            executor.setMaximumPoolSize(props.getMaximumPoolSize());
        }
        if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {
            executor.setCorePoolSize(props.getCorePoolSize());
        }
    }
  1. 方法参数:

    • ExecutorAdapter<?> executor:表示要更新配置的线程池执行器适配器,通过这个适配器可以调用线程池的各种设置方法,如设置核心线程数和最大线程数。
    • DtpExecutorProps props:包含了新的线程池属性配置,其中包括新的核心线程数和最大线程数等信息。
  2. 主要逻辑:

    • 首先判断新的最大线程数(props.getMaximumPoolSize())是否小于当前线程池的最大线程数(executor.getMaximumPoolSize()):
      • 如果新的最大线程数小于当前最大线程数,说明需要缩小线程池规模。此时先检查核心线程数是否需要更新,即比较当前核心线程数(executor.getCorePoolSize())和新的核心线程数(props.getCorePoolSize())是否相等。如果不相等,则调用 executor.setCorePoolSize(props.getCorePoolSize()) 方法将核心线程数更新为新的值。
      • 接着检查最大线程数是否需要更新,比较当前最大线程数和新的最大线程数是否相等。如果不相等,则调用 executor.setMaximumPoolSize(props.getMaximumPoolSize()) 方法将最大线程数更新为新的值。
      • 完成上述操作后,通过 return 语句结束方法,不再执行后续逻辑。
    • 如果新的最大线程数不小于当前最大线程数,说明不需要缩小线程池规模,或者是要扩大线程池规模。此时先检查最大线程数是否需要更新,即比较当前最大线程数和新的最大线程数是否相等。如果不相等,则调用 executor.setMaximumPoolSize(props.getMaximumPoolSize()) 方法将最大线程数更新为新的值。
    • 最后检查核心线程数是否需要更新,比较当前核心线程数和新的核心线程数是否相等。如果不相等,则调用 executor.setCorePoolSize(props.getCorePoolSize()) 方法将核心线程数更新为新的值。
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接

本文链接:https://www.lifengdi.com/archives/article/4383

相关文章

  • 别再背线程池的七大参数了,现在面试官都这么问
  • 以面试官视角万字解读线程池10大经典面试题
  • 动态线程池 DynamicTp 的使用方法
  • 为什么生产环境不建议使用Executors快捷创建线程池?
  • JAVA之从线程安全说到锁
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可
标签: DynamicTp JAVA 动态线程池 多线程 线程池
最后更新:2025年4月29日

李锋镝

既然选择了远方,便只顾风雨兼程。

打赏 点赞
< 上一篇
下一篇 >

文章评论

1 2 3 4 5 6 7 8 9 11 12 13 14 15 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 46 47 48 49 50 51 52 53 54 55 57 58 60 61 62 63 64 65 66 67 69 72 74 76 77 78 79 80 81 82 85 86 87 90 92 93 94 95 96 97 98 99
取消回复

COPYRIGHT © 2025 lifengdi.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Dylan

津ICP备2024022503号-3