kafka
JMS
在java中存在这样一个机制,java message service(java消息服务)简称JMS,他是用于异构系统之前的通信。JMS里面有一个中间件middleware,位于生产者与消费者中间,能够实现低耦合(生产者与消费者两种不同系统之间不需要交互,只需要与中间件进行通信即可),实现异步消息发送(生产者发送消息时,消费可以不需要在线,等上线后再接收),middleware有两种存储机制:
- queue队列(队列中的消息只能被一个消费者消费)P2P模式
- topic主题(消息会给每个消费者发送一个副本)发布订阅模式
kafka
kafka的介绍
kafka也是一个消息中间件系统,它是分布式的,kafka将队列和主题模式统一起来,kafka全是主题模式,没有队列,它将消息副本发送给所有的消费者分组,在同一组里面只能有一个消费者可以消费到数据。kafka需要借助zk来搭建集群。
kafka是分布式流(流指像水流一样源源不断,实时处理)处理平台,在系统之间构建实时数据流管道,以topic分类对记录进行存储,每个记录包含key-value+timestamp,每秒钟可高达百万消息吞吐量。
kafka的术语
- producer 消息生产者
- consumer 消息消费者
- consumer group 消费者组
- kafka server broker,kafka服务器
- topic 主题,副本数,分区.
kafka的安装
- 安装jdk
- 安装zk
- 下载kafka_2.12-1.1.0.tar.gz,并上传到服务器,tar开,分发到三台主机
- 配置环境变量
export KAFKA_HOME=/soft/kafka_2.12-1.1.0 export PATH=$PATH:$KAFKA_HOME/bin
- 配置配置文件/kafka_2.12-1.1.0/config/server.properties(注意:最好先复制一个做个备份,以防配置失败)标记序号的是需要配置的,其他默认即可
############################# Server Basics ############################# #1. 配置id,必须保证id在集群中是唯一的 broker.id=100 ############################# Socket Server Settings ############################# #2. 放开监听端口即可 listeners=PLAINTEXT://:9092 #如果你的环境有外网映射,那么你还需要下面这行属性,外网映射服务加端口 #advertised.listeners=PLAINTEXT://x.x.x.x:9092 #处理网络请求的线程数 num.network.threads=3 #处理磁盘IO的线程 num.io.threads=8 #发送缓冲区 socket.send.buffer.bytes=102400 #接收缓冲区 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# # 3. 设置kafka日志存放目录 log.dirs=/home/kafka-logs #默认分区个数 num.partitions=1 #日志恢复的线程个数 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Retention Policy ############################# #控制日志保留时间,168小时,7天 log.retention.hours=168 #日志文件最多保留1G log.segment.bytes=1073741824 #日志检查周期,5分钟检查一次 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# #4. 配置zk集群的地址 zookeeper.connect=Centos100:2181,Centos101:2181,Centos102:2181 #配置zk连接超时时间 zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# group.initial.rebalance.delay.ms=0
- 分发配置,并修改id,保证id唯一
- 启动
#先启动zk zkServer.sh start #启动kafka,-daemon是将kafka作为守护线程,放到后台运行 kafka-server-start.sh [-daemon] /soft/kafka_2.12-1.1.0/config/server.properties #验证启动是否成功 netstat -anop|grep 9092
- 创建主题
#连接zk服务器Centos100:2181,创建副本数是3,分区数为3的主题test1,注意副本数必须小于等于服务器节点数 kafka-topics.sh --create --zookeeper Centos100:2181,Centos101:2181,Centos102:2181 --replication-factor 3 --partitions 3 --topic test1 #查看主题 kafka-topics.sh --list --zookeeper Centos100:2181,Centos101:2181,Centos102:2181
- 模拟生产者和消费者
#创建生产者,连接topic为test1的主题,注意:生产者不连接zk kafka-console-producer.sh --broker-list Centos100:9092,Centos101:9092,Centos102:9092 --topic test1 #创建消费者,连接topic为test1的主题,从头开始消费,消费者连接zk kafka-console-consumer.sh --zookeeper Centos100:2181,Centos101:2181,Centos102:2181 --topic test1 --from-beginning
kafka在ZK中的体现
根目录下:
- /controller 用于存放控制器的地方,一个kafka集群只会推选出一个控制器(该目录只有在kafka启动的时候才会生成)
- /brokers
- /brokers/ids 用于存放kafka集群中服务器id的地方,每一个id目录下面都包含了它的连接信息
- /brokers/topics 用于存放kafka主题的地方,每一个主题下面还有分区,并且记录的分区下的所有副本分别在哪台服务器上,并且记录了副本的leader(领导者)在哪台服务器上
- /admin/delete_topics 记录被删除的主题
- /consumers 记录消费者组和消费者组消费的topic以及偏移量,只有消费者在线消费才能看到,消费者下线会自动注销
注意: 生产者不在zk中注册,所以生产者不连接zk,消费者要连接zk,kafka中并没有提供完全删除主题的功能,如果想完全删除,可以去zk中删除topic的相关文件
手动平衡kafka分区和副本
#创建topic名为test2,指定3个分区,每个分区3个副本,每个副本分别在100,101,102主机上,这样就可以均匀实现分区的平衡
kafka-topics.sh --create --zookeeper Centos100:2181,Centos101:2181,Centos102:2181 --topic test2 --replica-assignment 100:101:102,100:101:102,100:101:102
创建完毕我们可以到我们指定的kafka日志目录下查看:
我们还可以到zk中去查看:
副本
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。支持到n-1故障,只要有一个副本还在,就可以使用。每个分区下的副本都有leader,follow。leader挂掉时,消息分区写入到本地log或者在向生产者发送消息确认回执之前,生产者向新的leader发送消息。新leader的选举是通过isr进行,第一个注册的follower成为leader。
kafka支持副本模式
- 同步副本(可靠,但效率低)
- producer联系zk识别leader
- producer向leader发送消息
- leadr收到消息写入到本地log
- follower从leader pull(拉)消息
- follower向本地写入log
- follower向leader发送ack(确认写入完毕)消息
- leader收到所有follower的ack消息
- leader向producer回传ack
- 异步副本(不可靠,容易丢数据,效率高)
和同步复制的区别在于leader写入本地log之后,直接向producer回传ack消息,不需要等待所有follower复制完成。
JAVA API
环境搭建
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers","192.168.200.100:9092,192.168.200.101:9092,192.168.200.102:9092");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//设置是否全部提交(同步副本)
prop.put("acks","all");
Producer<String,String> producer = new KafkaProducer<String,String>(prop);
for(int i = 0; i < 2147483;i++){
ProducerRecord<String,String> record = new ProducerRecord<String,String>("test1",(new Date()).getTime() + "-" + i ,"Hello world suda" + i);
producer.send(record);
}
//必须加关闭,否则生产者断开时,kafka服务端不能完整接收数据
producer.close();
}
}
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
//新版的kafka客户端已经不支持连接zk去连接kafka,而是直接连kafka
props.setProperty("bootstrap.servers", "192.168.200.100:9092,192.168.200.101:9092,192.168.200.102:9092");
props.setProperty("group.id", "aaa");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//添加主题
consumer.subscribe(Arrays.asList("test1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100).toMillis());
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
kafka集成flume
-
flume作为生产者,即kafka作为flume的sink
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=netcat a1.sources.r1.bind=localhost a1.sources.r1.port=8888 a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic=test1 a1.sinks.k1.kafka.bootstrap.servers=Centos100:9092,Centos101:9092,Centos102:9092 a1.sinks.k1.kafka.flumeBatchSize=20 a1.sinks.k1.kafka.producer.acks=1 a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
-
flume作为消费者,即kafka作为source
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize=5000 a1.sources.r1.batchDurationMillis=2000 a1.sources.r1.kafka.bootstrap.servers=Centos100:9092,Centos101:9092,Centos102:9092 a1.sources.r1.kafka.topics=test1 a1.sources.r1.kafka.consumer.group.id=g4 a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
-
flume既作为生产者,又作为消费者
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 a1.sinks.k1.type = logger a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = Centos100:9092,Centos101:9092,Centos102:9092 a1.channels.c1.kafka.topic = test1 a1.channels.c1.kafka.consumer.group.id = g6 a1.channels.c1.parseAsFlumeEvent=false a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1