尝试以下解决方案:
1. ConsumerFactory<xxx, xxx>
确定在自己的kafkaConfig.java(不一定要叫这个名字,但是是有@Configuration和@EnableKafka的一个config kafka配置的文件)定义的consumerFactory里,有没有配置ConsumerFactory<String, String> consumerFactory这样一个bean(因为我需要的ConsumerRecords是String类型,定义成别的应该也可以):
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties.setPollTimeout(3000);
// can also add:
// factory.setBatchListener(true);
// factory.getContainerProperties().setAckMode()
// factory.setAutoStartup();
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// config bootstrap servers, deserializer, groupId, SASL Kafka authentication/keytab
...
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "xxx");
props.put(saslconfigs.SASL_KERBEROS_SERVICE_NAME, "xxx");
props.put(saslconfigs.SASL_JAAS_CONFIG, "xxx");
...
return props;
}
这样就可以在有consumer.poll()的那个java文件中按如下方式Inject:
@Inject
private ConsumerFactory<String, String> consumerFactory;
而不是:
@Inject
private ConsumerFactory consumerFactory;
2. Add ConsumerRebalanceListener - not work
Consumer<String, String> consumer = consumerFactory.createConsumer(groupId, hostname);
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
...
})
3. Set longer timeout in consumer.poll()
4. Change auto-set-offset
change auto-set offset to “earliest” in KafkaConfig.java
4.1 auto-set-offset的作用是什么?
- 结合我试验几次,以及看到别人说的,这里auto-set-offset的作用是,当你人为输入的offset out of range, (比如我有几次报了这样的Log: “offset 0 out of range, offset reset to xxx”, 这里的xxx就是所设置的”earliest“,可能是20000,可能是2000000…) ,就会reset到你定义的auto-set-offset, 取决于你定义的是”earliest“,’latest”,还是其它的什么。
- 有人说,以下两种情况都会自动reset:
- offset out of range
- 人为选取的offset没有消息
- 但是!!!我用的是consumer.seek(TopicPartition, startingOffset),试验后发现没有消息的时候并不会重置到auto-set-offset,只有在out of range的之后才自动重置了。但是auto-set-offset的确应该是有消息的第一个offset(在“earliest”的设置下)。
5. Tried @KafkaListener separately to check - not work, no messages received
6. Change subscribe to assign, the use seek - Works!
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset); // can set different offset to seek
感觉问题是因为subscribe像是订阅新消息的发布,而seek可以找到任意offset的历史消息。@KafkaListener收不到感觉也是因为这个,没有新消息进队列。