淘先锋技术网

首页 1 2 3 4 5 6 7

尝试以下解决方案:

1. ConsumerFactory<xxx, xxx>

确定在自己的kafkaConfig.java(不一定要叫这个名字,但是是有@Configuration和@EnableKafka的一个config kafka配置的文件)定义的consumerFactory里,有没有配置ConsumerFactory<String, String> consumerFactory这样一个bean(因为我需要的ConsumerRecords是String类型,定义成别的应该也可以):

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	factory.setConcurrency(1);
	factory.getContainerProperties.setPollTimeout(3000);
	// can also add:
	// factory.setBatchListener(true);
	// factory.getContainerProperties().setAckMode()
	// factory.setAutoStartup();
	return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
	return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
	Map<String, Object> props = new HashMap<>();
 	// config bootstrap servers, deserializer, groupId, SASL Kafka authentication/keytab
	...
	props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "xxx");
	props.put(saslconfigs.SASL_KERBEROS_SERVICE_NAME, "xxx");
	props.put(saslconfigs.SASL_JAAS_CONFIG, "xxx");
	...
	return props;
}

这样就可以在有consumer.poll()的那个java文件中按如下方式Inject:

@Inject
private ConsumerFactory<String, String> consumerFactory;

而不是:

@Inject
private ConsumerFactory consumerFactory;

2. Add ConsumerRebalanceListener - not work

Consumer<String, String> consumer = consumerFactory.createConsumer(groupId, hostname);
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
	@Override
	...
})

3. Set longer timeout in consumer.poll()

4. Change auto-set-offset

change auto-set offset to “earliest” in KafkaConfig.java

4.1 auto-set-offset的作用是什么?

  • 结合我试验几次,以及看到别人说的,这里auto-set-offset的作用是,当你人为输入的offset out of range, (比如我有几次报了这样的Log: “offset 0 out of range, offset reset to xxx”, 这里的xxx就是所设置的”earliest“,可能是20000,可能是2000000…) ,就会reset到你定义的auto-set-offset, 取决于你定义的是”earliest“,’latest”,还是其它的什么。
  • 有人说,以下两种情况都会自动reset:
  1. offset out of range
  2. 人为选取的offset没有消息
  • 但是!!!我用的是consumer.seek(TopicPartition, startingOffset),试验后发现没有消息的时候并不会重置到auto-set-offset,只有在out of range的之后才自动重置了。但是auto-set-offset的确应该是有消息的第一个offset(在“earliest”的设置下)。

5. Tried @KafkaListener separately to check - not work, no messages received

6. Change subscribe to assign, the use seek - Works!

TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset); // can set different offset to seek

感觉问题是因为subscribe像是订阅新消息的发布,而seek可以找到任意offset的历史消息。@KafkaListener收不到感觉也是因为这个,没有新消息进队列。