目录
1、环境准备
1)启动zk、kafka集群,并创建topic名为"test",分区数为3
2)导入pom依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.1</version>
</dependency>
2、生产者API
http://kafka.apache.org/0100/documentation.html#producerapi
2.1、创建生产者并推送消息
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class NewProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服务端的主机名和端口号,定义其中一个broker即可
props.put("bootstrap.servers", "master1:9092");
// 等待所有副本节点的应答(应答级别)all等价于-1
props.put("acks", "all");
// props.put(ProducerConfig.ACKS_CONFIG, "all"); // 二者等价
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小,16M
props.put("batch.size", 16384);
// 请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小(32M)
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者对象
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 循环推送数据
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello world-" + i));
}
// 关闭资源
producer.close();
}
}
2.2、生产者与分区
kafka会根据producer推送的消息中的key值进行确定分区,默认采用org.apache.kafka.clients.producer.internals.DefaultPartitioner分区器
默认的分区策略是:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
- 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//若为提供key值
if (keyBytes == null) {
//counter是一个计数器,AtomicInteger counter = new AtomicInteger(new Random().nextInt());
//初始值是一个随机数,每接受一个消息递增1,达到轮询的效果
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
//通过递增1的计数器对可用的分区数取模得到分区号,轮询分区;toPositive获得绝对值
int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
//没有可用的分区时
return DefaultPartitioner.toPositive(nextValue) % numPartitions;
}
} else {
// 若指定了key,通过key的murmurhash值对分区数取余
return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
- murmur2
murmurhash2 是一种非加密型哈希函数,与普通的DJB hash相比,其随机分布特征表现更良好,冲突少,速度更快。也有文章声称,只有当key的长度大于10字节的时候,MurmurHash的运算速度才快于DJB。“从计算速度上来看,MurmurHash只适用于已知长度的、长度比较长的字符”。
在推送消息时可以指定分区号,ProducerRecord提供了4个参数的构造方法
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value);
}
//指定分区号
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value);
}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException("Invalid timestamp " + timestamp);
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
比如在上例中指定前25条消息写到分区0中,后25条写入到分区1中
for (int i = 0; i < 50; i++) {
if(i < 25) {
producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hello world-" + i));
}
producer.send(new ProducerRecord<String, String>("test", 1,Integer.toString(i), "hello world-" + i));
}
2.3、创建生产者带回调函数
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.Future;
public class CallBackNewProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "master1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 50; i++) {
//通过send方法传递回调函数
producer.send(new ProducerRecord<String, String>("topic", "hello world-" + i), new Callback() {
@Override //重写onCompletion
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
//打印每条消息分区、偏移量
System.out.println(metadata.partition() + "---" + metadata.offset());
}
}
});
}
producer.close();
}
}
2.4、自定义分区
package com.jod.kafka
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
//实现Partitioner接口
public class PartitionerProducer implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 分区逻辑...
return 0;
}
@Override
public void close() {}
}
然后再代码中通过properties对象指定分区器
//Properties props = new Properties();
props.put("partitioner.class", "com.jod.kafka.PartitionerProducer");
3、消费者API
http://kafka.apache.org/0100/documentation.html#consumerapi
3.1、创建消费者
官网提供的是高级API案例,即自动维护消费情况
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class NewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "master1:9092");
// 指定consumer group
props.put("group.id", "group0");
/*如果想重复消费topic数据,有三种方式:
1、新建一个组。并添加以下属性,从最早的消息开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
2、使用低级API指定offset。
3、使用高级API在不换组的情况下重复消费topic数据。
consumer.assign(Collections.singletonList(new TopicPartition("second", 0)));
consumer.seek(new TopicPartition("test", 0), 2);
*/
// 是否自动确认offset
props.put("enable.auto.commit", "true");
// 自动确认offset的时间间隔,与上面的属性绑定使用
props.put("auto.commit.interval.ms", "1000");
// key的序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 指定消费者订阅的topic,可同时订阅多个
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 读取数据,读取超时时间为100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.topic() + "---" + record.partition() + "---" + record.offset() + "---" + record.value());
}
}
}
3.2、Producer拦截器(interceptor)
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按顺序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
1)configure(configs)
获取配置信息和初始化数据时调用。
2)onSend(ProducerRecord)
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
3)onAcknowledgement(RecordMetadata, Exception)
该方法会在消息被应答或消息发送失败时调用,并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
4)close:
关闭 interceptor,主要用于执行一些资源清理工作。
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非再向上传递。这在使用过程中要特别留意。
案例:实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
1)时间戳拦截器
package com.jod.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
public void configure(Map<String, ?> configs) {}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 创建一个新的record,把时间戳写入消息体的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
public void close() {}
}
2)计数器拦截器,并在producer关闭时打印这两个计数器
package com.jod.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
public void configure(Map<String, ?> configs) {}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
public void close() {
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
3)main方法中指定拦截器
// Properties props = new Properties();
// 构建拦截链列表
List<String> interceptors = new ArrayList<String>();
interceptors.add("com.jod.kafka.interceptor.TimeInterceptor");
interceptors.add("com.jod.kafka.interceptor.CounterInterceptor");
//指定拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
4、Flume 与 kafka 集成(Kafka sink)
Property Name | Default | Description |
---|---|---|
type | – | org.apache.flume.sink.kafka.KafkaSink |
brokerList | – | List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. |
batchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency. |
requiredAcks | 1 | How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure. |
Other Kafka Producer Properties | – | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.. For example: kafka.producer.type |
通过netcat向kafka sink发送消息,可理解为flume作为生产者
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = master1:9092
a1.sinks.k1.requiredAcks = -1 #1,0,-1那个
a1.sinks.k1.batchSize = 20 #一批20个消息
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1