您的当前位置:首页正文

线上Kafka突发rebalance异常,如何快速解决?

2021-06-30 来源:意榕旅游网
线上Kafka突发rebalance异常,如何快速解决?

Kafka 是我们最常⽤的消息队列,它那⼏万、甚⾄⼏⼗万的处理速度让我们为之欣喜若狂。但是随着使⽤场景的增加,我们遇到的问题也越来越多,其中⼀个经常遇到的问题就是:rebalance(重平衡)问题。

#什么是消费组

要想了解 rebalance,那就得先了解消费组(consumer group)。

消费组指的是多个消费者(consumer)组成起来的⼀个组,它们共同消费 topic 的所有消息,并且⼀个 topic 的⼀个 partition 只能被⼀个 consumer 消费。

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。

了解了这些状态的含义之后,我们来看⼀张图⽚,它展⽰了状态机的各个状态流转。

⼀个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加⼊,之后变更到CompletingRebalance 状态等待分配⽅案,最后流转到 Stable 状态完成重平衡。

当有新成员加⼊或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加⼊组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期⾃动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。我相信,你在 Kafka 的⽇志中⼀定经常看到下⾯这个输出:

Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.

这就是 Kafka 在尝试定期删除过期位移。现在你知道了,只有 Empty 状态下的组,才会执⾏过期位移删除的操作。

#什么是rebalance?

我们都知道 kafka 主要可以分为三⼤块:⽣产者、kafka broker、消费者。

⽽ kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从⽽使得消息的消费速度达到最快,这就是平衡(balance)。⽽rebalance(重平衡)其实就是重新进⾏ partition 的分配,从⽽使得 partition 的分配重新达到平衡状态。

#rebalance的流程

重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。我们先从消费者的视⾓来审视⼀下重平衡的流程。

在消费者端,重平衡分为两个步骤:分别是加⼊组和等待领导消费者(Leader Consumer)分配⽅案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

##JoinGroup请求

当组内成员加⼊组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将⾃⼰订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。⼀旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择⼀个担任这个消费者组的领导者。

通常情况下,第⼀个发送 JoinGroup 请求的成员⾃动成为领导者。你⼀定要注意区分这⾥的领导者和之前我们介绍的领导者副本,它们不是⼀个概念。这⾥的领导者是具体的消费者实例,它既不是副本,也不是协调者。这⾥的领导者指的是消费组(consumer group)的领导者,消费组领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配⽅案。

选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统⼀做出分配⽅案后,进⼊到下⼀步:发送 SyncGroup 请求。

##SyncGroup请求

在这⼀步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配⽅案发给协调者。值得注意的是,其他成员也会向协调者发送SyncGroup 请求,只不过请求体中并没有实际的内容。这⼀步的主要⽬的是让协调者接收分配⽅案,然后统⼀以 SyncGroup 响应的⽅式分发给所有成员,这样组内所有成员就都知道⾃⼰该消费哪些分区了。

接下来,我⽤⼀张图来形象地说明⼀下 JoinGroup 请求的处理过程。

就像前⾯说的,JoinGroup 请求的主要作⽤是将组成员订阅信息发送给领导者消费者,待领导者制定好分配⽅案后,重平衡流程进⼊到SyncGroup 请求阶段。

下⾯这张图描述的是 SyncGroup 请求的处理流程。

SyncGroup 请求的主要⽬的,就是让协调者把领导者制定的分配⽅案下发给各个组内成员。当所有成员都成功接收到分配⽅案后,消费者组进⼊到 Stable 状态,即开始正常的消费⼯作。

#什么时候会发⽣rebalance?

前⾯我们已经说到,rebalance 其实就是对 partition 进⾏重新分配。那么什么时候会发⽣ rebalance 呢?其实在以下三种情况下,会触发 rebalance:

订阅 Topic 的分区数发⽣变化。订阅的 Topic 个数发⽣变化。

消费组内成员个数发⽣变化。例如有新的 consumer 实例加⼊该消费组或者离开组。

##订阅Topic的分区数发⽣变化

简单地说,就是之前 topic 有 10 个分区,现在变成了 20 个,那么多出来的 10 个分区的数据就没⼈消费了。那么此时就需要进⾏重平衡,将新增的 10 个分区分给消费组内的消费者进⾏消费。所以在这个情况下,会发⽣重平衡。

##订阅的Topic个数发⽣变化

简单地说,⼀个 consumer group 如果之前只订阅了 A topic,那么其组内的 consumer 知会消费 A topic 的消息。⽽如果现在新增订阅了 B topic,那么 kafka 就需要把 B topic 的 partition 分配给组内的 consumer 进⾏消费。这个分配的过程,其实也是⼀个 rebalance 的过程。

##消费组内成员个数发⽣变化

我们都知道 kafka 中是以消费组(consumer group)的⽅式进⾏消费的,消费组内的消费者共同消费⼀个 topic 下的消息。⽽当消费组内成员个数发⽣变化,例如某个 consumer 离开,或者新 consumer 加⼊,都会导致消费组内成员个数发⽣变化,从⽽导致重平衡。

相⽐起之前的两个情况,这种情况在实际情况中更加常见。因为订阅分区数、以及订阅 topic 数都是我们主动改变才会发⽣,⽽组内消

费组成员个数发⽣变化,则是更加随机的。

下⾯我们⼀起分析⼀下「消费组内成员个数发⽣变化」的⼏种情况:

新成员加⼊组成员主动离开组成员崩溃

###新成员加⼊

新成员⼊组是指组处于 Stable 状态后,有新成员加⼊。如果是全新启动⼀个消费者组,Kafka 是有⼀些⾃⼰的⼩优化的,流程上会有些许的不同。我们这⾥讨论的是,组稳定了之后有新成员加⼊的情形。

当协调者收到新的 JoinGroup 请求后,它会通过⼼跳请求响应的⽅式通知组内现有的所有成员,强制它们开启新⼀轮的重平衡。具体的过程和之前的客户端重平衡流程是⼀样的。现在,我⽤⼀张时序图来说明协调者⼀端是如何处理新成员⼊组的。

###组成员主动离开

何谓主动离组?就是指消费者实例所在线程或进程调⽤ close() ⽅法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以⼼跳响应的⽅式通知其他成员,因此我就不再赘述了,还是直接⽤⼀张图来说明。

###组成员崩溃

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待⼀段时间才能感知到,这段时间⼀般是由消费者端参数 session.timeout.ms 控制的。

也就是说,Kafka ⼀般不会超过 session.timeout.ms 就能感知到这个崩溃。当然,后⾯处理崩溃离组的流程与之前是⼀样的,我们来看看下⾯这张图。

###疑惑

在许多⽂章中,它们会加多了⼀个 rebalance 场景,即:「重平衡时协调者对组内成员提交位移的处理」。其实这个要说是 rebalance场景,有点牵强。我们先来了解下这个场景究竟是什么情况。

正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员⼀段缓冲时间,要求每个成员必须在这段时间内快速地上报⾃⼰的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。还是⽼办法,我们使⽤⼀张图来说明。

所以这种场景是指 rebalance 发⽣之时,留有时间给消费者提交 offset,并不是引起 rebalance 的触发原因(并不是因为提交 offset 引发 rebalance)。因此在我这篇⽂章⾥,我并没有将其作为 rebalance 的⼀种场景。

#rebalance问题处理思路

前⾯我们讲过 rebalance ⼀般会有 3 种情况,分别是:

新成员加⼊组成员主动离开组成员崩溃

对于「新成员加⼊」、「组成员主动离开」都是我们主动触发的,能⽐较好地控制。但是「组成员崩溃」则是我们预料不到的,遇到问题的时候也⽐较不好排查。但对于「组成员崩溃」也是有⼀些通⽤的排查思路的,下⾯我们就来聊聊「rebalance问题的处理思路」。

要学会处理 rebalance 问题,我们需要先搞清楚 kafaka 消费者配置的四个参数:

session.timeout.ms 设置了超时时间heartbeat.interval.ms ⼼跳时间间隔

max.poll.interval.ms 每次消费的处理时间max.poll.records 每次消费的消息数

session.timeout.ms 表⽰ consumer 向 broker 发送⼼跳的超时时间。例如 session.timeout.ms = 180000 表⽰在最长 180 秒内 broker 没收到 consumer 的⼼跳,那么 broker 就认为该 consumer 死亡了,会启动 rebalance。

heartbeat.interval.ms 表⽰ consumer 每次向 broker 发送⼼跳的时间间隔。heartbeat.interval.ms = 60000 表⽰ consumer 每 60 秒向broker 发送⼀次⼼跳。⼀般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。

max.poll.interval.ms 表⽰ consumer 每两次 poll 消息的时间间隔。简单地说,其实就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么市场就要相应延长。否则如果时间到了 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。

max.poll.records 表⽰每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发⽣ rebalance。

简单来说,会导致崩溃的⼏个点是:

消费者⼼跳超时,导致 rebalance。

消费者处理时间过长,导致 rebalance。

##消费者⼼跳超时

我们知道消费者是通过⼼跳和协调者保持通讯的,如果协调者收不到⼼跳,那么协调者会认为这个消费者死亡了,从⽽发起rebalance。

⽽ kafka 的消费者参数设置中,跟⼼跳相关的两个参数为:

session.timeout.ms 设置了超时时间heartbeat.interval.ms ⼼跳时间间隔

这时候需要调整 session.timeout.ms 和 heartbeat.interval.ms 参数,使得消费者与协调者能保持⼼跳。⼀般来说,超时时间应该是⼼跳间隔的 3 倍时间。即 session.timeout.ms 如果设置为 180 秒,那么 heartbeat.interval.ms 最多设置为 60 秒。

为什么要这么设置超时时间应该是⼼跳间隔的 3 倍时间?因为这样的话,在⼀个超时周期内就可以有多次⼼跳,避免⽹络问题导致偶发失败。

##消费者处理时间过长

如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从⽽发起重平衡。⽽ kafka 的消费者参数设置中,跟消费处理的两个参数为:

max.poll.interval.ms 每次消费的处理时间max.poll.records 每次消费的消息数

对于这种情况,⼀般来说就是增加消费者处理的时间(即提⾼ max.poll.interval.ms 的值),减少每次处理的消息数(即减少

max.poll.records 的值)。

除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。

max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。 因为在 kafka 消费者的实现中,其是单线程去消费消息和执⾏⼼跳的,如果线程卡在处理消息,那么这时候即使到时间要⼼跳了,还是没有线程可以去执⾏⼼跳操作。很多同学在处理问题的时候,明明设置了很长的session.timeout.ms 时间,但最终还是⼼跳超时了,就是因为没有处理好这两个参数的关联。

对于 rebalance 类问题,简单总结就是:处理好⼼跳超时问题和消费处理超时问题。

对于⼼跳超时问题。⼀般是调⾼⼼跳超时时间(session.timeout.ms),调整超时时间(session.timeout.ms)和⼼跳间隔时间(heartbeat.interval.ms)的⽐例。阿⾥云官⽅⽂档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么⼼跳间隔时间(heartbeat.interval.ms)就不超过 10s。

对于消费处理超时问题。⼀般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数

(max.poll.records)。阿⾥云官⽅⽂档建议 max.poll.records 参数要远⼩于当前消费组的消费能⼒(records < 单个线程每秒消费的条数 x 消费线程的个数 x session.timeout的秒数)。

#参考资料

因篇幅问题不能全部显示,请点此查看更多更全内容