消费者
基本概念
- 消费者:消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。
- 消费组:每个消费者都有一个对应的消费者组。也只属于一个消费组。
如果所有消费者都属于同一个消费组,那么所有的消息都会被均衡地投递到每一个消费者,每一个消息只会被一个消费者处理。如果所有的消费者都属于不同消费组,那么消息就会被广播给所有的消费者。
当某个主题中分区数小于小于消费者个数,那么就会出现有的消费者不能接收到消息。
代码
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
public class MyConsumer1 {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Map<String,Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
// 配置一次拉取最小拉取数量。默认1B
configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,1);
// 默认最大量。单位B
configs.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,500000);
// 最大等待毫秒
configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,500);
// 每个分区最大返回数据量,单位B
configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,500000);
// 一次拉取最大记录数
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
// 多久关闭闲置的连接
configs.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,500000);
// 是否允许通过pattern方式订阅,true-不允许,false-允许
configs.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,false);
// 设置Socket接收消息缓冲区的大小,单位B,-1则为操作系统默认值
configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,65565);
// 发送消息缓冲区大小
configs.put(ConsumerConfig.SEND_BUFFER_CONFIG,65565);
// 等待请求响应的最长时间
configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);
// 元数据过期时间。单位ms
configs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG,60000);
// 重连等待时间
configs.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG,30);
// 重新发送失败的请求到指定主题分区之前的等待时间
configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,30);
// 事务隔离级别 read_uncommitted read_committed
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_uncommitted");
// 消费者组名字
configs.put(ConsumerConfig.GROUP_ID_CONFIG,"consumer.demo");
// 手动进行位移提交
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<Integer,String> consumer = new KafkaConsumer<Integer, String>(configs);
// 通过pattern的模式订阅主题
final Pattern pattern = Pattern.compile("topic_[0-9]");
// 通过集合的方式订阅多个主题
final List<String> topics = Arrays.asList("topic_1");
// 消费者再均衡-分区的所属权从一个消费者转移到另一个消费者的行为。
consumer.subscribe(topics, new ConsumerRebalanceListener() {
// 再均衡开始之前和消费者停止读取之后被调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
collection.forEach(tp ->{
System.out.println("剥夺的分区:" + tp.partition());
});
}
// 重新分配分区之后和消费者开始读取消费之前被调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
collection.forEach(tp->{
System.out.println(tp.partition());
});
}
});
final ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
final Iterable<ConsumerRecord<Integer,String>> topicIterable = records.records("topic_1");
topicIterable.forEach(record ->{
System.out.println("===============================");
System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
System.out.println("消息的key:" + record.key());
System.out.println("消息的偏移量(当前消费到的位置):" + record.offset());
System.out.println("消息的分区号:" + record.partition());
System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
System.out.println("消息的时间戳:" + record.timestamp());
System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
System.out.println("消息的时间戳类型:" + record.timestampType());
System.out.println("消息的主题:" + record.topic());
System.out.println("消息的值:" + record.value());
});
// 同步提交的方式
consumer.commitSync();
}
}
订阅及取消订阅方式
public interface Consumer<K, V> extends Closeable {
// 按照主题的方式进行订阅
void subscribe(Collection<String> topics);
// 按照主题的方式进行订阅,并设置再均衡监听器
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
// 按照Pattern进行订阅,并设置再均衡监听器
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
// 按照Pattern进行订阅
void subscribe(Pattern pattern);
// 用于指定订阅的主题及分区集合
void assign(Collection<TopicPartition> partitions);
// --- 取消订阅
void unsubscribe();
}
- 查询某个主题的分区信息
public interface Consumer<K, V> extends Closeable {
List<PartitionInfo> partitionsFor(String topic);
}
- 取消订阅的其他方式
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());
- 获取消费者所分配到的分区信息
位移提交
- 同步位移提交
- 带参数的同步位移提交
final ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
final Iterable<ConsumerRecord<Integer,String>> topicIterable = records.records("topic_1");
topicIterable.forEach(record ->{
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()),
new OffsetAndMetadata(record.offset()+1)));
});
- 按分区粒度同步提交消费位移
final ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<Integer, String>> consumerRecords = records.records(partition);
for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
}
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(consumerRecords.get(consumerRecords.size()-1).offset())));
}
- 异步提交
拦截器
public interface ConsumerInterceptor<K, V> extends Configurable {
// 消息会在调用poll方法返回之前进行拦截处理
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
// 提交完消费者位移之后
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* This is called when interceptor is closed
*/
public void close();
}
- 配置拦截器类
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"类名");