淘先锋技术网

首页 1 2 3 4 5 6 7

1 关于offset

offset用于记录消息消费的进度,主要有以下几种,

  • Current offset,用于记录消费者已经接收到(不一定有完成消费)的消息序号,保证同一个消息不会被重复消费,可以我们通过kafka-consumer-groups.sh查询,这也是我们测试或者实际环境需要调整的offset

  • Committed offset,用于记录消费者已经确认消费消息的序号,消费者可以通过设置enable.auto.commit为true来定期(auto.commit.interval.ms)向kafka提交这个offset,好像没有地方可以查询

  • LogEndOffset,用于记录broker上生成的最新消息序号,与Current offset的差值即为消费的lag

[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test-events     0          5               7               2               -               -               -

2 重置offset

在一些情况下,我们需要对current offset进行重置,比如lag太大,对于已产生的lag不想处理,就可以直接跳到最新的消息进行消费

[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test-events     0          5               8               3               -               -               -
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets  --group test-group --topic test-events --to-latest
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
test-group                     test-events                    0          8
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets  --group test-group --topic test-events --to-latest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
test-group                     test-events                    0          8
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test-events     0          8               8               0               -               -               -

3 删除offset

如果我们在某次版本变更中在同一个消费组中修改了topic的名字,那么原来的topic还会存在,但是并不会有人去消费消息,就会导致lag一直变大,产生误告警,此时就需要将该topic从该消费组中删除,

[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test-events     0          8               8               0               -               -               -
test-group      test-topic      0          0               3               3               -               -               -
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group test-group --topic test-topic
Request succeed for deleting offsets with topic test-topic group test-group

TOPIC                          PARTITION       STATUS
test-topic                     0               Successful
[root@localhost kafka_2.13-3.3.1]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test-events     0          8               8               0               -               -               -

参考文档:

  1. https://www.logicbig.com/tutorials/misc/kafka/committing-offsets.html