淘先锋技术网

首页 1 2 3 4 5 6 7

目录

一、Kafka 概述

1.1 定义

1.2 消息队列

1.3 传统消息队列的应用场景

1.3.1 消息队列的应用场景——缓冲/消峰

1.3.2 消息队列的应用场景——解耦

1.3.3 消息队列的应用场景——异步通信

1.4 消息队列的两种模式

1.5 Kafka 基础架构

二、Kafka 快速入门

2.1 安装部署

2.1.1 集群规划

2.1.2 集群部署

2.1.3 配置环境变量(两台机器都要执行)

2.1.4 安装 Zookeeper 集群

2.1.5 启动 kafka 集群

2.1.6 关闭集群

2.2 kafka 集群启停脚本

三、Kafka 命令行操作

3.1 Kafka 基础架构

3.2 主题命令行操作

3.2.1 查看操作主题命令参数

3.2.2 查看当前服务器中的所有 topic

3.2.3 创建 first topic

3.2.4 查看 first 主题的详情

3.2.5 修改分区数 (注意:分区数只能增加,不能减少)

3.2.6 删除 topic(暂不操作)

3.3 生产者命令行操作

3.3.1 查看操作生产者命令参数

3.3.2 发送消息

3.4 消费者命令行操作

3.4.1 查看操作消费者命令参数

3.4.2 消费信息(在 hadoop02 上操作)


 

一、Kafka 概述

1.1 定义

1.2 消息队列

        目前企业中比较常见的消息队列产品主要有 Kafka 、ActiveMQ、RabbitMQ、RocketMQ 等。

        在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

1.3 传统消息队列的应用场景

传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。

1.3.1 消息队列的应用场景——缓冲/消峰

1.3.2 消息队列的应用场景——解耦

1.3.3 消息队列的应用场景——异步通信

1.4 消息队列的两种模式

1.5 Kafka 基础架构

  1. Producer:消息生产者,就是向 Kafka broker 发消息的客户端。

  2. Consumer:消息消费者,向 Kafka broker 取消息的客户端。

  3. Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

  4. Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。

  5. Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。

  6. Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。

  7. Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。

  8. Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。

  9. Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。 

二、Kafka 快速入门

2.1 安装部署

2.1.1 集群规划

主机名IP软件
hadoop01192.168.170.130zookeeper、kafka
hadoop02192.168.170.131zookeeper、kafka

2.1.2 集群部署

kafka 官网下载地址:Apache Kafka

#1. 解压安装包
[root@hadoop01 ~]# cd /opt/
[root@hadoop01 /opt]# ls
kafka_2.12-3.4.0.tgz
[root@hadoop01 /opt]# mkdir -p /opt/module
[root@hadoop01 /opt]# tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/

#2. 修改解压后的文件名称
[root@hadoop01 /opt]# cd module/
[root@hadoop01 /opt/module]# mv kafka_2.12-3.4.0/ kafka

#3. 进入到 opt/module/kafka 目录,修改配置文件
[root@hadoop01 /opt/module]# cd kafka/config/
[root@hadoop01 /opt/module/kafka/config]# vim server.properties 

# broker 的全局唯一编号,不能重复,只能是数字
broker.id=0
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志数据存放的路径 ,路径不需要提前创建 kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
# topic 在当前 broker 上的分区个数
num. partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
# 配置连接 Zookeeper 集群地址 (在 zk 根目录下创建 kafka ,方便管理
zookeeper.connect=hadoop01:2181,hadoop02:2181/kafka

#4. 编写分发脚本
[root@hadoop01 /opt/module]# pwd
/opt/module
[root@hadoop01 /opt/module]# vim xsync
#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in hadoop01 hadoop02
do
    echo ====================  $host  ====================
    #3. 遍历所有目录,挨个发送

    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
            then
                #5. 获取父目录
                pdir=$(cd -P $(dirname $file); pwd)

                #6. 获取当前文件的名称
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done

[root@hadoop01 /opt/module]# chmod 777 xsync 
[root@hadoop01 /opt/module]# mv xsync /usr/bin/

#5. 配置主机解析。两台机子都要执行
[root@hadoop01 /opt/module]# vim /etc/hosts
192.168.170.130 hadoop01
192.168.170.131 hadoop02

#6. 配置免密登录
# ⼀直回⻋就⾏
[root@hadoop01 /opt/module]# ssh-keygen -t rsa
 
# 等待一会,然后输入 yes,再输入各主机的密码即可
[root@hadoop01 /opt/module]# for i in hadoop01 hadoop02;do ssh-copy-id -i /root/.ssh/id_rsa.pub $i;done

#7. 分发安装包
[root@hadoop01 /opt/module]# xsync kafka/

#8. 在 hadoop02 上修改配置文件 /opt/module/kafka/config/server.properties 中的 broker.id=1
[root@hadoop02 ~]# vim /opt/module/kafka/config/server.properties 
broker.id=1
# 注 broker.id 不得重复,整个集群中唯一。

2.1.3 配置环境变量(两台机器都要执行)

[root@hadoop01 ~]# vim /etc/profile
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[root@hadoop01 ~]# source /etc/profile

2.1.4 安装 Zookeeper 集群

可以查看我的这篇文章:【Zookeeper 初级】02、Zookeeper 集群部署_Stars.Sky的博客-CSDN博客

2.1.5 启动 kafka 集群

# 先启动 Zookeeper 集群,然后依次启动 Kafka
[root@hadoop01 ~]# zk.sh start

# 依次在 hadoop01、 hadoop02 节点上 启动 Kafka
[root@hadoop01 ~]# cd /opt/module/kafka/
[root@hadoop01 /opt/module/kafka]# bin/kafka-server-start.sh -daemon config/server.properties

[root@hadoop02 ~]# cd /opt/module/kafka/
[root@hadoop02 /opt/module/kafka]# bin/kafka-server-start.sh -daemon config/server.properties

# 查看集群状态
[root@hadoop01 /opt/module/kafka]# jpsall 
========= hadoop01 =========
1938 kafka.Kafka
2024 sun.tools.jps.Jps
1517 org.apache.zookeeper.server.quorum.QuorumPeerMain
========= hadoop02 =========
2002 sun.tools.jps.Jps
1512 org.apache.zookeeper.server.quorum.QuorumPeerMain
1930 kafka.Kafka

2.1.6 关闭集群

[root@hadoop01 /opt/module/kafka]# bin/kafka-server-stop.sh 

[root@hadoop02 /opt/module/kafka]# bin/kafka-server-stop.sh 

[root@hadoop01 /opt/module/kafka]# jpsall 
========= hadoop01 =========
2084 sun.tools.jps.Jps
1517 org.apache.zookeeper.server.quorum.QuorumPeerMain
========= hadoop02 =========
2064 sun.tools.jps.Jps
1512 org.apache.zookeeper.server.quorum.QuorumPeerMain

2.2 kafka 集群启停脚本

[root@hadoop01 /opt/module/kafka]# vim /usr/bin/kf.sh
#!/bin/bash

case $1 in
"start") {
        for i in hadoop01 hadoop02
        do
                echo " ------- 启动 $i Kafka ------- "
                ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
};;

"stop") {
        for i in hadoop01 hadoop02
        do
                echo " ------- 停止 $i Kafka ------- "
                ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-stop.sh"
        done
};;
esac

[root@hadoop01 /opt/module/kafka]# chmod 777 /usr/bin/kf.sh 

# 启动
[root@hadoop01 /opt/module/kafka]# kf.sh start
 ------- 启动 hadoop01 Kafka ------- 
 ------- 启动 hadoop02 Kafka ------- 
[root@hadoop01 /opt/module/kafka]# jpsall 
========= hadoop01 =========
5689 sun.tools.jps.Jps
1517 org.apache.zookeeper.server.quorum.QuorumPeerMain
5647 kafka.Kafka
========= hadoop02 =========
5105 kafka.Kafka
1512 org.apache.zookeeper.server.quorum.QuorumPeerMain
5132 sun.tools.jps.Jps

# 停止
[root@hadoop01 /opt/module/kafka]# kf.sh stop
 ------- 停止 hadoop01 Kafka ------- 
 ------- 停止 hadoop02 Kafka ------- 

        注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息, Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

三、Kafka 命令行操作

3.1 Kafka 基础架构

3.2 主题命令行操作

3.2.1 查看操作主题命令参数

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh 
参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称
--create创建主题
--delete删除主题
--alter修改主题
--list查看所有主题
--describe查看主题详细描述
--partitions <Integer: # of partitions>设置分区数
--replication-factor<Integer: replication factor>设置分区副本
--config <String: name=value>更新系统默认的配置

3.2.2 查看当前服务器中的所有 topic

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --list

3.2.3 创建 first topic

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --topic first --create --partitions 1 --replication-factor 2
Created topic first.

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --list
first

选项说明:

  • --topic:定义 topic 名

  • --replication factor:定义副本数

  • --partitions:定义分区数 

3.2.4 查看 first 主题的详情

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe --topic first
Topic: first	TopicId: bZhLNYEkRI2en0RF4y5mTg	PartitionCount: 1	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1

3.2.5 修改分区数 (注意:分区数只能增加,不能减少)

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --topic first --partitions 3 --alter

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --describe --topic first
Topic: first	TopicId: bZhLNYEkRI2en0RF4y5mTg	PartitionCount: 3	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: first	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
	Topic: first	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1

3.2.6 删除 topic(暂不操作)

[root@hadoop01 /opt/module/kafka]# bin/kafka-topics.sh --bootstrap-server hadoop01:9092 --topic first --delete

3.3 生产者命令行操作

3.3.1 查看操作生产者命令参数

[root@hadoop01 /opt/module/kafka]# bin/kafka-console-producer.sh 
参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称

3.3.2 发送消息

[root@hadoop01 /opt/module/kafka]# bin/kafka-console-producer.sh --bootstrap-server hadoop01:9092 --topic first
>hello world

3.4 消费者命令行操作

3.4.1 查看操作消费者命令参数

[root@hadoop02 /opt/module/kafka]# bin/kafka-console-consumer.sh 
参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称
--from-beginning从头开始消费
--group <String: consumer group id>指定消费者组名称

3.4.2 消费信息(在 hadoop02 上操作)

# 消费 first 主题中的数据(同时在 hadoop01 上发送 hello)
[root@hadoop02 /opt/module/kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
hello

# 把主题中所有的数据都读取出来(包括历史数据)
[root@hadoop02 /opt/module/kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first --from-beginning
hello world
hello