淘先锋技术网

首页 1 2 3 4 5 6 7

消息队列2种模式:

点对点模式:生产者发送消息到队列,消费者消费消息,队列把消息删除;

发布订阅模式:生产者发送到不同的主题中,不同的消费者 订阅不同的主题,1个主题可以对应多个消费者;

在项目中常用的就是发布订阅模式;

kafka的主题可以分为多个分区;

消费者对应消费组,组内每个消费者并行消费;

为了提高可用性,每个分区增加若干个副本;

副本分为leader和follower,生产者和消费者只对应leader,当leader挂掉后,follower自动成为leader;

zookeeper可以存储kafka上下线信息,还可以纪录每一个分区谁是leader副本;

在kafka2.8.0之前必须使用zookeeper;

在kafka2.8.0之后可以不用zookeeper;

官网下载地址:http://kafka.apache.org/downloads.html

下载这个文件,然后上传到3台服务器中

关闭防火墙

systemctl stop firewalld

下载zookeeper,放入三台机器中

https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz

我们来准备3台机器,设置hostname,分别在不同的机器执行不同的hostname

hostnamectl set-hostname linux1
hostnamectl set-hostname linux2
hostnamectl set-hostname linux3

每台机器都设置hosts

vi /etc/hosts
192.168.1.12 linux2
192.168.1.13 linux3
192.168.1.11 linux1

3台机器解压kafka

tar -zxvf kafka_2.12-3.0.0.tgz

修改解压后的文件名称 3台机器都执行

mv kafka_2.12-3.0.0/ kafka

3台机器都进入配置目录,分别修改配置

cd /opt/kafka/config/

vim server.properties

broker.id这里要注意,你复制的时候,不要都写成0

192.168.1.11的broker.id=0

192.168.1.12的broker.id=1

192.168.1.13的broker.id=2

内容如下

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=linux1:2181,linux2:2181,linux3:2181/kafka

3台机器解压zookeeper,并重命名

tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz
mv apache-zookeeper-3.8.0-bin zookeeper

3台机器添加环境变量

vim /etc/profile
#zookeeper
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/bin

3台机器刷新环境变量

source /etc/profile

3台机器进入zookeeper的配置目录

cd /opt/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg

2888为组成zookeeper服务器之间的通信端口

3888为用来选举leader的端口

server后面的数字与后面的myid相对应

在最下面加入下面的内容

server.1=192.168.1.11:2888:3888
server.2=192.168.1.12:2888:3888
server.3=192.168.1.13:2888:3888

在3台机器上创建data目录

mkdir -p /opt/zookeeper/data

3台机器再次修改zoo.cfg文件

vi /opt/zookeeper/conf/zoo.cfg

把刚才创建的data文件夹放入到这个地方

/opt/zookeeper/data
dataDir=/opt/zookeeper/data

在11机器上进入data目录 创建myid 为1

cd /opt/zookeeper/data/
vi myid

在12机器上进入data目录 创建myid 为2

cd /opt/zookeeper/data/
vi myid

在13机器上进入data目录 创建myid 为3

cd /opt/zookeeper/data/
vi myid

3台机器上启动zookeeper

cd /opt/zookeeper/bin/
 ./zkServer.sh start

查看zookeeper状态

 ./zkServer.sh status

关闭zookeeper可以使用

./zkServer.sh stop

我们可以看到节点2自动变成了leader,其他机器变成了follower

连接zk

./zkCli.sh -server linux1:2181

退出zk

quit

接下来我们在3台机器配置kafka环境变量

vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新环境变量

source /etc/profile

在3台机器上启动kafka

cd /opt/kafka/
bin/kafka-server-start.sh -daemon config/server.properties

关闭kafka可以使用

 bin/kafka-server-stop.sh

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper

集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,

Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

我们进入zk连接

cd /opt/zookeeper/bin
./zkCli.sh -server linux1:2181
ls /

可以看到在zookeeper中自动创建了/kafka目录,也叫节点

获取节点的信息可以使用

get /kafka

kafka3.x入门教程(二)