RocketMq学习笔记
RocketMq学习笔记
Quick Start
1 | |
Start Name Server
1 | |
Start Broker
1 | |
Send & Receive Messages
1 | |
Shutdown Servers
1 | |
RocketMq Console
已更名为rocketmq-dashboard
下载与安装
1 | |
修改配置
rocketmq-console/src/main/resources/application.properties
1 | |
RocketMq启动
1 | |
1 | |

RocketMq使用
在消息存储方面使用顺序写
在消息发送方面使用零拷贝技术,少了从内核态到用户态的步骤,同时一次只能映射1.5G~2G的数据到虚拟内存,所以RocketMQ默认设置单个CommitLog日志数据文件大小为1G

- CommitLog: 存储消息的元数据
- ConsumerQueue: 存储消息在CommitLog的索引,按照消息的偏移量来查询消息
- IndexFile: 提供了一种通过key或者时间区间查询消息的方法
刷盘机制
指的是消息从内存到磁盘
- 同步刷盘,保证数据不丢失
- 异步刷盘,高吞吐量

高可用

消息复制
- 同步复制,只有master把消息同步到slave,才通知用户写入成功
- 异步复制,只要master写入成功,就通知用户成功,可能出现消息的丢失
- 配置方式:broker.properties中的brokerRole
- ASYNC_MASTER 主节点,异步复制
- SYNC_MASTER 主节点,同步复制,保证消息不丢失
- SLAVE 从节点

建议:异步刷盘+主从同步复制
负载均衡
- 生产者负载均衡:轮询发送,会平均落在不同的Broker的不同的Queue(同一个topic)

消费者负载均衡
集群模式,默认就实现了负载均衡,只需要增加消费者就可以达到负载的效果
在集群模式下,一个队列只允许分配给一个消费者,当消费者的数量比队列的数量还多的话,就会出现闲置的消费者
默认算法是:AllocateMessageQueueAveragely

另一种算法是:AllocateMessageQueueAveragelyByCircle

广播模式
每一个消费者都会消费所有的队列,所有广播模式不属于负载均衡

消息重试
指的是消费者消费消息失败后会重试
顺序消息的重试
当出现消费失败的情况时,会无限次数重试(每隔1秒),应及时进行处理,避免造成消费阻塞
无序消息的重试
无序消息重试,只针对集群方式生效(默认最多重试16次),依然失败则进入死信队列,而广播模式没有重试机制

无序消费的集群模式重试机制配置方式:
推荐使用:Action.ReconsumeLater;

不想重试,配置如下

死信队列
当消息经过重试后依然失败,消息会被放入死信队列
特征:
- 不会再被消费者正常消费
- 有效期与正常消息相同,均为3天
- 死信队列对应的是消费者组,该消费者组下面的所有topic都会放入同一个死信队列
如何消费死信队列
- 可以在控制台重新发送该消息
- 可以创建一个新的消费者,重新消费死信队列中的消息
生产者与生产者组
消息发送包括同步发送,异步发送,顺序发送,单向发送。
同步发送,发送过程中处于阻塞状态,直到broker返回消息确认
异步发送,同步异步回调来接收broker的消息确认
顺序发送,
单向发送,发过去就不管了,没有消息确认机制
生产者组,同一类Producer的集合,发送同一类消息且发送逻辑一致
使用场景:如果发送的是事务消息且发送消息之后生产者崩溃了,但事务还没有结束,此时broker会与生产者组的其他Producer完成整个事务的发送
消息生产与消费
顺序消费
生产者通过MessageQueueSelector实现,原理是对传递的参数进行取模或者hash运算,将需要排序的消息发往同一个队列
当Broker推送消息到消费者时,消费者会通过一个orderly的监听器进行顺序消费
延迟消息
消息生产者设置messageDelayLevel实现延迟消息,非商业版只能设置18个等级的messageDelayLevel,如下
原理:18个等级对应着18个延迟消费的队列(系统内置的延迟队列SCHEDULE_TOPIC_XXXX),延迟消息不是延迟发送,而是延迟消费
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
批量消息
- 将消息放到集合进行批量发送,但是批量的消息限制在4m以内
- 如果超出4M,可以使用
ListSpliter进行拆分成小于4m的集合进行分批发送;同时设置Broker的接收限制为4M以内,即broker.conf中的maxMessageSize - 在生产者中的源码中
DefaultMQProducer,已经设置了maxMessageSize的大小限制为4M

过滤消息
Tag方式的过滤
生产方对消息打上tag
消费方订阅带有指定tag的消息,实际上过滤是Broker做的
不适用复杂场景的过滤
SQL表达式过滤
生产方通过putUserProperty为消息设置各种键值对
消费方通过mq的sql表达式
同时设置
broker.conf中的enablePropertyFilter=true
消息的推和拉
推模式
推模式指的是broker将消息推给消费者,消费者通过监听器接收消息并消费;
大部分场景用的都是推模式
被动接收
拉模式
拉模式指的是消费者主动向broker拉取消息,编程起来相对复杂,但灵活性高,可以自定义拉去哪些消息
适用于消息的回溯,比如需要重新消费过去24小时的某些消息,可以使用拉模式
主动拉去
消息幂等
当Broker或者客户端重启、扩缩容时,会触发Rebalance,重新进行负载均衡
通过业务唯一标识来保证幂等性
- 将消费过的消息进行保存,每次消费之前去查一下,如已消费则直接丢弃
- 使用分布式锁,每次消费之前去获取锁,来判断是否已经消费过该消息
技术架构

NameServer作用
- Broker管理,集群中每一个NameServer节点都会保存一份完整Broker信息,同时,提供心跳检测Broker是否存活
- RouteInfo管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。

BrokerServer作用
Broker主要负责消息的存储、投递和查询以及服务高可用保证
部署架构

- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName
- BrokerId为0表示Master,非0表示Slave
- Broker与NameServer通过长连接定时注册RouteInfo到NameServer
- ==当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。==
Producer
- Producer完全无状态
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接
- 定期从NameServer获取Topic路由信息
- Producer定期想Broker的Master节点发送心跳
Consumer
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接
- 定期从NameServer获取Topic路由信息
- Consumer定期向Master、Slave发送心跳
消息存储

CommitLog
- 存放的是真正的消息内容
- 存储Producer端写入的消息主体内容
- 消息主要是顺序写入日志文件(文件长度20),00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
ConsumeQueue
- 存放的是消息的偏移量信息
- ConsumeQueue是逻辑消费队列
- Consumer可根据ConsumeQueue来查找待消费的消息
- 具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
- 文件中每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode
- 每个ConsumeQueue文件大小约5.72M
IndexFile
- IndexFile(索引文件)**==提供了一种可以通过key或时间区间来查询消息的方法==**
- Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的
- 单个IndexFile文件大小约为400M
- IndexFile的底层存储设计为HashMap结构
Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
页缓存与内存映射
页缓存 PageCache
- 对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。
- 对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
内存映射
- RocketMQ主要通过MappedByteBuffer对文件进行读写操作。即零拷贝技术
源码分析
生产者发送消息
负载均衡
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
1 | |