淘先锋技术网

首页 1 2 3 4 5 6 7

消费者

基本概念

  • 消费者:消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。
  • 消费组:每个消费者都有一个对应的消费者组。也只属于一个消费组。

如果所有消费者都属于同一个消费组,那么所有的消息都会被均衡地投递到每一个消费者,每一个消息只会被一个消费者处理。如果所有的消费者都属于不同消费组,那么消息就会被广播给所有的消费者。

当某个主题中分区数小于小于消费者个数,那么就会出现有的消费者不能接收到消息。

代码

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;

public class MyConsumer1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        Map<String,Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        // 配置一次拉取最小拉取数量。默认1B
        configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,1);
        // 默认最大量。单位B
        configs.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,500000);
        // 最大等待毫秒
        configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,500);
        // 每个分区最大返回数据量,单位B
        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,500000);
        // 一次拉取最大记录数
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
        // 多久关闭闲置的连接
        configs.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,500000);
        // 是否允许通过pattern方式订阅,true-不允许,false-允许
        configs.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,false);
        // 设置Socket接收消息缓冲区的大小,单位B,-1则为操作系统默认值
        configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,65565);
        // 发送消息缓冲区大小
        configs.put(ConsumerConfig.SEND_BUFFER_CONFIG,65565);
        // 等待请求响应的最长时间
        configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);
        // 元数据过期时间。单位ms
        configs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG,60000);
        // 重连等待时间
        configs.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG,30);
        // 重新发送失败的请求到指定主题分区之前的等待时间
        configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,30);
        // 事务隔离级别  read_uncommitted  read_committed
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_uncommitted");


        // 消费者组名字
        configs.put(ConsumerConfig.GROUP_ID_CONFIG,"consumer.demo");
        // 手动进行位移提交
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        KafkaConsumer<Integer,String> consumer = new KafkaConsumer<Integer, String>(configs);
        // 通过pattern的模式订阅主题
        final Pattern pattern = Pattern.compile("topic_[0-9]");
        // 通过集合的方式订阅多个主题
        final List<String> topics = Arrays.asList("topic_1");
        // 消费者再均衡-分区的所属权从一个消费者转移到另一个消费者的行为。
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            // 再均衡开始之前和消费者停止读取之后被调用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                collection.forEach(tp ->{
                    System.out.println("剥夺的分区:" + tp.partition());
                });
            }
            // 重新分配分区之后和消费者开始读取消费之前被调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                collection.forEach(tp->{
                    System.out.println(tp.partition());
                });
            }
        });
        final  ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
        final Iterable<ConsumerRecord<Integer,String>> topicIterable = records.records("topic_1");
        topicIterable.forEach(record ->{
            System.out.println("===============================");
            System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
            System.out.println("消息的key:" + record.key());
            System.out.println("消息的偏移量(当前消费到的位置):" + record.offset());
            System.out.println("消息的分区号:" + record.partition());
            System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
            System.out.println("消息的时间戳:" + record.timestamp());
            System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
            System.out.println("消息的时间戳类型:" + record.timestampType());
            System.out.println("消息的主题:" + record.topic());
            System.out.println("消息的值:" + record.value());
        });
        // 同步提交的方式
        consumer.commitSync();
    }
}

订阅及取消订阅方式

public interface Consumer<K, V> extends Closeable {
    // 按照主题的方式进行订阅
    void subscribe(Collection<String> topics);

    // 按照主题的方式进行订阅,并设置再均衡监听器
    void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
    // 按照Pattern进行订阅,并设置再均衡监听器
    void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
    // 按照Pattern进行订阅
    void subscribe(Pattern pattern);
    // 用于指定订阅的主题及分区集合
    void assign(Collection<TopicPartition> partitions);
    // --- 取消订阅
    void unsubscribe();
}
  • 查询某个主题的分区信息
public interface Consumer<K, V> extends Closeable {
    List<PartitionInfo> partitionsFor(String topic);
}
  • 取消订阅的其他方式
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());
  • 获取消费者所分配到的分区信息

位移提交

  • 同步位移提交
  • 带参数的同步位移提交
final  ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
final Iterable<ConsumerRecord<Integer,String>> topicIterable = records.records("topic_1");
topicIterable.forEach(record ->{
    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()),
                                     new OffsetAndMetadata(record.offset()+1)));
});
  • 按分区粒度同步提交消费位移
final  ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<Integer, String>> consumerRecords = records.records(partition);
    for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {

    }
    consumer.commitSync(Collections.singletonMap(partition,
	new OffsetAndMetadata(consumerRecords.get(consumerRecords.size()-1).offset())));
}
  • 异步提交

拦截器

public interface ConsumerInterceptor<K, V> extends Configurable {
	// 消息会在调用poll方法返回之前进行拦截处理
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

   	// 提交完消费者位移之后
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when interceptor is closed
     */
    public void close();
}
  • 配置拦截器类
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"类名");