RocketMq学习笔记

RocketMq学习笔记

Quick Start

1
2
3
4
5
> unzip rocketmq-all-4.9.4-source-release.zip
> cd rocketmq-all-4.9.4-source-release/
# 改完配置需要重新编译
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.9.4/rocketmq-4.9.4

Start Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

Start Broker

1
2
3
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

Send & Receive Messages

1
2
3
4
5
6
7
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

Shutdown Servers

1
2
3
4
5
6
7
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

RocketMq Console

已更名为rocketmq-dashboard

下载与安装

1
2
3
4
5
6
7
8
9
10
11
# 从GitHub上面拉取代码
$ git clone https://github.com/apache/rocketmq-dashboard.git
$ cd rocketmq-dashboard

# maven打包
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

or

mvn spring-boot:run

修改配置

rocketmq-console/src/main/resources/application.properties

1
2
3
4
5
6
7
8
9
10
11
12
server.address=0.0.0.0
# server.port=8080
# 我这里将8080改成19876了
server.port=19876
# ……
# 中间省略
# ………
# 这里是指定Nameserv,也可以不指定,在前端控制台进行指定
rocketmq.config.namesrvAddr=localhost:9876
# ……
# 后续省略
# ………

RocketMq启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
NettyServerConfig默认端口:10911
NameServer默认端口:9876

# rocketmq 启动路径
/xdfapp/server/rocket/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/bin
# 日志路径
/root/logs/rocketmqlogs

# 启动NameServer
nohup ./mqnamesrv &
# 启动Broker
nohup ./mqbroker -c ../conf/broker.conf &


# broker.conf配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=10.60.0.65:9876
# brokerIP1=10.60.0.65
autoCreateTopicEnable=true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# 所属集群名字
brokerClusterName=DefaultCluster

# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.200.129

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
listenPort=10911

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

image-20220817173427156

RocketMq使用

在消息存储方面使用顺序写

在消息发送方面使用零拷贝技术,少了从内核态到用户态的步骤,同时一次只能映射1.5G~2G的数据到虚拟内存,所以RocketMQ默认设置单个CommitLog日志数据文件大小为1G

image-20220817175729240

  • CommitLog: 存储消息的元数据
  • ConsumerQueue: 存储消息在CommitLog的索引,按照消息的偏移量来查询消息
  • IndexFile: 提供了一种通过key或者时间区间查询消息的方法

刷盘机制

指的是消息从内存到磁盘

  • 同步刷盘,保证数据不丢失
  • 异步刷盘,高吞吐量

image-20220817180635301

高可用

image-20220817181308435

消息复制

  • 同步复制,只有master把消息同步到slave,才通知用户写入成功
  • 异步复制,只要master写入成功,就通知用户成功,可能出现消息的丢失
  • 配置方式:broker.properties中的brokerRole
    • ASYNC_MASTER 主节点,异步复制
    • SYNC_MASTER 主节点,同步复制,保证消息不丢失
    • SLAVE 从节点

image-20220817182127810

建议:异步刷盘+主从同步复制

负载均衡

  • 生产者负载均衡:轮询发送,会平均落在不同的Broker的不同的Queue(同一个topic)

image-20220817184304881

  • 消费者负载均衡

    • 集群模式,默认就实现了负载均衡,只需要增加消费者就可以达到负载的效果

      在集群模式下,一个队列只允许分配给一个消费者,当消费者的数量比队列的数量还多的话,就会出现闲置的消费者

      默认算法是:AllocateMessageQueueAveragely

      image-20220817184838450

      另一种算法是:AllocateMessageQueueAveragelyByCircle

      image-20220817185045079

    • 广播模式

      每一个消费者都会消费所有的队列,所有广播模式不属于负载均衡

image-20220817185816466

消息重试

指的是消费者消费消息失败后会重试

  • 顺序消息的重试

    当出现消费失败的情况时,会无限次数重试(每隔1秒),应及时进行处理,避免造成消费阻塞

  • 无序消息的重试

    无序消息重试,只针对集群方式生效(默认最多重试16次),依然失败则进入死信队列,而广播模式没有重试机制

image-20220817191627279

无序消费的集群模式重试机制配置方式:

推荐使用:Action.ReconsumeLater;

image-20220817192040478

不想重试,配置如下

image-20220817192329498

死信队列

当消息经过重试后依然失败,消息会被放入死信队列

特征:

  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为3天
  • 死信队列对应的是消费者组,该消费者组下面的所有topic都会放入同一个死信队列

如何消费死信队列

  • 可以在控制台重新发送该消息
  • 可以创建一个新的消费者,重新消费死信队列中的消息

生产者与生产者组

消息发送包括同步发送,异步发送,顺序发送,单向发送。

同步发送,发送过程中处于阻塞状态,直到broker返回消息确认

异步发送,同步异步回调来接收broker的消息确认

顺序发送,

单向发送,发过去就不管了,没有消息确认机制

生产者组,同一类Producer的集合,发送同一类消息且发送逻辑一致

使用场景:如果发送的是事务消息且发送消息之后生产者崩溃了,但事务还没有结束,此时broker会与生产者组的其他Producer完成整个事务的发送

消息生产与消费

  • 顺序消费

    生产者通过MessageQueueSelector实现,原理是对传递的参数进行取模或者hash运算,将需要排序的消息发往同一个队列

    当Broker推送消息到消费者时,消费者会通过一个orderly的监听器进行顺序消费

  • 延迟消息

    消息生产者设置messageDelayLevel实现延迟消息,非商业版只能设置18个等级的messageDelayLevel,如下

    原理:18个等级对应着18个延迟消费的队列(系统内置的延迟队列SCHEDULE_TOPIC_XXXX),延迟消息不是延迟发送,而是延迟消费

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • 批量消息

    • 将消息放到集合进行批量发送,但是批量的消息限制在4m以内
    • 如果超出4M,可以使用ListSpliter进行拆分成小于4m的集合进行分批发送;同时设置Broker的接收限制为4M以内,即broker.conf中的maxMessageSize
    • 在生产者中的源码中DefaultMQProducer,已经设置了maxMessageSize的大小限制为4M

    image-20220916075050844

  • 过滤消息

    • Tag方式的过滤

      生产方对消息打上tag

      消费方订阅带有指定tag的消息,实际上过滤是Broker做的

      不适用复杂场景的过滤

    • SQL表达式过滤

      生产方通过putUserProperty为消息设置各种键值对

      消费方通过mq的sql表达式

      同时设置broker.conf中的enablePropertyFilter=true

消息的推和拉

  • 推模式

    推模式指的是broker将消息推给消费者,消费者通过监听器接收消息并消费;

    大部分场景用的都是推模式

    被动接收

  • 拉模式

    拉模式指的是消费者主动向broker拉取消息,编程起来相对复杂,但灵活性高,可以自定义拉去哪些消息

    适用于消息的回溯,比如需要重新消费过去24小时的某些消息,可以使用拉模式

    主动拉去

消息幂等

当Broker或者客户端重启、扩缩容时,会触发Rebalance,重新进行负载均衡

通过业务唯一标识来保证幂等性

  • 将消费过的消息进行保存,每次消费之前去查一下,如已消费则直接丢弃
  • 使用分布式锁,每次消费之前去获取锁,来判断是否已经消费过该消息

技术架构

rocketmq_architecture_1

NameServer作用

  • Broker管理,集群中每一个NameServer节点都会保存一份完整Broker信息,同时,提供心跳检测Broker是否存活
  • RouteInfo管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。

rocketmq_architecture_2

BrokerServer作用

Broker主要负责消息的存储、投递和查询以及服务高可用保证

部署架构

rocketmq_architecture_3

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName
  • BrokerId为0表示Master,非0表示Slave
  • Broker与NameServer通过长连接定时注册RouteInfo到NameServer
  • ==当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。==

Producer

  • Producer完全无状态
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接
  • 定期从NameServer获取Topic路由信息
  • Producer定期想Broker的Master节点发送心跳

Consumer

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接
  • 定期从NameServer获取Topic路由信息
  • Consumer定期向Master、Slave发送心跳

消息存储

rocketmq_design_1

CommitLog

  • 存放的是真正的消息内容
  • 存储Producer端写入的消息主体内容
  • 消息主要是顺序写入日志文件(文件长度20),00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。

ConsumeQueue

  • 存放的是消息的偏移量信息
  • ConsumeQueue是逻辑消费队列
  • Consumer可根据ConsumeQueue来查找待消费的消息
  • 具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
  • 文件中每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode
  • 每个ConsumeQueue文件大小约5.72M

IndexFile

  • IndexFile(索引文件)**==提供了一种可以通过key或时间区间来查询消息的方法==**
  • Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的
  • 单个IndexFile文件大小约为400M
  • IndexFile的底层存储设计为HashMap结构

Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

页缓存与内存映射

页缓存 PageCache

  • 对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。
  • 对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

内存映射

  • RocketMQ主要通过MappedByteBuffer对文件进行读写操作。即零拷贝技术

源码分析

生产者发送消息

负载均衡

org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 生产者是如何选择将消息发送到哪个队列的
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
// 去threadLocal获取index,保证线程之间的数据隔离
int index = tpInfo.getSendWhichQueue().incrementAndGet();
// 遍历需要发送的队列
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 对index与队列总数进行取模
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
// 确定要发送到哪个队列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}

final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}

return tpInfo.selectOneMessageQueue();
}

return tpInfo.selectOneMessageQueue(lastBrokerName);
}

RocketMq学习笔记
http://example.com/2023/03/10/Middleware/MessageQueue/RocketMq学习笔记/
作者
UncleBryan
发布于
2023年3月10日
许可协议