一、Reblance是什么
Reblance就像他的名称一样,意思是再平衡,平衡什么?平衡消费者和分区之间的对应关系。本质上来讲,Reblance是一种协议,规定了一个Consumer Group下所有Consumer如何达成一致,来分配订阅Topic的每个分区,尽量让每个消费者分配到相对均匀的分区,使Consumer的资源都能得到充分利用,防止有些Consumer比较忙,有的Consumer比较闲。
二、Reblance触发的时机
当kafka感知到存在让分区和消费者分配不均匀的事情发生时,就会触发Reblance,来保证分区和消费者再次平衡。那么那些事情会触发Reblance呢?或者说那些事情会导致分区和消费者分配不均匀呢?主要有三种:
- 消费者组消费的分区个数发生变化。
- 消费者组消费的主题个数发生变化。
- 消费者组内的消费者个数发生变化。
其实第2种情况,本质上是第1种情况的一个特例,消费组消费的主题个数发生变化,体现到消费组中的消费者身上,就是费配到消费者上的分区个数发生了变化。
三、Reblance的执行流程
理解Reblance的流程,需要先了解一下消费组协调者。
协调者(Coordinator)是broker进程中的一个组件,每一个broker都会有一个Coordinator,协调者的职责在于:服务ConsumerGroup,完成整个Reblance的过程,提供Consumer Group位移的管理,以及组内成员的管理。
这里有几个需要注意的问题:
1.Consumer Group如何知道哪个协调者是为自己服务的?
Consumer Group的位移保存在__consumer_offset主题的某个partition中,这个partition是 hash(group_id)%50(__consumer_offset主题有50个分区),然后这个partition的leader副本所在的broker上的coordintor就是该Consumer Group的协调者。
2.协调者如何管理组成员?
当Consumer启动的时候,会向协调者所在broker发送多种请求,其中包含一个Join Group的请求,收到请求后,协调者执行消费者组的注册,消费者元数据信息保存。同时组中的各个消费者,都会定时的向协调者发送心跳请求,让协调者了解每个消费者的状态信息。
当协调者发现某个消费者长时间没有发送心跳,那么协调者就会认为这个消费者挂了,把这个消费者从消费者组中剔除出去,然后在其他Consumer的心跳请求中回复 reblance_need,让每个消费者停止消息消费,并开启Reblance。
当各个消费者收到reblance_need响应后,都会停止消息消费,并向协调者发送 SyncGroup请求,来询问分配给自己的分区信息。
四、分区分配的策略有哪些
分配策略是指:当Reblance触发时,重新将分区分配给消费者的方式。常用的分区方式有三种:Range,RoundRobin 和 StickyAssignor。
为了方便描述分区分配的流程,我们假设以下场景:一个消费组中有3个消费者,分别为Consumer_1,Consumer_2和Consumer_3。消费7个分区,分别为partition_0-7。
- Range分配策略是指按照分区号的范围进行分区分配。
- RoundRobin分配策略是指轮训每个分区,将分区逐个分配给消费者
- StickyAssignor分区策略,又称为粘性分配,所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动。
例如实例Consumer1之前负责消费分区 0、1、2,那么 Rebalance 之后,如果可能的话,最好还是让实例 Consumer1 继续消费分区0、1、2,而不是被重新分配其他的分区。这样的话,实例 Consumer1 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
不过这个分配策略要到 kafka 0.11版本才可以使用。
五、Reblance产生的影响
Reblance产生的影响主要有两个:
1.影响消费效率。
因为发生Reblance时,所有消费者都会停止消费,比较影响消息消费效率,当一个消费组中消费者比较多的时候,Reblance的过程会比较耗时。
2.可能会产生消息重复消费
因为Consumer消费分区消息的offset提交过程,不是实时的(以offset自动提交为例),由参数auto.commit.interval.ms控制提交的最小频率,默认是5000,也就是最少每5s提交一次。我们试想以下场景:提交位移之后的 3 秒发生了 Rebalance ,在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然可以通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。
很遗憾的是,目前kafka社区对于Reblance带来的影响,也没有彻底的解决办法。只能通过避免不必要的Reblance,来降低Reblance产生的影响。
六、减少Reblance的产生
有些时候,Reblance是不可避免的,比如在运维过程中,为了增加客户端的处理能力,需要增加partition个数或者consumer个数,那么不可避免的需要触发Reblance。
但是有些时候可能有些参数配置的问题,会导致一些不必要的Reblance的发生,这些Reblance的发生主要就是协调者错误的认为消费者实例挂了,然后触发Reblance。
有哪些参数配置异常,会导致协调者认为消费者实例挂了呢?
这个参数是协调者最长等待消费者没有发送心跳的时间间隔,如果协调者在该参数指定的时间内没有收到某个消费者的心跳请求,那么就认为该消费者挂了,就会将这个消费者从组里面剔除,然后触发Reblance。
同时参数heartbeat.interval.ms,表示消费者向协调者发送心跳请求的时间间隔,这个参数设置的过大,会导致消费者长时间不会向协调者发送心跳,同时协调者向消费者发送的 Reblance的消息也会不及时(协调者不是主从向消费者发送Reblance消息的,而是将Reblance消息封装到消费者心跳请求的响应消息中)。设置的小一些,消息会及时一些,但是,可能会消耗过多的带宽。
通常在生产环境中保证session.timeout.ms >= 3 * heartbeat.interval.ms。
消费者调用poll方法的时间间隔,如果时间间隔大于该参数的设置,会认为这个Consumer存在问题(消息处理效率低,消费者可能不健康),那么Consumer会向协调者发送 leaveGroup请求,自动退出消费者组,此时消费者数量发生变化,触发Reblance。
在业务中,该参数可以设置的长一些,但是也不要违背了该参数的初衷(kafka对消费者的一种优胜劣汰的优化机制,poll的慢导致消息堆积)。因为业务上的确可能存在一些耗时的操作,或者poll拉去的消息过多,导致消息处理的慢,进而导致poll方法消息拉去的时间间隔过长,对于这种情况可以调整每次拉去消息个条数,或者优化消息处理逻辑,加快消息处理效率。