RocketMQ原理学习

消息队列学习笔记–RocketMQ

消息队列的优缺点

优点

异步

对于实时性不是很高的业务,如积分增减、发送短信等非核心流程,可以放到消息队列中去,提升整个链路的响应时间。

削峰填谷

对于秒杀等短时间大流量场景,可以将请求短暂的在消息队列中堆积,平滑的消费请求。

解耦

MQ可以实现服务的高内聚、低耦合,通过发布订阅模型实现系统/模块间的解耦。

缺点

系统可用性降低

MQ相当于一个依赖组件,一旦出问题会导致系统不可用。

系统复杂度提高

引入MQ如何保证消息不重复消费?如何处理消息丢失?如何保证消息的顺序消费?

数据一致性问题

生产者消费者事务执行结果不一致造成的数据不一致问题。

消息队列选型

ActiveMQ、RabbitMQ、RocketMQ、Kafka对比:

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

ActiveMQ:因为使用公司较少,没有经过大规模吞吐场景的考验,社区也不活跃,所以不做讨论。

RabbitMQ:是ActiveMQ的替代方案,能够保证数据不丢失,但吞吐量只能到万级别,相比于Kafka和RocketMQ低了一个数量级,但比Kafka多了很多的高级特性,如消息重试和死信队列,且写入延迟能达到微秒级,但使用Erlang作为开发语言,较为劝退。

Kafka:单机吞吐量能达到10万级别,写入延迟能到毫秒级别,但功能较为简单,只支持简单的MQ功能,主要运用在大数据实时计算和日志收集领域。

RocketMQ:单机吞吐量能到10w级别,低延迟、高性能、高可用,并且功能丰富,像延时消息、事务消息、消息回溯、死信队列等,广泛应用在订单、交易、计算、消息推送、binlog分发等场景。

主要通过RocketMQ来学习消息队列。

RocketMQ整体架构

img

RocketMQ主要分为四个组成部分:Producer、Broker、Consumer、Name Server。

其中,Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息,NameServer负责注册发现与路由剔除。

Topic

Topic 是一种逻辑上的分区,是同一类消息的集合,每一个消息只能属于一个 Topic ,是RocketMQ进行消息订阅的基本单位。

每个 topic 会被分成很多 Messsage Queue ,和 Kafka 中的 Partition 概念一样,topic 的数据被分布在不同的 Message Queue 中。

在业务增长,消息量增大时,可以增大 topic 的 Message Queue,这样可以将压力分摊到更多的 broker 上。因为 Producer 可以发送消息的时候可以通过指定的算法,将消息均匀的发送到每个 Message Queue,这种算法可以是轮询、随机选择、Hash,也可以是自定义的。

NameServer

NameServer的作用是注册中心,类似于Zookeeper。每个NameServer节点间互相独立,没有任何信息交互,不存在任何的选主或主从切换,因此比Zookeeper更加轻量。每个NameServer节点都保存了所有的Broker元数据,并和Broker保持长连接心跳。

img

NameServer主要有两个功能:注册发现和路由剔除。

注册发现

Broker启动时会和NameServer所有的节点建立长连接,每隔30s发送一次心跳,包括BrokerId、Broker地址、Broker名称等Broker集群的信息和Topic的拓扑信息。然后生产者和消费者启动的时候任选一台 Name Server 机器拉取所需的 Topic 的路由信息缓存在本地内存中,之后每隔 30s 定时从远端拉取更新本地缓存。

路由剔除

NameServer定期监测Broker的心跳,一旦超过120s没有最新的心跳,则判断Broker失联,关闭Broker连接,剔除Broker信息,但不会主动通知Producer和Consumer,需要主动拉取才会更新,所以最长需要30s才能感知到Broker故障。

Producer

Producer负责生产消息,单个生产者与一台NameServer保持长链接,每隔30s从NameServer拉取topic配置信息,如果NameServer挂了,会自动连接另一台NameServer;单个生产者会和他关联的所有Broker保持长连接,并每隔30s发送心跳。

消息发送方式

可靠同步发送

Producer发送消息后,需要收到接受方的响应才能发送下一条消息。

可靠异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。

单向发送

送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

消息类型

普通消息

即正常的消息

延迟消息

延时消息在投递时,需要设置指定的延时级别,等到特定的时间间隔后消息才会被消费者消费。mq服务端,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列。

可以通过delayLevel设置延时级别,可以取以下值:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

  • level == 0,消息为非延迟消息

  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s

  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

事务消息

事务消息用于保证本地事务执行和MQ投递都能成功,保持一致性。

通过两阶段提交和状态定时回查来保证消息一定发送到Broker。

img

Producer发送Half Message,服务端响应ACK写入结果,根据发送结果执行本地事务(如果写入失败,Half Message不可见,不执行本地事务),再根据本地事务的执行情况发送Commit/Rollback请求。

对于处于Half Message pending状态的消息,服务端会定期回查,Producer收到消息会检查本地事务的执行情况,根据本地事务状态重新Commit或Rollback。

注意:Half Message并未真正进入topic的queue,而是存放在临时queue中,提交事务后才会真正转移到topic的queue中。

顺序消息

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。

    • 全局消息的实现方式:一个全局顺序 topic 只创建一个Message Queue
  • 分区顺序 对于指定的一个 Topic,所有消息根据 key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

发送流程

  1. Producer在发送消息的时候,如果本地路由表中未缓存Topic的路由信息,则向NameServer发送获取路由的请求,更新本地路由表,并每隔30s从NameServer更新本地路由表
  2. Producer在拿到路由信息后,根据路由消息选择queue,发送消息

img

Consumer

Consumer负责消费消息,一个消费者组可以订阅多个Topic。Consumer启动后,就会通过定时任务不断地向RocketMQ集群中的所有Broker实例保持长连接,并每隔30s发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息),一旦连接断开,Broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。

单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,并且每隔30s会从NameServer拉取topic信息更新。

消费模式

  • 集群消费 同一个消费组里的消费者“瓜分”所有消息来消费,同一个消息只会被一个消费者实例消费。因为集群模式的消费进度是保存在Broker端的,所以即使应用崩溃,消费进度也不会出错。

  • 广播消费 同一个消费组里的消费者会消费所有消息,即同一个消息会被每一个消费者实例消费。广播消费的消费进度保存在客户端机器的文件中。如果文件弄丢了,那么消费进度就丢失了,可能会导致部分消息没有消费。

拉取流程

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:

(1)Push方式:由消息中间件主动地将消息推送给消费者;采用Push方式,实时性高,可以尽快的将消息发送给消费者进行消费。但是在消费者的处理消息的能力较弱的时候,而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。

(2)Pull方式:由消费者客户端主动向消息中间件拉取消息;采用Pull方式,可控性好,如何设置Pull消息的频率需要重点去考虑。如果每次Pull的时间间隔比较久,会增加消息的延迟,若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。

RocketMQ的消费方式都是基于拉模式拉取消息的,而且还采用了一种长轮询机制,来平衡上面Push/Pull方式的各自缺点:

Consumer发送pull请求,Broker端接受请求,如果发现队列里没有新消息,不立即返回,而是持有这个请求一段时间(通过设置超时时间来实现),在这段时间内轮询Broker队列内是否有新的消息,如果有新消息,就利用现有的连接返回消息给消费者;如果这段时间内没有新消息进入队列,则返回空。 这样消费消息的主动权既保留在Consumer端,也不会出现Broker积压大量消息后,短时间内推送给Consumer大量消息使Consumer因为性能问题出现消费不及时的情况。

消费进度保存

集群消费模式中,消费进度保存在broker 上,而广播模式中保存在客户端本地。

消费进度是消费者组之间互相隔离的,Broker 上通过 Topic + 消费者组名称作为 key,value 中分别记录每个 MessageQueue 对应该消费者组的消费偏移量 offset。

消费者侧会记录自己的消费进度到内存中的 OffsetTable,然后定时提交到 Broker 侧。

由于一批消息的消费次序不确定,可能下标大的消息先被消费结束,下标小的由于延时尚未被消费,此时消费者向 Broker 提交的 offset 应该是已被消费的最小下标,从而保证消息不被遗漏,但缺点在于可能重复消费消息。

高可用

  • 重试,死信 重试 Topic:如果由于各种意外导致消息消费失败,那么该消息会自动被保存到重试Topic中,格式为“%RETRY%消费者组”,在订阅的时候会自动订阅这个重试Topic。 进入重试队列的消息有16次重试机会,每次都会按照一定的时间间隔进行,只要正常消费或者重试消费中有一次消费成功,就算消费成功。

img

死信Topic:死信Topic名字格式为“%DLQ%消费者组名”。如果正常消费1次失败,重试16次失败,那么消息会被保存到死信Topic中,进入死信Topic的消息不能被再次消费。RocketMQ认为,如果17次机会都失败了,说明生产者发送消息的格式发生了变化,或者消费服务出现了问题,需要人工介入处理。

  • Rebalance

    Rebalance

    (重平衡)机制

    ,用于在发生Broker掉线、Topic扩容和缩容、消费者扩容和缩容等变化时,自动感知并调整自身消费,以尽量减少甚至避免消息没有被消费。 触发时机

    • Consumer启动时 启动之后会立马进行Rebalance
  • Consumer运行中会监听Broker发送过来的Rebalance消息,以及Consumer自身的定时任务(每隔20s)触发的Rebalance

  • Consumer停止运行时没有直接的调用Rebalance,而是会通知Broker自己下线了,然后Broker会通知其余的Consumer进行Rebalance。

Rebalance 核心逻辑主要在 client 侧。首先,Consumer 从 Broker 拉取自己订阅的所有 Topic 和 ConsumerGroup 下的消费者信息。然后会对 Topic 中 MessageQueue 和 消费者ID 进行排序,然后用消息队列默认分配算法来进行分配。排序是关键步骤,因为 consumer 之间并不通信,通过排序这样可以保证每一个 consumer 的视图一致,然后通过相同的分配算法,就能达到分配结果的一致。

常见的分配算法:

  • AllocateMessageQueueAveragely平均算法,默认策略
  • AllocateMessageQueueAveragelyByCircle:环形平均算法。
  • AllocateMessageQueueByConfig:根据配置负载均衡算法,根据配置,为每一个消费者配置固定的消息队列。
  • AllocateMessageQueueByMachineRoom:根据机房负载均衡算法。
  • AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法。

举个栗子,假设队列大小是8(编号0-7),消费者数量3(编号0-2),分配结果就是 AllocateMessageQueueAveragely的结果:

消费者0:队列0,1,2;

消费者1:队列3,4,5;

消费者2:队列6,7。

AllocateMessageQueueAveragelyByCircle的结果:

消费者0:队列0,3,6;

消费者1:队列1,4,7;

消费者2:队列2,5。

消息的顺序性

我们可以通过指定key的方式让相关的消息都存到一个MessageQueue中,并且这个MessageQueue中的消息一定是有序的。消费者取出来的顺序也一定是有序的,但消费者可能会有多个线程并发处理消息,因为单线程的吞吐量太低。多线程并发时可能造成乱序。

解决:

  • 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。即队列数量设置为1,且单线程消费

  • 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

img

  • 通过指定key的方式保证相关的消息都会被添加到一条队列中,但此时可能有多个消费者去队列中取消息(不是同时取),虽然能保证消息出队顺序的FIFO,但可能因为网络等原因,消费者消费的顺序不同。 解决:监听MessageListenerOrderly类,表示顺序消费,起到的作用是:采用分段锁的思想,每个consumer在拉取对应messageQueue的消息的时候,都要实现获取对应的锁,保证了同一个consumerQueue的消息不能被并发消费,锁住的不是Broker而是某一个Queue。

消息幂等性

需要根据具体的场景具体分析:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,则update

  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,就处理,然后这个 id 写 Redis。如果消费过了,那就别处理了,保证别重复处理相同的消息即可。

  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

  • 还有一些大数据日志场景允许一定的重复消费。

Broker

RocketMQ的Broker采用文件系统来存储消息。

复制和刷盘

复制是指Broker与Broker之间的数据同步方式,分为同步和异步两种。

  • 同步复制时,生产者会等待同步复制成功后,才返回生产者消息发送成功。

  • 异步复制时,消息写入Master Broker后即为写入成功,此时系统有较低的写入延迟和较大的系统吞吐量。

刷盘是指数据发送到Broker的内存(通常指PageCache)后,以何种方式持久化到磁盘。

  • 同步刷盘时,生产者会等待数据持久化到磁盘后,才返回生产者消息发送成功,可靠性极强。

  • 异步刷盘时,消息写入PageCache即为写入成功,到达一定量时自动触发刷盘。此时系统有非常低的写入延迟和非常大的系统吞吐量。

存储

RocketMQ 主要存储的文件包括Comitlog文件、ConsumeQueue 文件、IndexFile文件。

  • CommitLog:存储消息的元数据。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大能力确保消息发送的高性能与高吞吐量。每个文件大小一般是1GB,可以通过mapedFileSizeCommitLog进行配置。

  • ConsumerQueue:存储消息在CommitLog的索引。由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。每个消费队列其实是commitlog的一个索引,提供给消费者做拉取消息、更新位点使用。

  • IndexFile:IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog文件中检索消息,提供了一种通过key或者时间区间来查询消息的方法。全部的文件都是按照消息key创建的Hash索引。文件名是用创建时的时间戳命名的。

高可用

主从同步

  • 4.5之前:提供Master Broker 和 Slave Broker之间的数据同步功能,以及主从切换。 当master挂了, 写入服务就不可用了,但是 slave 服务仍然提供可读服务。
  • 4.5之后:引入了Dleger,使用Raft共识算法, 在master故障后自动选举新leader。

消息丢失、重复、积压问题

消息丢失

一条MQ消息会经历三个阶段:

  • 生产阶段:Producer生产消息,然后通过网络将消息传递给MQ Broker
  • 存储阶段:存储消息到MQ磁盘上
  • 消费阶段:Consumer从Broker拉取消息

三个阶段都可能丢失消息。

  • 生产阶段

    Procuder通过网络发送消息给Broker,当Broker收到后,会返回确认响应消息给Producer,发送方式分为同步发送和异步发送。同步发送指Producer发送消息后,需要收到接受方的响应才能发送下一条消息;异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),对发送失败的情况进行补偿。

    无论同步还是异步,都可能因为网络发送失败,需要设置合理的重试次数

    • 存储阶段

      • 消息存储

        消息的存储分为同步刷盘异步刷盘两种方式。

        默认情况下采取异步刷盘,消息到了Broker后会先保存在内存中,消息写入PageCache即为写入成功,到达一定量时自动触发刷盘。此时系统有非常低的写入延迟和非常大的系统吞吐量。但如果出现机器宕机的情况,消息未及时刷盘,会出现消息丢失。

        同步刷盘时,生产者会等待数据持久化到磁盘后,才返回生产者消息发送成功,可靠性极强。

      • 消息复制

        集群部署时还会采用一主多从架构,为了保证消息不丢失,还需要复制到slave节点,消息复制也分为同步复制异步复制

        默认情况下采取异步复制,消息写入master就算成功,可以返回确认给生产者,接着消息异步复制到slave节点。若此时master节点宕机且不可恢复,则未复制到slave的消息会丢失。

        可以采用同步复制的方式来提高消息的可靠性,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

  • 消费阶段

    消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回SUCCESS给 Broker。如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。但可能会造成重复消费的情况,需要自己做幂等。

消息重复 todo(出现的原因分析)

消费端业务自己保证幂等性。

消息积压

解决方案:

  1. 全部丢弃

    如果这些消息允许丢失,那么此时可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉。

  2. 排查消费者系统问题

    修复consumer,恢复消费速度

  3. 扩容消费者

    新建一个topic,partition设为原来10倍,写一个消费程序读取积压数据,分发到新建的partition中,然后部署10倍的consumer机器消费数据。

    消息正常后恢复原来的架构。

  4. MQ快写满时,丢弃消息,快速消费所有的消息,后续进行补数据。

解决思路:

  1. 提高消费并行度
  2. 批量消费
  3. 跳过非重要消息
  4. 优化每条消息的消费过程

RocketMQ高可用

消息消费高可用

当master不可用时,Consumer会自动切换到从slave读。

对于顺序消息,当Consumer消费消息失败后,RocketMQ会不断进行消息重试,此时后续消息会被阻塞。

RocketMQ默认每条消息会被重试16次,超过16次则不再重试,会将消息放到死信队列。

消息发送高可用

NameServer检测Broker有延迟,NameServer 为了简化和客户端通信,发现 Broker 故障时并不会立即通知客户端。

  1. 消息发送失败重试

    在消息发送出现异常时会尝试再次发送,默认最多重试三次。重试机制仅支持同步发送方式,不支持异步和单向发送方式。

  2. 多partition

    创建Topic时,可以把Topic下的多个partition分散到多个Broker上,这样当一个Broker不可用时不会影响其他的partition。

  3. 主从切换

    master不可用时,自动进行主从切换

消息主从复制

同步刷盘、异步刷盘

同步复制、异步复制

Rebalance

Rebalance用于Consumer Group下的消费者达成一致来分配Queue。当Consumer订阅的Topic发生变化或者Consumer实例发生变化后会触发Rebalance来重新分配每个消费者实例对应的Queue。

  1. 轮询实例订阅的所有Topic

  2. 基于topic调用rebalanceByTopic执行rebalance

  3. 分为广播模式和集群模式。

    1. 广播模式

      获取该topic下的所有Queue,每个客户端都能收到topic下的所有Queue,为客户端分配的Queue集合为全量的集合。

    2. 集群模式

      获取topic下的所有Queue;从broker获取该topic下所有客户端id列表;排序后调用AllocateMessageAueueStrategy获得ConsumerGroup下该客户端应该分配到的Queue集合,默认平均分配。即集群模式下,每个客户端分到的Queue列表由AllocateMessageQueueStrategy来分配。

      默认平均策略执行时,会把Queue列表和客户端id进行排序,分配时排在前面的客户端能分到Queue的会多一点。因而在初始化时,最好保证ConsumerGroup下的客户端数量<=Topic下的Queue数量。

  4. 获取该客户端所属的Queue集合后,调用updateProcessQueueTableInRebalance更新,更新当前客户端处理的Queue以及对应的消费进度。

  5. 调用truncateMessageQueueNotMyTopic移除缓存中不是该实例处理的Queue

上面的Rebalance都是客户端自己定时(默认20s)执行的,还可以Broker进行主动通知。Broker有一个ConsumerManager,当客户端实例发生变更时(上下线)会通知到各个客户端执行Rebalance。

RocketMQ和Kafka Rebalance对比:

  1. Kafka:会在消费者组的多个消费者实例中,选出一个作为Group Leader,由这个Group Leader来进行分区分配,分配结果通过Cordinator(特殊角色的broker)同步给其他消费者。相当于Kafka的分区分配只有一个大脑,就是Group Leader。
  2. RocketMQ:每个消费者,自己负责给自己分配队列,相当于每个消费者都是一个大脑。

Rebalance缺陷:

  1. 消费暂停

    新增Consumer后出发Rebalance需要分配Queue给新Consumer,在此期间分配的Consumer无法被消费。

  2. 重复消费

    新增的Consumer拿到分配的Queue后,必须从之前的Consumer已经消费过的offset继续消费,但offset的提交默认是异步的,当offset未提交时发生了Rebalance,就会导致重复消费。

  3. 消费突刺

    rebalance导致重复消费,重复消费的消息过多,或者rebalance暂停时间过长导致积压消息,造成消费的突刺。

-------------本文结束感谢您的阅读-------------