接下来我们来看下主题命令行操作
参数 描述
--bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
--topic <String: topic> 操作的 topic 名称。
--create 创建主题。
--delete 删除主题。
--alter 修改主题。
--list 查看所有主题。
--describe 查看主题详细描述。
--partitions <Integer: # of partitions> 设置分区数。
--replication-factor<Integer: replication factor> 设置分区副本。
--config <String: name=value> 更新系统默认的配置。
创建topic名称为zhuti,并且设置1个分区,3个副本
cd /opt/kafka/
bin/kafka-topics.sh --bootstrap-server linux1:9092 --create --partitions 1 --replication-factor 3 --topic zhuti
查看当前服务器所有的topic
cd /opt/kafka/
bin/kafka-topics.sh --bootstrap-server linux1:9092 --list
修改分区为3,注意 分区只能增加,不能减少
bin/kafka-topics.sh --bootstrap-server linux1:9092 --alter --topic zhuti --partitions 3
查看zhuti主题详情
bin/kafka-topics.sh --bootstrap-server linux1:9092 --describe --topic zhuti
删除zhuti主题
bin/kafka-topics.sh --bootstrap-server linux1:9092 --delete --topic zhuti
生产者命令行操作
生产者发送消息
bin/kafka-console-producer.sh --bootstrap-server linux1:9092 --topic zhuti
在下面输入内容
消费者命令行操作
消费当前的消息
bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic zhuti
消费者没有收到消息,这是因为当前使用的命令是只接收当前运行中发送的消息,历史消息不会接收
生产者再次发送一条消息
可以看到消费者接收到消息了
我们消费者使用下面的--from-beginning命令来接收历史的消息
bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --from-beginning --topic zhuti
可以看到历史数据收到了
至于使用历史的还是当前的,根据你的业务场景来使用;
kafka的同步发送消息,必须等待第一条响应完毕,才能发送第二条;
异步发送消息,不需要等待响应,可以批量发送消息;
接下来我们来看下异步发送
在C:\Windows\System32\drivers\etc这里配置hosts
192.168.1.12 linux2
192.168.1.13 linux3
192.168.1.11 linux1
如果不配置 ,启动生产者会报这个错误
java.net.UnknownHostException: linux3
打开springboot项目
添加pom.xml
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
创建生产者
package com.example.client.entity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
//生产者
public class Producer {
public static void main(String[] args) {
//创建生产者的配置对象
Properties properties=new Properties();
//给kafka配置对象添加配置信息 bootstrap.serve
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
//key value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//创建kafka生产者对象
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
//调用send方法 发送消息
for (int i = 0; i < 5; i++) {
//给zhuti这个主题 发送消息
ProducerRecord<String, String> var1=new ProducerRecord("zhuti","你好"+i);
kafkaProducer.send(var1);
}
//关闭资源
kafkaProducer.close();
}
}
启动之后,我们看一下消费者 接收到的信息
在来看下异步发送完成后的回调方法
package com.example.client.entity;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
//生产者
public class Producer {
public static void main(String[] args) {
//创建生产者的配置对象
Properties properties=new Properties();
//给kafka配置对象添加配置信息 bootstrap.serve
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
//key value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//创建kafka生产者对象
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
//调用send方法 发送消息
for (int i = 0; i < 5; i++) {
//给zhuti这个主题 发送消息
ProducerRecord<String, String> var1=new ProducerRecord("zhuti","你好"+i);
kafkaProducer.send(var1, new Callback() {
//异步回调 完成时
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
}else {
//打印异常信息
e.printStackTrace();
}
}
});
try {
//延迟一会看看发送到那个分区 如果不设置延迟,那么都会发送到一个分区中
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭资源
kafkaProducer.close();
}
}
我们在来看下同步发送
就是在send方法后面加了一个.get()方法
接下来我们在看下生产者分区
比如有100T的数据,直接发送太多了,我们就把主题分成多个区,每个区发送一部分
合理的控制分区的任务,可以实现负载均衡的效果
还可以提高消费速度,多个消费者 消费不同的分区
在idea搜索DefaultPartitioner默认分区器
我们进入ProducerRecord的源码
如果使用了partition分区,那么按照设置的去走;
如果没有指定分区,但是设置了key,那么按照key的hash取余放入不同的分区;
如果没有指定key和分区,只写了value,那么会随机选择一个分区,一直使用,
等到该分区batch满了或者已完成,kafka在随机选择一个新的分区进行使用
我们来改造下,设置分区设置为1
我们在来设置key值
在项目中,我们一般以表的名字当做key,来发送到某一个分区
接下来我们看一下自定义分区器
package com.example.client.entity;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
*
* 自定义分区器
* @param
* @return
* @throws Exception
*/
public class MyPartitioner implements Partitioner {
/**
*
*
* @param topic 主题
* @param key 消息的key
* @param value 消息的内容
* @param cluster 集群元数据可以查看分区信息
* @param keyBytes 消息的key 序列化后的字节数组
* @param valueBytes 消息的内容 序列化后的字节数组
* @return
* @throws Exception
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取消息
String msg=value.toString();
if(msg.equals("你好0")){
//如果是你好0 这个内容 就发送到1分区
return 1;
}
//否则发送到0分区
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
package com.example.client.entity;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
//生产者
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建生产者的配置对象
Properties properties=new Properties();
//给kafka配置对象添加配置信息 bootstrap.serve
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
//key value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);
//创建kafka生产者对象
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
//调用send方法 发送消息
for (int i = 0; i < 5; i++) {
//给zhuti这个主题 发送消息
ProducerRecord<String, String> var1=new ProducerRecord("zhuti","你好"+i);
kafkaProducer.send(var1, new Callback() {
//异步回调 完成时
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
}else {
//打印异常信息
e.printStackTrace();
}
}
});
}
//关闭资源
kafkaProducer.close();
}
}
可以看到我们的自定义分区器,使用成功了
接下来我们来看下生产者如何提高吞吐量
batch.size:批次大小,默认16k 到了16k才把数据发送出去
linger.ms:等待时间,修改为5-100ms 如果等待时间到了,那么会在随机一个分区进行使用
compression.type:压缩snappy
RecordAccumulator:缓冲区大小,修改为64m
batch.size和linger.ms有一个满足条件,就会发送消息
package com.example.client.entity;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
//生产者
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建生产者的配置对象
Properties properties=new Properties();
//给kafka配置对象添加配置信息 bootstrap.serve
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
//key value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//batch.size和linger.ms有一个满足条件,就会发送消息
//batch.size 批次大小 默认16k 到了16k才把数据发送出去
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16*1024);
//linger.ms 等待时间 默认为0毫秒 这里设置为5毫秒 如果等待时间到了,那么会在随机一个分区进行使用
properties.put(ProducerConfig.LINGER_MS_CONFIG,5);
//RecordAccumulator 缓冲区大小 默认32m
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,32*1024*1024);
//压缩 默认none 可以配置值gzip, snappy,lz4,zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
//创建kafka生产者对象
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
//调用send方法 发送消息
for (int i = 0; i < 5; i++) {
//给zhuti这个主题 发送消息
ProducerRecord<String, String> var1=new ProducerRecord("zhuti","你好"+i);
kafkaProducer.send(var1, new Callback() {
//异步回调 完成时
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
}else {
//打印异常信息
e.printStackTrace();
}
}
});
}
//关闭资源
kafkaProducer.close();
}
}
接下来在看下生产经验-数据可靠性
acks应答级别:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
package com.example.client.entity;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
//生产者
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建生产者的配置对象
Properties properties=new Properties();
//给kafka配置对象添加配置信息 bootstrap.serve
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
//key value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//acks级别
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
//创建kafka生产者对象
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
//调用send方法 发送消息
for (int i = 0; i < 5; i++) {
//给zhuti这个主题 发送消息
ProducerRecord<String, String> var1=new ProducerRecord("zhuti","你好"+i);
kafkaProducer.send(var1, new Callback() {
//异步回调 完成时
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
}else {
//打印异常信息
e.printStackTrace();
}
}
});
}
//关闭资源
kafkaProducer.close();
}
}
acks级别设置为-1 ,但是还会有重复的数据
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
重复数据的判断标准:
具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;
Partition 表示分区号;
Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。
Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。
有了 transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务
如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭
我们来看下代码演示下事物
package com.example.client.entity;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
//生产者
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建生产者的配置对象
Properties properties=new Properties();
//给kafka配置对象添加配置信息 bootstrap.serve
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
//key value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//设置事物id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "shiwu_id");
//创建kafka生产者对象
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<String, String>(properties);
//初始化事物
kafkaProducer.initTransactions();
//开启事物
kafkaProducer.beginTransaction();
//调用send方法 发送消息
try {
for (int i = 0; i < 5; i++) {
//给zhuti这个主题 发送消息
ProducerRecord<String, String> var1=new ProducerRecord("zhuti","你好"+i);
kafkaProducer.send(var1, new Callback() {
//异步回调 完成时
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("主题:"+recordMetadata.topic()+",分区:"+recordMetadata.partition());
}else {
//打印异常信息
e.printStackTrace();
}
}
});
}
//模拟报错
int a=1/0;
//提交事物
kafkaProducer.commitTransaction();
}catch (Exception e){
//终止事物
kafkaProducer.abortTransaction();
}
//关闭资源
kafkaProducer.close();
}
}
可以看到数据报错,在消息者哪里,并没消费数据,这就是事物
生产经验——数据乱序
生产者
InFlightRequests,默认每个broker最多缓存5个请求
如果开启了幂等性且缓存的
请求个数小于5个。会在服务端重新排序
1)kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
故无论如何,都可以保证最近5个request的数据都是有序的。