环境
linux版本:centos7.9
kafka版本:3.5.1
jdk版本:jdk1.8,kafka是scala语言开发的,该语言依赖JVM运行环境
ZooKeeper版本:3.8.2,用于选举partition
下载
选择kafka_2.13-3.5.1.tgz
安装包,其中2.13是开发kafka语言scala版本,3.5.1是kafka版本。
Scala语言说明:
Scala是一种运行于JVM虚拟机之上的语言。在运行时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码,就必须选择对应的Scala版本。因为Scala语言的版本并不是向后兼容的。
下载解压:
[root@hecs-403280 ~]# wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
[root@hecs-403280 ~]# cp kafka_2.13-3.5.1.tgz /usr/local
[root@hecs-403280 local]# tar -zxvf kafka_2.13-3.5.1.tgz
单机服务
kafka自带有ZooKeeper,在kafka目录下的libs下可看到ZooKeeper相关jar。建议使用自己安装的ZooKeeper,这里还是讲一下如何使用kafka自带的ZooKeeper服务。
启动服务
启动自带ZooKeeper:
# 可以看到相应的zk程序jar包
[root@hecs-403280 kafka_2.13-3.5.1]# ls libs | grep zookeeper
zookeeper-3.6.4.jar
zookeeper-jute-3.6.4.jar
# 后台启动zk,默认端口为2181,对应的配置文件在config/zookeeper.properties
[root@hecs-403280 kafka_2.13-3.5.1]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka:
# nohup命令启动kafka,日志默认输出到nohup.out文件
[root@hecs-403280 kafka_2.13-3.5.1]# nohup bin/kafka-server-start.sh config/server.properties &
kafka默认端口为9092
,如下说明kafka服务已启动:
[root@hecs-403280 kafka_2.13-3.5.1]# netstat -tunlp | grep 9092
tcp6 0 0 :::9092 :::* LISTEN 15281/java
也可以通过如下命令:
[root@hecs-403280 kafka_2.13-3.5.1]# jps
15281 Kafka
22540 Jps
27036 QuorumPeerMain
简单收发消息
生产消费图示:
创建topic:
# 创建一个名为test的Topic
[root@hecs-403280 kafka_2.13-3.5.1]# bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.
# 查看Topic
[root@hecs-403280 kafka_2.13-3.5.1]# bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic: test TopicId: A46bV8mbQS-OeKeYUaJPIg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@hecs-403280 kafka_2.13-3.5.1]#
启动一个生产端发送消息:
[root@hecs-403280 kafka_2.13-3.5.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>jay
可通过ctrl+C
退出命令行。
如果不提前创建Topic,那么在第一次往一个之前不存在的Topic发送消息时,消息也能正常发送,只是会抛出LEADER_NOT_AVAILABLE
警告。
启动一个消费端接收消息:
[root@hecs-403280 kafka_2.13-3.5.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
jay
如果客户端先发送消息,再打开消费端时是看不到历史消息的,可以通过指定partition和offset来消费消息:
[root@hecs-403280 kafka_2.13-3.5.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 0 --topic test
jay
ji
jolin
jay
如果想看命令支持哪些参数,可以直接输入命令,后面不接任何参数:
部分结果:
集群服务
环境准备
这里不采用kafka自带的ZooKeeper,而是使用单独的ZK集群。生产环境中,ZK是比较基础的中间件,这里使用公共的ZK便于维护。
ZooKeeper安装请参考如下文章。
传送门:https://blog.csdn.net/u010355502/article/details/132308824
准备三台服务:
192.168.0.83
192.168.0.184
192.168.0.48
关闭防护墙:
如果是云服务器,打开安全组中对应的端口即可。
# 查看防火墙状态
[root@hecs-403280 ~]# firewall-cmd --state
not running
# 如果防火墙是开启的,则关闭防火墙
[root@hecs-403280 ~]# systemctl stop firewalld.service
下载安装
kafka服务并不需要进行选举,因此也没有奇数台服务的建议。
下载kafka安装包后,放到三台服务器的/usr/local目录下。
[root@hecs-403280 ~]# scp ~/kafka_2.13-3.5.1.tgz [email protected]:/usr/local
[root@hecs-403280 ~]# scp ~/kafka_2.13-3.5.1.tgz [email protected]:/usr/local
分别进入三台机器将安装包解压。
修改配置文件
三台服务器都要修改配置文件,注意broker.id
不能相同。
[root@hecs-403280 config]# pwd
/usr/local/kafka_2.13-3.5.1/config
[root@hecs-403280 config]# vim server.properties
修改server.properties文件如下:
# broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
# 数据文件地址。默认是给的/tmp目录。
log.dirs=/usr/local/kafka_2.13-3.5.1/kafka-logs
# 默认的每个Topic的分区数
num.partitions=1
# kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.0.83:9092
# zookeeper的服务地址
# zookeeper.connect=192.168.0.83:2181,192.168.0.184:2181,192.168.0.48:2181
# 可以选择指定zookeeper上的基础节点。
zookeeper.connect=192.168.0.83:2181,192.168.0.184:2181,192.168.0.48:2181/kafka
详细参数说明:
Property | Default | Description |
---|---|---|
broker.id | 0 | broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一,每个broker都可以用一个唯一的非负整数id进行标识。 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。 |
listeners | PLAINTEXT://127.0.0.1:9092 | server接受客户端连接的端口,ip配置kafka本机ip即可 |
zookeeper.connect | localhost:2181 | zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间。 |
num.partitions | 1 | 创建topic的默认分区数 |
default.replication.factor | 1 | 自动创建topic的默认副本数量 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 |
delete.topic.enable | false | 是否允许删除主题 |
启动集群
启动服务时需要指定配置文件:
bin/kafka-server-start.sh -daemon config/server.properties
或
# 不建议使用下面的命令,因为当前shell窗口关闭后,kafka服务也停止了
bin/kafka-server-start.sh config/server.properties &
-daemon
表示后台启动kafka服务,这样就不会占用当前命令窗口。
启动日志位置:logs/server.log
。
停止kafka服务:bin/kafka-server-stop.sh
或者 kill pid
。
启动报错如下:
[root@ecs-002 kafka_2.13-3.5.1]# bin/kafka-server-start.sh config/server.properties
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/kafka_2.13-3.5.1/hs_err_pid6550.log
提示分配内存大小不足,修改启动命令的内存参数:
修改为:
再次启动,就没有报错了。
可能还会出现另一种报错:
[2023-08-22 11:39:44,066] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
kafka.common.InconsistentClusterIdException: The Cluster ID C2l9ZUrNRvKKNJKY48oRPg doesn't match stored clusterId Some(1C2l9ZUrNRvKKNJKY48oRPg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
at kafka.server.KafkaServer.startup(KafkaServer.scala:242)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
[2023-08-22 11:39:44,073] INFO shutting down (kafka.server.KafkaServer)
原因:
报错说的是集群中的集群id不匹配,正常情况下同一个集群下的kafka生成的cluster.id
是相同的,可能原因是之前启动过集群导致的混乱。
解决方案:
进入报错机器对应的kafka数据目录(在conf/server.properties
中可找到数据目录),找到meta.properties
文件,删除掉,然后再次启动kafka服务。
通过jps指令可以查看各个节点Kafka的进程:
[root@hecs-403280 logs]# jps
28134 Kafka
27036 QuorumPeerMain
30686 Jps
在Zookeeper中查看集群是否启动起来:
[root@ecs-003 apache-zookeeper-3.8.2-bin]# bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /kafka/brokers/ids
[0, 1, 2]
可以看到集群里面已经有3个kafka节点了。