淘先锋技术网

首页 1 2 3 4 5 6 7

Kafka的命令都是基于bin目录下的脚本来使用的。

Topic命令

Topic的命令脚本是kafka-topics.sh,常见命令参数说明:

  • --bootstrap-server <String: server toconnet to>    连接的Kafka Broker主机名称和端口号。(该参数必带)
  • --topic <String: topic>    操作的topic名称。
  • --create    创建主题。
  • --delete    删除主题。
  • --alter    修改主题。
  • --list    查看所有主题。
  • --describe    查看主题详细描述。
  • --partitons <Integer:# of partitions>    设置分区数。
  • --replication-factor<Integer:replication factor>    设置分区副本。
  • --config<String:name=value>    更新系统默认的配置。

查看所有topic

./kafka-topics.sh --bootstrap-server [ip:port] --list

最简单的创建topic方式

./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --create

创建时设置分区数、分区副本数

./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --create --partitions [num] --replication-factor [num]

删除topic

./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --delete

查看topic详细信息

./kafka-topics.sh --bootstrap-server [ip:port] --topic [topicName] --describe

生产消息&消费消息

  • 生产者的命令脚本是 kafka-console-producer.sh
  • 消费者的命令脚本是 kafka-console-consumer.sh

生产者向指定topic发送消息的命令,执行后会进入生产者的界面中

./kafka-console-producer.sh --bootstrap-server [ip:port] --topic [topicName]

同时打开消费者,监听指定topic的消息,执行后会进入到消费者的界面中

./kafka-console-consumer.sh --bootstrap-server [ip:port] --topic [topicName]

然后在生产者输入需要发送的消息,回车就能发送到topic

同时消费者这边,也能收到消息

用这种方式启动消费者,只能看到消费者启动之后的消息,消费者启动之前的消息是看不见的。

如果想要看到所有历史消息,可以消费者启动的命令上,加一个参数:--from-beginning。

如果发送消息的时候,如果出现了这个异常:Error: NOT_LEADER_OR_FOLLOWER,就把该topic删掉,然后重新创建就可以了。详情看:pulsar的 Topic 自动删除_pulsar删除topic_跨行菜鸟运维的博客-CSDN博客