淘先锋技术网

首页 1 2 3 4 5 6 7

一. Kafka消费者Consumer消费消息配置实战

配置:

 public static Properties getProperties() {
        Properties props = new Properties();
​
        //broker地址
        props.put("bootstrap.servers", "ip:9092");
​
        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "xdclass-g1");
​
        //开启自动提交offset
        props.put("enable.auto.commit", "true");
​
        //自动提交offset延迟时间
        props.put("auto.commit.interval.ms", "1000");
​
        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        return props;
    }

消费订阅

@Test
    public void simpleConsumerTest(){
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
        //订阅topic主题
        consumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));
​
       while (true) {
            //拉取时间控制,阻塞超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }

二.Consumer从头消费配置和手工提交offset配置

如果需要从头消费partition消息,怎操作?

auto.offset.reset 配置策略即可
默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费.

//默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest");

自动提交offset问题

没法控制消息是否正常被消费
适合非严谨的场景,比如日志收集发送

手工提交offset配置和测试

初次启动消费者会请求broker获取当前消费的offset值

手工提交offset

同步 commitSync 阻塞当前线程 (自动失败重试)
异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)