背景
我们使用RocketMQ时,一般Consumer
启动都是使用的@PostConstruct
注解。(@PostConstruct
:用于在执行任何初始化时执行依赖注入后需要执行的方法。),或者使用bean的方式配置。
配置如下:
生产者配置
在配置类中配置所有生产者,在业务中注入使用,将生产者的启动和销毁绑定到 Bean 的初始化和销毁上:
@Configuration
public class MQProducerConfig {
// 第一个生产者
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer demo1MQProducer() {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr("<nameServer>");
defaultMQProducer.setProducerGroup("<group>");
defaultMQProducer.setInstanceName("<instanceName>");
// 其他生产者配置
return defaultMQProducer;
}
// 第二个生产者
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer demo2MQProducer() {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr("<nameServer>");
defaultMQProducer.setProducerGroup("<group>");
defaultMQProducer.setInstanceName("<instanceName>");
// 其他生产者配置
return defaultMQProducer;
}
// ......
}
消费者配置
在配置类中配置所有消费者,将消费者的启动和销毁绑定到 Bean 的初始化和销毁上:
@Configuration
public class MQConsumerConfig {
// 第一个消费者
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer demo1Consumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
consumer.setNamesrvAddr("<nameServer>");
consumer.setInstanceName("<instanceName>");
consumer.subscribe("<topic>", "<tag>");
consumer.setMessageListener(<listener>);
// 其他消费者配置
return consumer;
}
// 第二个消费者
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer demo2Consumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
consumer.setNamesrvAddr("<nameServer>");
consumer.setInstanceName("<instanceName>");
consumer.subscribe("<topic>", "<tag>");
consumer.setMessageListener(<listener>);
// 其他消费者配置
return consumer;
}
// ......
}
优化
上述配置在项目启动 Bean
加载的时候就会启动生产者和消费者,导致项目启动慢,并且会在项目还未启动完,就会有大量消息涌入,所以可以使用 ApplicationRunner
或 CommandLineRunner
接口在项目启动成功后再执行 MQ 的启动。同时,去掉Consumer
的@PostConstruct
注解。
注解如下:
// 去掉initMethod配置
@Bean(destroyMethod = "shutdown")
将上述配置中的消费者启动不再绑定到 Bean 初始化阶段。
新的消费者配置如下:
@Configuration
public class MQConsumerConfig {
// 第一个消费者
@Bean(destroyMethod = "shutdown")
public DefaultMQPushConsumer demo1Consumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
consumer.setNamesrvAddr("<nameServer>");
consumer.setInstanceName("<instanceName>");
consumer.subscribe("<topic>", "<tag>");
consumer.setMessageListener(<listener>);
// 其他消费者配置
return consumer;
}
// 第二个消费者
@Bean(destroyMethod = "shutdown")
public DefaultMQPushConsumer demo2Consumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("<group>");
consumer.setNamesrvAddr("<nameServer>");
consumer.setInstanceName("<instanceName>");
consumer.subscribe("<topic>", "<tag>");
consumer.setMessageListener(<listener>);
// 其他消费者配置
return consumer;
}
// ......
}
在项目启动完成节点统一启动消费者:
@Slf4j
@Component
public class MqConsumerApplicationRunner implements ApplicationRunner {
@Autowired
private Map<String, DefaultMQPushConsumer> defaultMQPushConsumerMap;
@Override
public void run(ApplicationArguments args) {
if (CollectionUtils.isEmpty(defaultMQPushConsumerMap)) {
return;
}
defaultMQPushConsumerMap.forEach((bean, consumer) -> {
try {
consumer.start();
} catch (MQClientException e) {
log.error("Consumer bean:[{}] start error.", bean, e);
}
}
}
}
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接