淘先锋技术网

首页 1 2 3 4 5 6 7

# 指定对应的配置文件启动
kafka-server-start.sh /hadoop/kafka/config/server.properties


# -daemon 以后台的方式启动
kafka-server-start.sh -daemon /hadoop/kafka/config/server.properties


kafka停止命令
kafka-server-stop.sh

或者

kill -9 kafka的pid


# 消费者列表查询
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list


#创建 topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 1 --topic chltest
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

# 删除topic
./kafka-topics.sh --zookeeper server01:2181 --delete --topic chltest
出现提示:
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
这里只是标记删除,并没有删除数据,同时也不能往这个topic写入数据


# 修改topic的分区,注意:分区数量只能增加,不能减少
./kafka-topics.sh --zookeeper server01:2181 --alter --topic chltest --partitions 5


# describe命令还提供一些参数,用于过滤输出结果,如:
# --topic-with-overrides:可以找出所有包含覆盖配置的主题,它只会列出包含与集群不一样配置的主题
./kafka-topics.sh  --zookeeper kafka-test-1:2181 --describe --topics-with-overrides


# describe有两个参数用于找出有问题的分区 
# --unavailable-partitions:列出所有没有首领的分区,这些分区已经处于离线状态,对于生产者和消费者来说是不可用的
# --under-replicated-partitions:列出所有包含不同步副本的分区。
./kafka-topics.sh  --zookeeper kafka-test-1:2181 --describe --unavailable-partitions
./kafka-topics.sh  --zookeeper kafka-test-1:2181 --describe --under-replicated-partitions


#发送消息(生产者)
./kafka-console-producer.sh --broker-list kafka-test-1:9092 --topic chltest
>hello world
>hadoop  hadoop


#消费消息(消费者)(kafka 0.9 版本之前的  使用 --zookeeper ,而不是 --bootstrap-server)(注意:端口是 kafka 的端口,这里是 9092,而不是zookeeper端口)
./kafka-console-consumer.sh --bootstrap-server kafka-test-1:9092 --from-beginning --topic chltest --consumer-property group.id=test-consumer-group-1
通过添加–consumer-property group.id= test-consumer-group-1 指定消费组为 test-consumer-group-1 (同时也可以添加消费组)

 
#查看某个Topic的详情
./kafka-topics.sh --topic chltest --describe --zookeeper kafka-test-1:2181


获取kafka中所有组
./kafka-consumer-groups.sh  --bootstrap-server kafka-test-1:9092 --list

查看组消费情况
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --describe --group test-consumer-group-1
# 输出结果
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
chltest         0          13              13              0               -               -               -


# CURRENT-OFFSET: 当前消费者群组最近提交的offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的CURRENT-OFFSET与broker的LOG-END-OFFSET之间的差距


# 删除消费者群组
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --delete --group test-consumer-group-1


# 删除消费者群组中的topic -->chltest
./kafka-consumer-groups.sh --bootstrap-server kafka-test-1:9092 --delete --group test-consumer-group-1 --topic chltest