李锋镝的博客

  • 首页
  • 时间轴
  • 插件
  • 评论区显眼包🔥
  • 左邻右舍
  • 博友圈
  • 关于我
    • 关于我
    • 另一个网站
    • 我的导航站
    • 网站地图
  • 留言
  • 赞助
Destiny
自是人生长恨水长东
  1. 首页
  2. 原创
  3. 正文

使用RocketMQ时,服务启动过程中,Consumer在服务未启动时消费消息问题处理

2022年6月23日 81点热度 0人点赞 0条评论

背景

我们使用RocketMQ时,一般Consumer启动都是使用的@PostConstruct注解。(@PostConstruct:用于在执行任何初始化时执行依赖注入后需要执行的方法。),或者使用bean的方式配置。

配置如下:

生产者配置

在配置类中配置所有生产者,在业务中注入使用,将生产者的启动和销毁绑定到 Bean 的初始化和销毁上:

@Configuration
public class MQProducerConfig {

    // 第一个生产者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer demo1MQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr("<nameServer>");
        defaultMQProducer.setProducerGroup("<group>");
        defaultMQProducer.setInstanceName("<instanceName>");
        // 其他生产者配置
        return defaultMQProducer;
    }

    // 第二个生产者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer demo2MQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr("<nameServer>");
        defaultMQProducer.setProducerGroup("<group>");
        defaultMQProducer.setInstanceName("<instanceName>");
        // 其他生产者配置
        return defaultMQProducer;
    }

    // ......
}

消费者配置

在配置类中配置所有消费者,将消费者的启动和销毁绑定到 Bean 的初始化和销毁上:

@Configuration
public class MQConsumerConfig {

    // 第一个消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // 第二个消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // ......
}

优化

上述配置在项目启动 Bean 加载的时候就会启动生产者和消费者,导致项目启动慢,并且会在项目还未启动完,就会有大量消息涌入,所以可以使用 ApplicationRunner 或 CommandLineRunner 接口在项目启动成功后再执行 MQ 的启动。同时,去掉Consumer的@PostConstruct注解。

注解如下:

// 去掉initMethod配置
@Bean(destroyMethod = "shutdown")

将上述配置中的消费者启动不再绑定到 Bean 初始化阶段。

新的消费者配置如下:

@Configuration
public class MQConsumerConfig {

    // 第一个消费者
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // 第二个消费者
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // ......
}

在项目启动完成节点统一启动消费者:

@Slf4j
@Component
public class MqConsumerApplicationRunner implements ApplicationRunner {

    @Autowired
    private Map<String, DefaultMQPushConsumer> defaultMQPushConsumerMap;

    @Override
    public void run(ApplicationArguments args) {
        if (CollectionUtils.isEmpty(defaultMQPushConsumerMap)) {
            return;
        }
        defaultMQPushConsumerMap.forEach((bean, consumer) -> {
            try {
                consumer.start();
            } catch (MQClientException e) {
                log.error("Consumer bean:[{}] start error.", bean, e);
            }
        }
    }
}
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接

本文链接:https://www.lifengdi.com/article/3888

相关文章

  • SpringBoot常用注解
  • CompletableFuture使用详解
  • SpringBoot 中内置的 49 个常用工具类
  • SpringBoot 实现接口防刷的 5 种实现方案
  • SpringBoot 实现 RSA+AES 自动接口解密
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可
标签: JAVA MQ RocketMQ SpringBoot
最后更新:2022年6月23日

李锋镝

既然选择了远方,便只顾风雨兼程。

打赏 点赞
< 上一篇
下一篇 >

文章评论

1 2 3 4 5 6 7 8 9 11 12 13 14 15 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 46 47 48 49 50 51 52 53 54 55 57 58 60 61 62 63 64 65 66 67 69 72 74 76 77 78 79 80 81 82 85 86 87 90 92 93 94 95 96 97 98 99
取消回复

位卑未敢忘忧国,事定犹须待阖棺。

那年今日(12月17日)

  • 1981年:德国足球运动员蒂姆·维泽出生
  • 1971年:印度和东巴基斯坦达成停火协议
  • 1909年:比利时国王利奥波德二世逝世
  • 1905年:狙击之王西蒙·海耶出生
  • 1902年:京师大学堂正式开学
  • 更多历史事件
最新 热点 随机
最新 热点 随机
AI原生数据库新标杆:seekdb深度解析,轻量架构与混合搜索的双重革命 做了一个WordPress文章热力图插件 Spring WebFlux底层原理深度剖析-从响应式流到事件循环的全链路拆解 Spring WebFlux深度解析:异步非阻塞架构与实战落地指南 规范驱动AI编程:用OpenSpec实现100%可控开发,从需求到代码的全流程闭环 WordPress网站换了个字体,差点儿把样式换崩了
玩博客的人是不是越来越少了?准备入手个亚太的ECS,友友们有什么建议吗?使用WireGuard在Ubuntu 24.04系统搭建VPNWordPress实现用户评论等级排行榜插件Gemini 3 Pro 深度测评:多模态AI编程的跨代际突破,从一句话到完整应用的全链路革命WordPress网站换了个字体,差点儿把样式换崩了
使用itext和freemarker来根据Html模板生成PDF文件,加水印、印章 项目中不用 redis 分布式锁,怎么防止用户重复提交? SpringBoot框架自动配置之spring.factories和AutoConfiguration.imports JAVA线程池简析(JDK1.6) IDEA版本2020.*全局MAVEN配置 Gemini 3 深度解析:从像素级复刻到 AGI 雏形,多模态 AI 如何重构开发与创作?
标签聚合
JVM WordPress SQL 日常 K8s 架构 SpringBoot AI编程 MySQL ElasticSearch 多线程 分布式 数据库 AI JAVA docker 设计模式 Spring IDEA Redis
友情链接
  • Blogs·CN
  • Honesty
  • 临窗旋墨
  • 哥斯拉
  • 彬红茶日记
  • 志文工作室
  • 搬砖日记
  • 旧时繁华
  • 林羽凡
  • 瓦匠个人小站
  • 皮皮社
  • 知向前端
  • 蜗牛工作室
  • 韩小韩博客
  • 风渡言

COPYRIGHT © 2025 lifengdi.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Dylan

津ICP备2024022503号-3