消息队列学习笔记–RocketMQ
消息队列的优缺点
优点
异步
对于实时性不是很高的业务,如积分增减、发送短信等非核心流程,可以放到消息队列中去,提升整个链路的响应时间。
削峰填谷
对于秒杀等短时间大流量场景,可以将请求短暂的在消息队列中堆积,平滑的消费请求。
解耦
MQ可以实现服务的高内聚、低耦合,通过发布订阅模型实现系统/模块间的解耦。
缺点
系统可用性降低
MQ相当于一个依赖组件,一旦出问题会导致系统不可用。
系统复杂度提高
引入MQ如何保证消息不重复消费?如何处理消息丢失?如何保证消息的顺序消费?
数据一致性问题
生产者消费者事务执行结果不一致造成的数据不一致问题。
消息队列选型
ActiveMQ、RabbitMQ、RocketMQ、Kafka对比:
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
ActiveMQ:因为使用公司较少,没有经过大规模吞吐场景的考验,社区也不活跃,所以不做讨论。
RabbitMQ:是ActiveMQ的替代方案,能够保证数据不丢失,但吞吐量只能到万级别,相比于Kafka和RocketMQ低了一个数量级,但比Kafka多了很多的高级特性,如消息重试和死信队列,且写入延迟能达到微秒级,但使用Erlang作为开发语言,较为劝退。
Kafka:单机吞吐量能达到10万级别,写入延迟能到毫秒级别,但功能较为简单,只支持简单的MQ功能,主要运用在大数据实时计算和日志收集领域。
RocketMQ:单机吞吐量能到10w级别,低延迟、高性能、高可用,并且功能丰富,像延时消息、事务消息、消息回溯、死信队列等,广泛应用在订单、交易、计算、消息推送、binlog分发等场景。
主要通过RocketMQ来学习消息队列。
RocketMQ整体架构
RocketMQ主要分为四个组成部分:Producer、Broker、Consumer、Name Server。
其中,Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息,NameServer负责注册发现与路由剔除。
Topic
Topic 是一种逻辑上的分区,是同一类消息的集合,每一个消息只能属于一个 Topic ,是RocketMQ进行消息订阅的基本单位。
每个 topic 会被分成很多 Messsage Queue ,和 Kafka 中的 Partition 概念一样,topic 的数据被分布在不同的 Message Queue 中。
在业务增长,消息量增大时,可以增大 topic 的 Message Queue,这样可以将压力分摊到更多的 broker 上。因为 Producer 可以发送消息的时候可以通过指定的算法,将消息均匀的发送到每个 Message Queue,这种算法可以是轮询、随机选择、Hash,也可以是自定义的。
NameServer
NameServer的作用是注册中心,类似于Zookeeper。每个NameServer节点间互相独立,没有任何信息交互,不存在任何的选主或主从切换,因此比Zookeeper更加轻量。每个NameServer节点都保存了所有的Broker元数据,并和Broker保持长连接心跳。
NameServer主要有两个功能:注册发现和路由剔除。
注册发现
Broker启动时会和NameServer所有的节点建立长连接,每隔30s发送一次心跳,包括BrokerId、Broker地址、Broker名称等Broker集群的信息和Topic的拓扑信息。然后生产者和消费者启动的时候任选一台 Name Server 机器拉取所需的 Topic 的路由信息缓存在本地内存中,之后每隔 30s 定时从远端拉取更新本地缓存。
路由剔除
NameServer定期监测Broker的心跳,一旦超过120s没有最新的心跳,则判断Broker失联,关闭Broker连接,剔除Broker信息,但不会主动通知Producer和Consumer,需要主动拉取才会更新,所以最长需要30s才能感知到Broker故障。
Producer
Producer负责生产消息,单个生产者与一台NameServer保持长链接,每隔30s从NameServer拉取topic配置信息,如果NameServer挂了,会自动连接另一台NameServer;单个生产者会和他关联的所有Broker保持长连接,并每隔30s发送心跳。
消息发送方式
可靠同步发送
Producer发送消息后,需要收到接受方的响应才能发送下一条消息。
可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
单向发送
送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
消息类型
普通消息
即正常的消息
延迟消息
延时消息在投递时,需要设置指定的延时级别,等到特定的时间间隔后消息才会被消费者消费。mq服务端,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列。
可以通过delayLevel设置延时级别,可以取以下值:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h
事务消息
事务消息用于保证本地事务执行和MQ投递都能成功,保持一致性。
通过两阶段提交和状态定时回查来保证消息一定发送到Broker。
Producer发送Half Message,服务端响应ACK写入结果,根据发送结果执行本地事务(如果写入失败,Half Message不可见,不执行本地事务),再根据本地事务的执行情况发送Commit/Rollback请求。
对于处于Half Message pending状态的消息,服务端会定期回查,Producer收到消息会检查本地事务的执行情况,根据本地事务状态重新Commit或Rollback。
注意:Half Message并未真正进入topic的queue,而是存放在临时queue中,提交事务后才会真正转移到topic的queue中。
顺序消息
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。
- 全局消息的实现方式:一个全局顺序 topic 只创建一个Message Queue
分区顺序 对于指定的一个 Topic,所有消息根据 key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
发送流程
- Producer在发送消息的时候,如果本地路由表中未缓存Topic的路由信息,则向NameServer发送获取路由的请求,更新本地路由表,并每隔30s从NameServer更新本地路由表
- Producer在拿到路由信息后,根据路由消息选择queue,发送消息
Consumer
Consumer负责消费消息,一个消费者组可以订阅多个Topic。Consumer启动后,就会通过定时任务不断地向RocketMQ集群中的所有Broker实例保持长连接,并每隔30s发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息),一旦连接断开,Broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,并且每隔30s会从NameServer拉取topic信息更新。
消费模式
集群消费 同一个消费组里的消费者“瓜分”所有消息来消费,同一个消息只会被一个消费者实例消费。因为集群模式的消费进度是保存在Broker端的,所以即使应用崩溃,消费进度也不会出错。
广播消费 同一个消费组里的消费者会消费所有消息,即同一个消息会被每一个消费者实例消费。广播消费的消费进度保存在客户端机器的文件中。如果文件弄丢了,那么消费进度就丢失了,可能会导致部分消息没有消费。
拉取流程
对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:
(1)Push方式:由消息中间件主动地将消息推送给消费者;采用Push方式,实时性高,可以尽快的将消息发送给消费者进行消费。但是在消费者的处理消息的能力较弱的时候,而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。
(2)Pull方式:由消费者客户端主动向消息中间件拉取消息;采用Pull方式,可控性好,如何设置Pull消息的频率需要重点去考虑。如果每次Pull的时间间隔比较久,会增加消息的延迟,若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。
RocketMQ的消费方式都是基于拉模式拉取消息的,而且还采用了一种长轮询机制,来平衡上面Push/Pull方式的各自缺点:
Consumer发送pull请求,Broker端接受请求,如果发现队列里没有新消息,不立即返回,而是持有这个请求一段时间(通过设置超时时间来实现),在这段时间内轮询Broker队列内是否有新的消息,如果有新消息,就利用现有的连接返回消息给消费者;如果这段时间内没有新消息进入队列,则返回空。 这样消费消息的主动权既保留在Consumer端,也不会出现Broker积压大量消息后,短时间内推送给Consumer大量消息使Consumer因为性能问题出现消费不及时的情况。
消费进度保存
集群消费模式中,消费进度保存在broker 上,而广播模式中保存在客户端本地。
消费进度是消费者组之间互相隔离的,Broker 上通过 Topic + 消费者组名称作为 key,value 中分别记录每个 MessageQueue 对应该消费者组的消费偏移量 offset。
消费者侧会记录自己的消费进度到内存中的 OffsetTable,然后定时提交到 Broker 侧。
由于一批消息的消费次序不确定,可能下标大的消息先被消费结束,下标小的由于延时尚未被消费,此时消费者向 Broker 提交的 offset 应该是已被消费的最小下标,从而保证消息不被遗漏,但缺点在于可能重复消费消息。
高可用
- 重试,死信 重试 Topic:如果由于各种意外导致消息消费失败,那么该消息会自动被保存到重试Topic中,格式为“%RETRY%消费者组”,在订阅的时候会自动订阅这个重试Topic。 进入重试队列的消息有16次重试机会,每次都会按照一定的时间间隔进行,只要正常消费或者重试消费中有一次消费成功,就算消费成功。
死信Topic:死信Topic名字格式为“%DLQ%消费者组名”。如果正常消费1次失败,重试16次失败,那么消息会被保存到死信Topic中,进入死信Topic的消息不能被再次消费。RocketMQ认为,如果17次机会都失败了,说明生产者发送消息的格式发生了变化,或者消费服务出现了问题,需要人工介入处理。
Rebalance
Rebalance
(重平衡)机制
,用于在发生Broker掉线、Topic扩容和缩容、消费者扩容和缩容等变化时,自动感知并调整自身消费,以尽量减少甚至避免消息没有被消费。 触发时机
- Consumer启动时 启动之后会立马进行Rebalance
Consumer运行中会监听Broker发送过来的Rebalance消息,以及Consumer自身的定时任务(每隔20s)触发的Rebalance
Consumer停止运行时没有直接的调用Rebalance,而是会通知Broker自己下线了,然后Broker会通知其余的Consumer进行Rebalance。
Rebalance 核心逻辑主要在 client 侧。首先,Consumer 从 Broker 拉取自己订阅的所有 Topic 和 ConsumerGroup 下的消费者信息。然后会对 Topic 中 MessageQueue 和 消费者ID 进行排序,然后用消息队列默认分配算法来进行分配。排序是关键步骤,因为 consumer 之间并不通信,通过排序这样可以保证每一个 consumer 的视图一致,然后通过相同的分配算法,就能达到分配结果的一致。
常见的分配算法:
- AllocateMessageQueueAveragely平均算法,默认策略
- AllocateMessageQueueAveragelyByCircle:环形平均算法。
- AllocateMessageQueueByConfig:根据配置负载均衡算法,根据配置,为每一个消费者配置固定的消息队列。
- AllocateMessageQueueByMachineRoom:根据机房负载均衡算法。
- AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法。
举个栗子,假设队列大小是8(编号0-7),消费者数量3(编号0-2),分配结果就是 AllocateMessageQueueAveragely的结果:
消费者0:队列0,1,2;
消费者1:队列3,4,5;
消费者2:队列6,7。
AllocateMessageQueueAveragelyByCircle的结果:
消费者0:队列0,3,6;
消费者1:队列1,4,7;
消费者2:队列2,5。
消息的顺序性
我们可以通过指定key的方式让相关的消息都存到一个MessageQueue中,并且这个MessageQueue中的消息一定是有序的。消费者取出来的顺序也一定是有序的,但消费者可能会有多个线程并发处理消息,因为单线程的吞吐量太低。多线程并发时可能造成乱序。
解决:
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。即队列数量设置为1,且单线程消费
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
- 通过指定key的方式保证相关的消息都会被添加到一条队列中,但此时可能有多个消费者去队列中取消息(不是同时取),虽然能保证消息出队顺序的FIFO,但可能因为网络等原因,消费者消费的顺序不同。 解决:监听MessageListenerOrderly类,表示顺序消费,起到的作用是:采用分段锁的思想,每个consumer在拉取对应messageQueue的消息的时候,都要实现获取对应的锁,保证了同一个consumerQueue的消息不能被并发消费,锁住的不是Broker而是某一个Queue。
消息幂等性
需要根据具体的场景具体分析:
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,则update
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,就处理,然后这个 id 写 Redis。如果消费过了,那就别处理了,保证别重复处理相同的消息即可。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
还有一些大数据日志场景允许一定的重复消费。
Broker
RocketMQ的Broker采用文件系统来存储消息。
复制和刷盘
复制是指Broker与Broker之间的数据同步方式,分为同步和异步两种。
同步复制时,生产者会等待同步复制成功后,才返回生产者消息发送成功。
异步复制时,消息写入Master Broker后即为写入成功,此时系统有较低的写入延迟和较大的系统吞吐量。
刷盘是指数据发送到Broker的内存(通常指PageCache)后,以何种方式持久化到磁盘。
同步刷盘时,生产者会等待数据持久化到磁盘后,才返回生产者消息发送成功,可靠性极强。
异步刷盘时,消息写入PageCache即为写入成功,到达一定量时自动触发刷盘。此时系统有非常低的写入延迟和非常大的系统吞吐量。
存储
RocketMQ 主要存储的文件包括Comitlog文件、ConsumeQueue 文件、IndexFile文件。
CommitLog:存储消息的元数据。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大能力确保消息发送的高性能与高吞吐量。每个文件大小一般是1GB,可以通过mapedFileSizeCommitLog进行配置。
ConsumerQueue:存储消息在CommitLog的索引。由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。每个消费队列其实是commitlog的一个索引,提供给消费者做拉取消息、更新位点使用。
IndexFile:IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog文件中检索消息,提供了一种通过key或者时间区间来查询消息的方法。全部的文件都是按照消息key创建的Hash索引。文件名是用创建时的时间戳命名的。
高可用
主从同步
- 4.5之前:提供Master Broker 和 Slave Broker之间的数据同步功能,以及主从切换。 当master挂了, 写入服务就不可用了,但是 slave 服务仍然提供可读服务。
- 4.5之后:引入了Dleger,使用Raft共识算法, 在master故障后自动选举新leader。
消息丢失、重复、积压问题
消息丢失
一条MQ消息会经历三个阶段:
- 生产阶段:Producer生产消息,然后通过网络将消息传递给MQ Broker
- 存储阶段:存储消息到MQ磁盘上
- 消费阶段:Consumer从Broker拉取消息
三个阶段都可能丢失消息。
生产阶段
Procuder通过网络发送消息给Broker,当Broker收到后,会返回确认响应消息给Producer,发送方式分为同步发送和异步发送。同步发送指Producer发送消息后,需要收到接受方的响应才能发送下一条消息;异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),对发送失败的情况进行补偿。
无论同步还是异步,都可能因为网络发送失败,需要设置合理的重试次数。
存储阶段
消息存储
消息的存储分为同步刷盘和异步刷盘两种方式。
默认情况下采取异步刷盘,消息到了Broker后会先保存在内存中,消息写入PageCache即为写入成功,到达一定量时自动触发刷盘。此时系统有非常低的写入延迟和非常大的系统吞吐量。但如果出现机器宕机的情况,消息未及时刷盘,会出现消息丢失。
同步刷盘时,生产者会等待数据持久化到磁盘后,才返回生产者消息发送成功,可靠性极强。
消息复制
集群部署时还会采用一主多从架构,为了保证消息不丢失,还需要复制到slave节点,消息复制也分为同步复制和异步复制。
默认情况下采取异步复制,消息写入master就算成功,可以返回确认给生产者,接着消息异步复制到slave节点。若此时master节点宕机且不可恢复,则未复制到slave的消息会丢失。
可以采用同步复制的方式来提高消息的可靠性,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。
消费阶段
消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回SUCCESS给 Broker。如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。但可能会造成重复消费的情况,需要自己做幂等。
消息重复 todo(出现的原因分析)
消费端业务自己保证幂等性。
消息积压
解决方案:
全部丢弃
如果这些消息允许丢失,那么此时可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉。
排查消费者系统问题
修复consumer,恢复消费速度
扩容消费者
新建一个topic,partition设为原来10倍,写一个消费程序读取积压数据,分发到新建的partition中,然后部署10倍的consumer机器消费数据。
消息正常后恢复原来的架构。
MQ快写满时,丢弃消息,快速消费所有的消息,后续进行补数据。
解决思路:
- 提高消费并行度
- 批量消费
- 跳过非重要消息
- 优化每条消息的消费过程
RocketMQ高可用
消息消费高可用
当master不可用时,Consumer会自动切换到从slave读。
对于顺序消息,当Consumer消费消息失败后,RocketMQ会不断进行消息重试,此时后续消息会被阻塞。
RocketMQ默认每条消息会被重试16次,超过16次则不再重试,会将消息放到死信队列。
消息发送高可用
NameServer检测Broker有延迟,NameServer 为了简化和客户端通信,发现 Broker 故障时并不会立即通知客户端。
消息发送失败重试
在消息发送出现异常时会尝试再次发送,默认最多重试三次。重试机制仅支持同步发送方式,不支持异步和单向发送方式。
多partition
创建Topic时,可以把Topic下的多个partition分散到多个Broker上,这样当一个Broker不可用时不会影响其他的partition。
主从切换
master不可用时,自动进行主从切换
消息主从复制
同步刷盘、异步刷盘
同步复制、异步复制
Rebalance
Rebalance用于Consumer Group下的消费者达成一致来分配Queue。当Consumer订阅的Topic发生变化或者Consumer实例发生变化后会触发Rebalance来重新分配每个消费者实例对应的Queue。
轮询实例订阅的所有Topic
基于topic调用rebalanceByTopic执行rebalance
分为广播模式和集群模式。
广播模式
获取该topic下的所有Queue,每个客户端都能收到topic下的所有Queue,为客户端分配的Queue集合为全量的集合。
集群模式
获取topic下的所有Queue;从broker获取该topic下所有客户端id列表;排序后调用AllocateMessageAueueStrategy获得ConsumerGroup下该客户端应该分配到的Queue集合,默认平均分配。即集群模式下,每个客户端分到的Queue列表由AllocateMessageQueueStrategy来分配。
默认平均策略执行时,会把Queue列表和客户端id进行排序,分配时排在前面的客户端能分到Queue的会多一点。因而在初始化时,最好保证ConsumerGroup下的客户端数量<=Topic下的Queue数量。
获取该客户端所属的Queue集合后,调用updateProcessQueueTableInRebalance更新,更新当前客户端处理的Queue以及对应的消费进度。
调用truncateMessageQueueNotMyTopic移除缓存中不是该实例处理的Queue
上面的Rebalance都是客户端自己定时(默认20s)执行的,还可以Broker进行主动通知。Broker有一个ConsumerManager,当客户端实例发生变更时(上下线)会通知到各个客户端执行Rebalance。
RocketMQ和Kafka Rebalance对比:
- Kafka:会在消费者组的多个消费者实例中,选出一个作为Group Leader,由这个Group Leader来进行分区分配,分配结果通过Cordinator(特殊角色的broker)同步给其他消费者。相当于Kafka的分区分配只有一个大脑,就是Group Leader。
- RocketMQ:每个消费者,自己负责给自己分配队列,相当于每个消费者都是一个大脑。
Rebalance缺陷:
消费暂停
新增Consumer后出发Rebalance需要分配Queue给新Consumer,在此期间分配的Consumer无法被消费。
重复消费
新增的Consumer拿到分配的Queue后,必须从之前的Consumer已经消费过的offset继续消费,但offset的提交默认是异步的,当offset未提交时发生了Rebalance,就会导致重复消费。
消费突刺
rebalance导致重复消费,重复消费的消息过多,或者rebalance暂停时间过长导致积压消息,造成消费的突刺。