淘先锋技术网

首页 1 2 3 4 5 6 7

__consumer_offsets详解

一般情况下,当集群中第一次有消费者消费消息时会自动创建主题consumer offsets,不过它的副本因子还受offsets..topic.replication.factor参数的约束,这个参数的默认值为3(下载安装的包中此值可能为l),分区数可以通过offsets.topic.num.partitions参数设置,默认为50。客户端提交消费位移是使用OffsetCommitRequest请求实现的,OffsetCommitRequest的结构如图所示。

在这里插入图片描述

请求体第一层中的group_id、generation_id和member_id在前面的内容中已经介绍过多次了,retention time表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-l。也就是说,按照broker端的配置offsets.retention.minutes来确定保留时长。offsets.retention.minutes的默认值为l0080,即7天,超过这个时间后消费位移的信息就会被删除(使用墓碑消息和日志压缩策略)。注意这个参数在2.0.0版本之前的
默认值为1440,即1天,很多关于消费位移的异常也是由这个参数的值配置不当造成的。有些定时消费的任务在执行完某次消费任务之后保存了消费位移,之后隔了一段时间再次执行消费任务,如果这个间隔时间超过offsets,retention.minutes的配置值,那么原先的位移信
息就会丢失,最后只能根据客户端参数auto.offset.reset来决定开始消费的位置,遇到这种情况时就需要根据实际情况来调配offsets.retention.minutes参数的值。

OffsetCommitRequest中的其余字段大抵也是按照分区的粒度来划分消费位移的:topic表示主题名称,partition表示分区编号等。OffsetAndMetadata中包含2个成员变量(offset和metadata),与此对应的有两个构造方法,详细如下:

public OffsetAndMetadata(long offset)
public OffsetAndMetadata(long offset, String metadata)

同消费组的元数据信息一样,最终提交的消费位移也会以消息的形式发送至主题consumer offsets,.与消费位移对应的消息也只定义了key和value字段的具体内容,它不依赖于具体版本的消息格式,以此做到与具体的消息格式无关。

图中展示了消费位移对应的消息内容格式,上面是消息的key,下面是消息的value。可以看到key和value中都包含了version字段,这个用来标识具体的key和value的版本信
息,不同的版本对应的内容格式可能并不相同。就目前版本而言,key和value的version值都为l。key中除了version字段还有group、topic、partition字段,分别表示消费组
的groupId、主题名称和分区编号。虽然key中包含了4个字段,但最终确定这条消息所要存储的分区还是根据单独的group字段来计算的,这样就可以保证消费位移信息与消费组对应的GroupCoordinator处于同一个broker节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的。

value中包含了5个字段,除version字段外,其余的offset、metadata、commit_timestamp、expire timestamp字段分别表示消费位移、自定义的元数据信息、位移提交到Kafka的时间戳、消费位移被判定为超时的时间戳。其中offset和metadata与OffsetCommitRequest请求体中的offset和metadata对应,而expire timestamp和
OffsetCommitRequest请求体中的retention time也有关联,commit_timestamp值与offsets.retention.minutes参数值之和即为expire_timestamp(默认情况下)。

在这里插入图片描述

在处理完消费位移之后,Kafka返回OffsetCommitResponse给客户端,OffsetCommitResponse的结构如图所示。OffsetCommitResponse中各个域的具体含义可以通过前面内容中推断出来,这里就不再赘述了。

在这里插入图片描述