目录
一、Kafka 概述
1.1 定义
1.2 消息队列
目前企业中比较常见的消息队列产品主要有 Kafka 、ActiveMQ、RabbitMQ、RocketMQ 等。
在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
1.3 传统消息队列的应用场景
传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。
1.3.1 消息队列的应用场景——缓冲/消峰
1.3.2 消息队列的应用场景——解耦
1.3.3 消息队列的应用场景——异步通信
1.4 消息队列的两种模式
1.5 Kafka 基础架构
-
Producer:消息生产者,就是向 Kafka broker 发消息的客户端。
-
Consumer:消息消费者,向 Kafka broker 取消息的客户端。
-
Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
-
Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。
-
Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
-
Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
-
Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
-
Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
-
Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
二、Kafka 快速入门
2.1 安装部署
2.1.1 集群规划
主机名 | IP | 软件 |
hadoop01 | 192.168.170.130 | zookeeper、kafka |
hadoop02 | 192.168.170.131 | zookeeper、kafka |
2.1.2 集群部署
kafka 官网下载地址:Apache Kafka
#1. 解压安装包
[root@hadoop01 ~]# cd /opt/
[root@hadoop01 /opt]# ls
kafka_2.12-3.4.0.tgz
[root@hadoop01 /opt]# mkdir -p /opt/module
[root@hadoop01 /opt]# tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/
#2. 修改解压后的文件名称
[root@hadoop01 /opt]# cd module/
[root@hadoop01 /opt/module]# mv kafka_2.12-3.4.0/ kafka
#3. 进入到 opt/module/kafka 目录,修改配置文件
[root@hadoop01 /opt/module]# cd kafka/config/
[root@hadoop01 /opt/module/kafka/config]# vim server.properties
# broker 的全局唯一编号,不能重复,只能是数字
broker.id=0
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
# topic 在当前 broker 上的分区个数
num. partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
# 配置连接 Zookeeper 集群地址 (在 zk 根目录下创建 kafka ,方便管理
zookeeper.connect=hadoop01:2181,hadoop02:2181/kafka
#4. 编写分发脚本
[root@hadoop01 /opt/module]# pwd
/opt/module
[root@hadoop01 /opt/module]# vim xsync
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
#2. 遍历集群所有机器
for host in hadoop01 hadoop02
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送
for file in $@
do
#4. 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
[root@hadoop01 /opt/module]# chmod 777 xsync
[root@hadoop01 /opt/module]# mv xsync /usr/bin/
#5. 配置主机解析。两台机子都要执行
[root@hadoop01 /opt/module]# vim /etc/hosts
192.168.170.130 hadoop01
192.168.170.131 hadoop02
#6. 配置免密登录
# ⼀直回⻋就⾏
[root@hadoop01 /opt/module]# ssh-keygen -t rsa
# 等待一会,然后输入 yes,再输入各主机的密码即可
[root@hadoop01 /opt/module]# for i in hadoop01 hadoop02;do ssh-copy-id -i /root/.ssh/id_rsa.pub $i;done
#7. 分发安装包
[root@hadoop01 /opt/module]# xsync kafka/
#8. 在 hadoop02 上修改配置文件 /opt/module/kafka/config/server.properties 中的 broker.id=1
[root@hadoop02 ~]# vim /opt/module/kafka/config/server.properties
broker.id=1
# 注 broker.id 不得重复,整个集群中唯一。
2.1.3 配置环境变量(两台机器都要执行)
[root@hadoop01 ~]# vim /etc/profile
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@hadoop01 ~]# source /etc/profile
2.1.4 安装 Zookeeper 集群
可以查看我的这篇文章:【Zookeeper 初级】02、Zookeeper 集群部署_Stars.Sky的博客-CSDN博客
2.1.5 启动 kafka 集群
# 先启动 Zookeeper 集群,然后依次启动 Kafka
[root@hadoop01 ~]# zk.sh start
# 依次在 hadoop01、 hadoop02 节点上 启动 Kafka
[root@hadoop01 ~]# cd /opt/module/kafka/
[root@hadoop01 /opt/module/kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop02 ~]# cd /opt/module/kafka/
[root@hadoop02 /opt/module/kafka]# bin/kafka-server-start.sh -daemon config/server.properties
# 查看集群状态
[root@hadoop01 /opt/module/kafka]# jpsall
========= hadoop01 =========
1938 kafka.Kafka
2024 sun.tools.jps.Jps
1517 org.apache.zookeeper.server.quorum.QuorumPeerMain
========= hadoop02 =========
2002 sun.tools.jps.Jps
1512 org.apache.zookeeper.server.quorum.QuorumPeerMain
1930 kafka.Kafka
2.1.6 关闭集群
[root@hadoop01 /opt/module/kafka]# bin/kafka-server-stop.sh
[root@hadoop02 /opt/module/kafka]# bin/kafka-server-stop.sh
[root@hadoop01 /opt/module/kafka]# jpsall
========= hadoop01 =========
2084 sun.tools.jps.Jps
1517 org.apache.zookeeper.server.quorum.QuorumPeerMain
========= hadoop02 =========
2064 sun.tools.jps.Jps
1512 org.apache.zookeeper.server.quorum.QuorumPeerMain
2.2 kafka 集群启停脚本
[root@hadoop01 /opt/module/kafka]# vim /usr/bin/kf.sh
#!/bin/bash
case $1 in
"start") {
for i in hadoop01 hadoop02
do
echo " ------- 启动 $i Kafka ------- "
ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop") {
for i in hadoop01 hadoop02
do
echo " ------- 停止 $i Kafka ------- "
ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-stop.sh"
done
};;
esac
[root@hadoop01 /opt/module/kafka]# chmod 777 /usr/bin/kf.sh
# 启动
[root@hadoop01 /opt/module/kafka]# kf.sh start
------- 启动 hadoop01 Kafka -------
------- 启动 hadoop02 Kafka -------
[root@hadoop01 /opt/module/kafka]# jpsall
========= hadoop01 =========
5689 sun.tools.jps.Jps
1517 org.apache.zookeeper.server.quorum.QuorumPeerMain
5647 kafka.Kafka
========= hadoop02 =========
5105 kafka.Kafka
1512 org.apache.zookeeper.server.quorum.QuorumPeerMain
5132 sun.tools.jps.Jps
# 停止
[root@hadoop01 /opt/module/kafka]# kf.sh stop
------- 停止 hadoop01 Kafka -------
------- 停止 hadoop02 Kafka -------
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息, Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。
三、Kafka 命令行操作
3.1 Kafka 基础架构
3.2 主题命令行操作
3.2.1 查看操作主题命令参数
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--create | 创建主题 |
--delete | 删除主题 |
--alter | 修改主题 |
--list | 查看所有主题 |
--describe | 查看主题详细描述 |
--partitions <Integer: # of partitions> | 设置分区数 |
--replication-factor<Integer: replication factor> | 设置分区副本 |
--config <String: name=value> | 更新系统默认的配置 |
3.2.2 查看当前服务器中的所有 topic
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --list
3.2.3 创建 first topic
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --topic first --create --partitions 1 --replication-factor 2
Created topic first.
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --list
first
选项说明:
-
--topic:定义 topic 名
-
--replication factor:定义副本数
-
--partitions:定义分区数
3.2.4 查看 first 主题的详情
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe --topic first
Topic: first TopicId: bZhLNYEkRI2en0RF4y5mTg PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
3.2.5 修改分区数 (注意:分区数只能增加,不能减少)
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --topic first --partitions 3 --alter
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe --topic first
Topic: first TopicId: bZhLNYEkRI2en0RF4y5mTg PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: first Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: first Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
3.2.6 删除 topic(暂不操作)
[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --topic first --delete
3.3 生产者命令行操作
3.3.1 查看操作生产者命令参数
[root@hadoop01 /opt/module/kafka]# bin/kafka-console-producer.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
3.3.2 发送消息
[root@hadoop01 /opt/module/kafka]# bin/kafka-console-producer.sh --bootstrap-server hadoop01:9092 --topic first
>hello world
3.4 消费者命令行操作
3.4.1 查看操作消费者命令参数
[root@hadoop02 /opt/module/kafka]# bin/kafka-console-consumer.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--from-beginning | 从头开始消费 |
--group <String: consumer group id> | 指定消费者组名称 |
3.4.2 消费信息(在 hadoop02 上操作)
# 消费 first 主题中的数据(同时在 hadoop01 上发送 hello)
[root@hadoop02 /opt/module/kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
hello
# 把主题中所有的数据都读取出来(包括历史数据)
[root@hadoop02 /opt/module/kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first --from-beginning
hello world
hello