李锋镝的博客 - LiFengdi.Com

  • 首页
  • 时间轴
  • 留言
  • 左邻右舍
  • 我的日常
  • 关于我
青衿之志 履践致远
霁月光风 不萦于怀
  1. 首页
  2. 原创
  3. 正文

使用RocketMQ时,服务启动过程中,Consumer在服务未启动时消费消息问题处理

2022年6月23日 298点热度 0人点赞 0条评论

背景

我们使用RocketMQ时,一般Consumer启动都是使用的@PostConstruct注解。(@PostConstruct:用于在执行任何初始化时执行依赖注入后需要执行的方法。),或者使用bean的方式配置。

配置如下:

生产者配置

在配置类中配置所有生产者,在业务中注入使用,将生产者的启动和销毁绑定到 Bean 的初始化和销毁上:

@Configuration
public class MQProducerConfig {

    // 第一个生产者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer demo1MQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr("<nameServer>");
        defaultMQProducer.setProducerGroup("<group>");
        defaultMQProducer.setInstanceName("<instanceName>");
        // 其他生产者配置
        return defaultMQProducer;
    }

    // 第二个生产者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer demo2MQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr("<nameServer>");
        defaultMQProducer.setProducerGroup("<group>");
        defaultMQProducer.setInstanceName("<instanceName>");
        // 其他生产者配置
        return defaultMQProducer;
    }

    // ......
}

消费者配置

在配置类中配置所有消费者,将消费者的启动和销毁绑定到 Bean 的初始化和销毁上:

@Configuration
public class MQConsumerConfig {

    // 第一个消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // 第二个消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // ......
}

优化

上述配置在项目启动 Bean 加载的时候就会启动生产者和消费者,导致项目启动慢,并且会在项目还未启动完,就会有大量消息涌入,所以可以使用 ApplicationRunner 或 CommandLineRunner 接口在项目启动成功后再执行 MQ 的启动。同时,去掉Consumer的@PostConstruct注解。

注解如下:

// 去掉initMethod配置
@Bean(destroyMethod = "shutdown")

将上述配置中的消费者启动不再绑定到 Bean 初始化阶段。

新的消费者配置如下:

@Configuration
public class MQConsumerConfig {

    // 第一个消费者
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo1Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // 第二个消费者
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer demo2Consumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
        consumer.setNamesrvAddr("<nameServer>");
        consumer.setInstanceName("<instanceName>");
        consumer.subscribe("<topic>", "<tag>");
        consumer.setMessageListener(<listener>);
        // 其他消费者配置
        return consumer;
    }

    // ......
}

在项目启动完成节点统一启动消费者:

@Slf4j
@Component
public class MqConsumerApplicationRunner implements ApplicationRunner {

    @Autowired
    private Map<String, DefaultMQPushConsumer> defaultMQPushConsumerMap;

    @Override
    public void run(ApplicationArguments args) {
        if (CollectionUtils.isEmpty(defaultMQPushConsumerMap)) {
            return;
        }
        defaultMQPushConsumerMap.forEach((bean, consumer) -> {
            try {
                consumer.start();
            } catch (MQClientException e) {
                log.error("Consumer bean:[{}] start error.", bean, e);
            }
        }
    }
}
除非注明,否则均为李锋镝的博客 - LiFengdi.Com原创文章,转载必须以链接形式标明本文链接
本文链接:https://www.lifengdi.com/archives/article/3888
标签: JAVA MQ RocketMQ SpringBoot
最后更新:2022年6月23日

李锋镝

既然选择了远方,便只顾风雨兼程。

打赏 点赞
< 上一篇
guest
您的姓名(必填)
您的邮箱(必填)
您的站点
guest
您的姓名(必填)
您的邮箱(必填)
您的站点
0 评论
Inline Feedbacks
查看所有评论
文章目录
  • 背景
    • 生产者配置
    • 消费者配置
  • 优化
网站统计
  • 文章总数:264 篇
  • 评论总数:421 篇
  • 标签数量:218 个
  • 最后更新:2022年06月28日
  • 建站日期:2016年6月6

红豆生南国,春来发几枝。
愿君多采撷,此物最相思。

最新 热点 随机
最新 热点 随机
MybatisCodeHelperPro激活 @Resource 和 @Autowired 的区别 使用RocketMQ时,服务启动过程中,Consumer在服务未启动时消费消息问题处理 祝大家六一儿童节快乐~~~ 网易云什么时候能有杰伦的歌…… jsdelivr的CDN加速好像不行了……
居家办公了~办理居住证困难重重啊!WordPress的自动更新好烦啊醒醒~补个税了居住证签注...十一节后开工头一天,修了个耳机……
JVM安全点介绍 博客有logo啦 离骚 从SQL规范性检查、表结构索引检查着手分析如何优化SQL 海琴烟~~~ 居住证签注...
最近评论
张三 发布于 1 个月前(05月20日) 收到,谢谢博主啊
张三 发布于 1 个月前(05月20日) 请问是哪些插件啊,我想用一下试试
zenmexiugai 发布于 1 个月前(05月20日) 改成一样的还是报错,怎么回事呢
张三 发布于 1 个月前(05月19日) 我不会css,作者的前端是怎么写的啊?包括这些评论啊什么的
张三 发布于 1 个月前(05月19日) 很棒的博客 作者加油啊
有情链接
  • 志文工作室
  • 临窗旋墨
  • 旧时繁华
  • 城南旧事
  • 强仔博客
  • 林三随笔
  • 徐艺扬的博客
  • 猫鼬的星球计划
  • 云辰博客
  • 韩小韩博客
  • 知向前端
  • 阿誉的博客
  • 林羽凡
  • 情侣头像
  • 哥斯拉
  • Xym's blog

COPYRIGHT © 2022 lifengdi.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

豫ICP备16004681号-2