Kafka的命令都是基于bin目录下的脚本来使用的。
Topic命令
Topic的命令脚本是kafka-topics.sh,常见命令参数说明:
- --bootstrap-server <String: server toconnet to> 连接的Kafka Broker主机名称和端口号。(该参数必带)
- --topic <String: topic> 操作的topic名称。
- --create 创建主题。
- --delete 删除主题。
- --alter 修改主题。
- --list 查看所有主题。
- --describe 查看主题详细描述。
- --partitons <Integer:# of partitions> 设置分区数。
- --replication-factor<Integer:replication factor> 设置分区副本。
- --config<String:name=value> 更新系统默认的配置。
查看所有topic
./kafka-topics.sh --bootstrap-server [ip:port] --list
最简单的创建topic方式
./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --create
创建时设置分区数、分区副本数
./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --create --partitions [num] --replication-factor [num]
删除topic
./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --delete
查看topic详细信息
./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --describe
生产消息&消费消息
- 生产者的命令脚本是 kafka-console-producer.sh
- 消费者的命令脚本是 kafka-console-consumer.sh
生产者向指定topic发送消息的命令,执行后会进入生产者的界面中
./kafka-console-producer.sh --bootstrap-server [ip:port] --topic [topicName]
同时打开消费者,监听指定topic的消息,执行后会进入到消费者的界面中
./kafka-console-consumer.sh --bootstrap-server [ip:port] --topic [topicName]
然后在生产者输入需要发送的消息,回车就能发送到topic
同时消费者这边,也能收到消息
用这种方式启动消费者,只能看到消费者启动之后的消息,消费者启动之前的消息是看不见的。
如果想要看到所有历史消息,可以消费者启动的命令上,加一个参数:--from-beginning。
如果发送消息的时候,如果出现了这个异常:Error: NOT_LEADER_OR_FOLLOWER,就把该topic删掉,然后重新创建就可以了。详情看:pulsar的 Topic 自动删除_pulsar删除topic_跨行菜鸟运维的博客-CSDN博客