李锋镝的博客

  • 首页
  • 时间轴
  • 评论区显眼包🔥
  • 左邻右舍
  • 博友圈
  • 关于我
    • 关于我
    • 另一个网站
    • 我的导航站
    • 网站地图
    • 赞助
  • 留言
  • 🚇开往
Destiny
自是人生长恨水长东
  1. 首页
  2. 转载
  3. 技术
  4. 正文

RocketMQ入门级教程

2022年8月16日 187点热度 0人点赞 0条评论

前言

RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,总之就是葛大爷的一句话

RocketMQ入门级教程

核心概念

  • NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
  • Broker:核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。
  • 生产者:生产消息的一方就是生产者
  • 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组
  • 消费者:用来消费生产者消息的一方
  • 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
  • topic(主题):可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。
  • Tag(子主题):比topic低一级,可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。

这里有组的概念是因为可以用来做到不同的生产者组或者消费者组有不同的配置,这样就可以使得生产者或者消费者更加灵活。

工作流程

说完核心概念,再来说一下核心的工作流程,这里我先画了一张图。

RocketMQ入门级教程

通过这张图就可以很清楚的知道,RocketMQ大致的工作流程:

  • Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。
  • Producer在启动之后会跟会NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。
  • Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份
  • Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费

就跟上面的图一样,整体的工作流程还是比较简单的,这里我简化了很多概念,主要是为了好理解。

环境搭建

终于讲完了一些简单的概念,接下来就来搭建一套RocketMQ的环境。

通过上面分析,我们知道,在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务系统,所以这里不需要搭建,真正要搭建的就是NameServer和Broker,但是为了方便RocketMQ数据的可视化,这里我多搭建一套可视化的服务。

搭建过程比较简单,按照步骤一步一步来就可以完成,如果提示一些命令不存在,那么直接通过yum安装这些命令就行。

一、准备

需要准备一个linux服务器,需要先安装好JDK

关闭防火墙

1
2
systemctl stop firewalld
systemctl disable firewalld

下载并解压RocketMQ

1、创建一个目录,用来存放rocketmq相关的东西

mkdir /usr/rocketmq
cd /usr/rocketmq

 

2、下载并解压rocketmq

下载

wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

 

解压

unzip rocketmq-all-4.7.1-bin-release.zip

 

看到这一个文件夹就完成了

RocketMQ入门级教程

然后进入rocketmq-all-4.7.1-bin-release文件夹

cd rocketmq-all-4.7.1-bin-release

 

RocketMQ的东西都在这了

RocketMQ入门级教程

二、搭建NameServer

修改jvm参数

在启动NameServer之前,强烈建议修改一下启动时的jvm参数,因为默认的参数都比较大,为了避免内存不够,建议修改小,当然,如果你的内存足够大,可以忽略。

vi bin/runserver.sh

 

修改画圈的这一行

RocketMQ入门级教程

这里你可以直接修改成跟我一样的

-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m

 

启动NameServer

修改完之后,执行如下命令就可以启动NameServer了

nohup sh bin/mqnamesrv &

 

查看NameServer日志

tail -f ~/logs/rocketmqlogs/namesrv.log

 

如果看到如下的日志,就说明启动成功了

RocketMQ入门级教程NameServer日志

三、搭建Broker

这里启动单机版的Broker

修改jvm参数

跟启动NameServer一样,也建议去修改jvm参数

vi bin/runbroker.sh

 

将画圈的地方设置小点,当然也别太小啊

RocketMQ入门级教程

当然你还是可以跟我设置的一样

-server -Xms1g -Xmx1g -Xmn512m

 

修改Broker配置文件broker.conf

这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册

vi conf/broker.conf

 

Broker配置文件

RocketMQ入门级教程Broker配置文件

这里就能看出Broker的配置了,什么Broker集群的名称啊,Broker的名称啊,Broker的id啊,都跟前面说的对上了。

在文件末尾追加地址

namesrvAddr = localhost:9876

 

因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。

不过这里我还建议再修改一处信息,因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能你的电脑无法访问到这个自动获取的ip,所以我建议手动指定你的电脑可以访问到的服务器ip。

我的虚拟机的ip是192.168.200.143,所以就指定为192.168.200.143,如下

brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143

 

如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

RocketMQ入门级教程

启动Broker

nohup sh bin/mqbroker -c conf/broker.conf &

 

-c 参数就是指定配置文件

查看日志

tail -f ~/logs/rocketmqlogs/broker.log

 

当看到如下日志就说明启动成功了

RocketMQ入门级教程

四、搭建可视化控制台

其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。

可视化服务其实就是一个jar包,启动就行了。

jar包可以从这获取

链接:https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w
提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里

然后进入/usr/rocketmq下,执行如下命名

nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &

 

rocketmq.config.namesrvAddr就是用来指定NameServer的地址的

查看日志

tail -f ~/logs/consolelogs/rocketmq-console.log

 

当看到如下日志,就说明启动成功了

RocketMQ入门级教程

然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭

RocketMQ入门级教程

右上角可以把语言切换成中文

RocketMQ入门级教程Broker集群信息RocketMQ入门级教程topic信息

通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。

功能很多,这里就不一一介绍了。

测试

环境搭好之后,就可以进行测试了。

引入依赖

    org.apache.rocketmq
    rocketmq-client
    4.7.1

 

生产者发送消息

public class Producer {
    public static void main(String[] args) throws Exception {
        //创建一个生产者,指定生产者组为sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");

        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 第一次发送可能会超时,我设置的比较大
        producer.setSendMsgTimeout(60000);

        // 启动生产者
        producer.start();

        // 创建一条消息
        // topic为 sanyouTopic
        // 消息内容为 三友的java日记
        // tags 为 TagA
        Message msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }

}

 

  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为sanyouProducer;
  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
  • producer.start() 启动生产者
  • 构建一个内容为三友的java日记的消息,然后指定这个消息往sanyouTopic这个topic发送
  • producer.send(msg):发送消息,打印结果
  • 关闭生产者

运行结果如下

SendResult [sendStatus=SEND_OK, msgId=C0A81FAF54F818B4AAC2475FD2010000, offsetMsgId=C0A8C88F00002A9F000000000009AE55, messageQueue=MessageQueue [topic=sanyouTopic, brokerName=broker-a, queueId=0], queueOffset=0]

 

sendStatus=SEND_OK 说明发送成功了,此时就能后控制台看到未消费的消息了。

RocketMQ入门级教程

到控制台看到消息那块,然后选定发送的topic,查询的时间范围手动再选一下,不选就查不出来(我怀疑这是个bug),然后查询就能看到了一条消息。

然后点击一下MESSAGE DETAIL就能够看到详情。

RocketMQ入门级教程

这里就能看到发送消息的详细信息。

左下角消息的消费的消费,因为我们还没有消费者订阅这个topic,所以左下角没数据。

消费者消费消息

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 订阅这个topic下的所有的消息
        consumer.subscribe("sanyouTopic", "*");

        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody())   "n");
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

 

  • 创建一个消费者实例对象,指定消费者组为sanyouConsumer
  • 指定NameServer的地址:服务器的ip:9876
  • 订阅 sanyouTopic 这个topic的所有信息
  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
  • 启动消费者

启动之后,消费者就会消费刚才生产者发送的消息,于是控制台就打印出如下信息

Consumer Started.
消费消息:三友的java日记

 

此时再去看控制台

RocketMQ入门级教程

发现被sanyouConsumer这个消费者组给消费了。

SpringBoot环境下集成RocketMQ

集成

在实际项目中肯定不会像上面测试那样用,都是集成SpringBoot的。

1、引入依赖

    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.1.1

    org.springframework.boot
    spring-boot-starter-test
    2.1.1.RELEASE

 

2、yml配置

rocketmq:
  producer:
    group: sanyouProducer
  name-server: 192.168.200.143:9876

 

3、创建消费者

SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可

@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouTopic")
public class SanYouTopicListener implements RocketMQListener {

    @Override
    public void onMessage(String msg) {
        System.out.println("处理消息:"   msg);
    }

}

 

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类

4、测试

@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class RocketMQTest {

    @Autowired
    private RocketMQTemplate template;

    @Test
    public void send() throws InterruptedException {
        template.convertAndSend("sanyouTopic", "三友的java日记");
        Thread.sleep(60000);
    }

}

 

直接注入一个RocketMQTemplate,然后通过RocketMQTemplate发送消息。

运行结果如下:

处理消息:三友的java日记

 

的确消费到消息了。

原理

其实原理是一样的,只不过在SpringBoot中给封装了一层,让使用起来更加简单。

1、RocketMQTemplate构造代码

RocketMQ入门级教程

所以从这可以看出,最终在构造RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以可想而知,最终RocketMQTemplate发送消息也是通过DefaultMQProducer发送的。

2、@RocketMQMessageListener 注解处理

RocketMQ入门级教程

从这可以看出,会为每一个加了@RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer消费消息的。

至于监听器,是在这

RocketMQ入门级教程

遍历每条消息,然后调用handleMessage,最终会调用实现了RocketMQListener的对象处理消息。

最后

通过上面的理论介绍和实际的环境搭建再加上代码的测试,相信应该可以对RocketMQ有个入门,有兴趣的小伙伴可以手动搭起来,整个过程顺利的话可能就十几二十分钟这样子。

 

原文地址:https://www.cnblogs.com/zzyang/p/16591213.html

除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接

本文链接:https://www.lifengdi.com/transport/technology/3959

相关文章

  • RocketMQ的push消费方式实现详解
  • 使用RocketMQ时,服务启动过程中,Consumer在服务未启动时消费消息问题处理
  • MQ消费端遇到瓶颈该怎么办?
  • ThreadLocal如何解决内存泄漏问题
  • Redisson分布式锁的watch dog自动续期机制
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可
标签: JAVA MQ RocketMQ
最后更新:2022年8月16日

李锋镝

既然选择了远方,便只顾风雨兼程。

打赏 点赞
< 上一篇
下一篇 >

文章评论

1 2 3 4 5 6 7 8 9 11 12 13 14 15 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 46 47 48 49 50 51 52 53 54 55 57 58 60 61 62 63 64 65 66 67 69 72 74 76 77 78 79 80 81 82 85 86 87 90 92 93 94 95 96 97 98 99
取消回复

愿将腰下剑,直为斩楼兰。

那年今日(04月20日)

  • 1971年:中国著名法学家周鲠生逝世
  • 1901年:著名建筑学家梁思成出生于日本东京,祖籍广东新会
  • 1889年:德国纳粹党元首希特勒出生于奥地利布劳瑙
  • 1808年:法兰西第二帝国皇帝拿破仑出生
  • 429年:中国古代数学家祖冲之出生
  • 更多历史事件
最新 热点 随机
最新 热点 随机
Everything Claude Code 详细使用文档 配置Jackson使用字段而不是getter/setter来序列化和反序列化 这个域名注册整整十年了,十年时间,真快啊 Claude Code全维度实战指南:从入门到精通,解锁AI编程新范式 Apollo配置中心中的protalDB的作用是什么 org.apache.ibatis.plugin.Interceptor类详细介绍及使用
AI时代,个人技术博客的出路在哪里?使用WireGuard在Ubuntu 24.04系统搭建VPN这个域名注册整整十年了,十年时间,真快啊WordPress实现用户评论等级排行榜插件WordPress网站换了个字体,差点儿把样式换崩了做了一个WordPress文章热力图插件
使用WireGuard在Ubuntu 24.04系统搭建VPN 为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案? 妹妹的画【2019.07.03】 jmap命令(jdk1.8) 我要狠狠的反驳“公司禁止使用 Lombok ”的观点! Java之五种遍历Map集合的方式
标签聚合
SpringBoot 多线程 分布式 AI docker 数据库 AI编程 ElasticSearch Redis Spring JVM 设计模式 WordPress IDEA SQL JAVA 架构 日常 MySQL K8s
友情链接
  • Blogs·CN
  • Honesty
  • Mr.Sun的博客
  • 临窗旋墨
  • 哥斯拉
  • 彬红茶日记
  • 志文工作室
  • 懋和道人
  • 拾趣博客导航
  • 搬砖日记
  • 旧时繁华
  • 林羽凡
  • 瓦匠个人小站
  • 皮皮社
  • 知向前端
  • 蜗牛工作室
  • 韩小韩博客
  • 风渡言

COPYRIGHT © 2026 lifengdi.com. ALL RIGHTS RESERVED.

域名年龄

Theme Kratos Made By Dylan

津ICP备2024022503号-3

京公网安备11011502039375号