1 关于offset
offset用于记录消息消费的进度,主要有以下几种,
-
Current offset,用于记录消费者已经接收到(不一定有完成消费)的消息序号,保证同一个消息不会被重复消费,可以我们通过kafka-consumer-groups.sh查询,这也是我们测试或者实际环境需要调整的offset
-
Committed offset,用于记录消费者已经确认消费消息的序号,消费者可以通过设置enable.auto.commit为true来定期(auto.commit.interval.ms)向kafka提交这个offset,好像没有地方可以查询
-
LogEndOffset,用于记录broker上生成的最新消息序号,与Current offset的差值即为消费的lag
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-events 0 5 7 2 - - -
2 重置offset
在一些情况下,我们需要对current offset进行重置,比如lag太大,对于已产生的lag不想处理,就可以直接跳到最新的消息进行消费
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-events 0 5 8 3 - - -
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test-group --topic test-events --to-latest
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
GROUP TOPIC PARTITION NEW-OFFSET
test-group test-events 0 8
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group test-group --topic test-events --to-latest --execute
GROUP TOPIC PARTITION NEW-OFFSET
test-group test-events 0 8
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-events 0 8 8 0 - - -
3 删除offset
如果我们在某次版本变更中在同一个消费组中修改了topic的名字,那么原来的topic还会存在,但是并不会有人去消费消息,就会导致lag一直变大,产生误告警,此时就需要将该topic从该消费组中删除,
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-events 0 8 8 0 - - -
test-group test-topic 0 0 3 3 - - -
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group test-group --topic test-topic
Request succeed for deleting offsets with topic test-topic group test-group
TOPIC PARTITION STATUS
test-topic 0 Successful
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-events 0 8 8 0 - - -
参考文档:
- https://www.logicbig.com/tutorials/misc/kafka/committing-offsets.html