消息队列2种模式:
点对点模式:生产者发送消息到队列,消费者消费消息,队列把消息删除;
发布订阅模式:生产者发送到不同的主题中,不同的消费者 订阅不同的主题,1个主题可以对应多个消费者;
在项目中常用的就是发布订阅模式;
kafka的主题可以分为多个分区;
消费者对应消费组,组内每个消费者并行消费;
为了提高可用性,每个分区增加若干个副本;
副本分为leader和follower,生产者和消费者只对应leader,当leader挂掉后,follower自动成为leader;
zookeeper可以存储kafka上下线信息,还可以纪录每一个分区谁是leader副本;
在kafka2.8.0之前必须使用zookeeper;
在kafka2.8.0之后可以不用zookeeper;
官网下载地址:http://kafka.apache.org/downloads.html
下载这个文件,然后上传到3台服务器中
关闭防火墙
systemctl stop firewalld
下载zookeeper,放入三台机器中
https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
我们来准备3台机器,设置hostname,分别在不同的机器执行不同的hostname
hostnamectl set-hostname linux1
hostnamectl set-hostname linux2
hostnamectl set-hostname linux3
每台机器都设置hosts
vi /etc/hosts
192.168.1.12 linux2
192.168.1.13 linux3
192.168.1.11 linux1
3台机器解压kafka
tar -zxvf kafka_2.12-3.0.0.tgz
修改解压后的文件名称 3台机器都执行
mv kafka_2.12-3.0.0/ kafka
3台机器都进入配置目录,分别修改配置
cd /opt/kafka/config/
vim server.properties
broker.id这里要注意,你复制的时候,不要都写成0
192.168.1.11的broker.id=0
192.168.1.12的broker.id=1
192.168.1.13的broker.id=2
内容如下
#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=linux1:2181,linux2:2181,linux3:2181/kafka
3台机器解压zookeeper,并重命名
tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz
mv apache-zookeeper-3.8.0-bin zookeeper
3台机器添加环境变量
vim /etc/profile
#zookeeper
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/bin
3台机器刷新环境变量
source /etc/profile
3台机器进入zookeeper的配置目录
cd /opt/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
2888为组成zookeeper服务器之间的通信端口
3888为用来选举leader的端口
server后面的数字与后面的myid相对应
在最下面加入下面的内容
server.1=192.168.1.11:2888:3888
server.2=192.168.1.12:2888:3888
server.3=192.168.1.13:2888:3888
在3台机器上创建data目录
mkdir -p /opt/zookeeper/data
3台机器再次修改zoo.cfg文件
vi /opt/zookeeper/conf/zoo.cfg
把刚才创建的data文件夹放入到这个地方
/opt/zookeeper/data
dataDir=/opt/zookeeper/data
在11机器上进入data目录 创建myid 为1
cd /opt/zookeeper/data/
vi myid
在12机器上进入data目录 创建myid 为2
cd /opt/zookeeper/data/
vi myid
在13机器上进入data目录 创建myid 为3
cd /opt/zookeeper/data/
vi myid
3台机器上启动zookeeper
cd /opt/zookeeper/bin/
./zkServer.sh start
查看zookeeper状态
./zkServer.sh status
关闭zookeeper可以使用
./zkServer.sh stop
我们可以看到节点2自动变成了leader,其他机器变成了follower
连接zk
./zkCli.sh -server linux1:2181
退出zk
quit
接下来我们在3台机器配置kafka环境变量
vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新环境变量
source /etc/profile
在3台机器上启动kafka
cd /opt/kafka/
bin/kafka-server-start.sh -daemon config/server.properties
关闭kafka可以使用
bin/kafka-server-stop.sh
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,
Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。
我们进入zk连接
cd /opt/zookeeper/bin
./zkCli.sh -server linux1:2181
ls /
可以看到在zookeeper中自动创建了/kafka目录,也叫节点
获取节点的信息可以使用
get /kafka