李锋镝的博客

  • 首页
  • 时间轴
  • 留言
  • 插件
  • 左邻右舍
  • 关于我
    • 关于我
    • 另一个网站
  • 知识库
  • 赞助
Destiny
自是人生长恨水长东
  1. 首页
  2. 原创
  3. 正文

使用RocketMQ时,服务启动过程中,Consumer在服务未启动时消费消息问题处理

2022年6月23日 7387点热度 0人点赞 0条评论

背景

我们使用RocketMQ时,一般Consumer启动都是使用的@PostConstruct注解。(@PostConstruct:用于在执行任何初始化时执行依赖注入后需要执行的方法。),或者使用bean的方式配置。

配置如下:

生产者配置

在配置类中配置所有生产者,在业务中注入使用,将生产者的启动和销毁绑定到 Bean 的初始化和销毁上:

@Configuration
public class MQProducerConfig {

    // 第一个生产者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer demo1MQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr("<nameServer>");
        defaultMQProducer.setProducerGroup("<group>");
        defaultMQProducer.setInstanceName("<instanceName>");
        // 其他生产者配置
        return defaultMQProducer;
    }

    // 第二个生产者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer demo2MQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr("<nameServer>");
        defaultMQProducer.setProducerGroup("<group>");
        defaultMQProducer.setInstanceName("<instanceName>");
        // 其他生产者配置
        return defaultMQProducer;
    }

    // ......
}

消费者配置

在配置类中配置所有消费者,将消费者的启动和销毁绑定到 Bean 的初始化和销毁上:

@Configuration
public class MQConsumerConfig {

    // 第一个消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // 第二个消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // ......
}

优化

上述配置在项目启动 Bean 加载的时候就会启动生产者和消费者,导致项目启动慢,并且会在项目还未启动完,就会有大量消息涌入,所以可以使用 ApplicationRunner 或 CommandLineRunner 接口在项目启动成功后再执行 MQ 的启动。同时,去掉Consumer的@PostConstruct注解。

注解如下:

// 去掉initMethod配置
@Bean(destroyMethod = "shutdown")

将上述配置中的消费者启动不再绑定到 Bean 初始化阶段。

新的消费者配置如下:

@Configuration
public class MQConsumerConfig {

    // 第一个消费者
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // 第二个消费者
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // ......
}

在项目启动完成节点统一启动消费者:

@Slf4j
@Component
public class MqConsumerApplicationRunner implements ApplicationRunner {

    @Autowired
    private Map<String, DefaultMQPushConsumer> defaultMQPushConsumerMap;

    @Override
    public void run(ApplicationArguments args) {
        if (CollectionUtils.isEmpty(defaultMQPushConsumerMap)) {
            return;
        }
        defaultMQPushConsumerMap.forEach((bean, consumer) -> {
            try {
                consumer.start();
            } catch (MQClientException e) {
                log.error("Consumer bean:[{}] start error.", bean, e);
            }
        }
    }
}
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接

本文链接:https://www.lifengdi.com/archives/article/3888

相关文章

  • SpringBoot常用注解
  • CompletableFuture使用详解
  • SpringBoot 中内置的 49 个常用工具类
  • SpringBoot 实现接口防刷的 5 种实现方案
  • SpringBoot 实现 RSA+AES 自动接口解密
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可
标签: JAVA MQ RocketMQ SpringBoot
最后更新:2022年6月23日

李锋镝

既然选择了远方,便只顾风雨兼程。

打赏 点赞
< 上一篇
下一篇 >

文章评论

1 2 3 4 5 6 7 8 9 11 12 13 14 15 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 46 47 48 49 50 51 52 53 54 55 57 58 60 61 62 63 64 65 66 67 69 72 74 76 77 78 79 80 81 82 85 86 87 90 92 93 94 95 96 97 98 99
取消回复

他乡共酌金花酒,万里同悲鸿雁天。

最新 热点 随机
最新 热点 随机
SpringBoot框架自动配置之spring.factories和AutoConfiguration.imports 应用型负载均衡(ALB)和网络型负载均衡(NLB)区别 什么是Helm? TransmittableThreadLocal介绍与使用 ReentrantLock深度解析 RedisTemplate和Redisson的区别
玩博客的人是不是越来越少了?准备入手个亚太的ECS,友友们有什么建议吗?什么是Helm?2024年11月1号 农历十月初一别再背线程池的七大参数了,现在面试官都这么问URL地址末尾加不加“/”有什么区别
UUID太长怎么办?快来试试NanoId 网易云什么时候能有杰伦的歌…… 妹妹的画【2019.09.26】 MySQL 的自增 ID 用完了,怎么办? 分布式ID生成算法SnowFlake(雪花算法)Java源码 常用正则表达式
标签聚合
设计模式 docker 数据库 多线程 架构 文学 K8s SpringBoot JVM 面试 教程 Spring 分布式 SQL MySQL 日常 Redis IDEA JAVA ElasticSearch
友情链接
  • i架构
  • 临窗旋墨
  • 博友圈
  • 博客录
  • 博客星球
  • 哥斯拉
  • 志文工作室
  • 搬砖日记
  • 旋律的博客
  • 旧时繁华
  • 林羽凡
  • 知向前端
  • 蜗牛工作室
  • 集博栈
  • 韩小韩博客
  • 風の声音

COPYRIGHT © 2025 lifengdi.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Dylan

津ICP备2024022503号-3