# 指定对应的配置文件启动
kafka-server-start.sh /hadoop/kafka/config/server.properties
# -daemon 以后台的方式启动
kafka-server-start.sh -daemon /hadoop/kafka/config/server.properties
kafka停止命令
kafka-server-stop.sh
或者
kill -9 kafka的pid
# 消费者列表查询
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
#创建 topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 1 --topic chltest
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
# 删除topic
./kafka-topics.sh --zookeeper server01:2181 --delete --topic chltest
出现提示:
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
这里只是标记删除,并没有删除数据,同时也不能往这个topic写入数据
# 修改topic的分区,注意:分区数量只能增加,不能减少
./kafka-topics.sh --zookeeper server01:2181 --alter --topic chltest --partitions 5
# describe命令还提供一些参数,用于过滤输出结果,如:
# --topic-with-overrides:可以找出所有包含覆盖配置的主题,它只会列出包含与集群不一样配置的主题
./kafka-topics.sh --zookeeper kafka-test-1:2181 --describe --topics-with-overrides
# describe有两个参数用于找出有问题的分区
# --unavailable-partitions:列出所有没有首领的分区,这些分区已经处于离线状态,对于生产者和消费者来说是不可用的
# --under-replicated-partitions:列出所有包含不同步副本的分区。
./kafka-topics.sh --zookeeper kafka-test-1:2181 --describe --unavailable-partitions
./kafka-topics.sh --zookeeper kafka-test-1:2181 --describe --under-replicated-partitions
#发送消息(生产者)
./kafka-console-producer.sh --broker-list kafka-test-1:9092 --topic chltest
>hello world
>hadoop hadoop
#消费消息(消费者)(kafka 0.9 版本之前的 使用 --zookeeper ,而不是 --bootstrap-server)(注意:端口是 kafka 的端口,这里是 9092,而不是zookeeper端口)
./kafka-console-consumer.sh --bootstrap-server kafka-test-1:9092 --from-beginning --topic chltest --consumer-property group.id=test-consumer-group-1
通过添加–consumer-property group.id= test-consumer-group-1 指定消费组为 test-consumer-group-1 (同时也可以添加消费组)
#查看某个Topic的详情
./kafka-topics.sh --topic chltest --describe --zookeeper kafka-test-1:2181
获取kafka中所有组
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --list
查看组消费情况
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --describe --group test-consumer-group-1
# 输出结果
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
chltest 0 13 13 0 - - -
# CURRENT-OFFSET: 当前消费者群组最近提交的offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的CURRENT-OFFSET与broker的LOG-END-OFFSET之间的差距
# 删除消费者群组
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --delete --group test-consumer-group-1
# 删除消费者群组中的topic -->chltest
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --delete --group test-consumer-group-1 --topic chltest