首先对kafka的科普,这里不会讲了,不过这有一篇文章,我感觉用来入门还是挺好。
深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html
操作系统:macOS12
mac M1系统,已经不支持用brew直接安装使用kafka了,所以需要到官网下载安装包。
当然,首先需要在电脑上配置好java环境,这个就不在这说了
在终端输入 java -version 显示类似如下信息,说明就配置好java环境了
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Zulu 8.60.0.21-CA-macos-aarch64) (build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Zulu 8.60.0.21-CA-macos-aarch64) (build 25.322-b06, mixed mode)
其实完全按照kafka官网给的指导就能顺利操作成功
1. 先下载下来安装包,按照官网建议,选择第一行的链接。
然后解压,进入到安装包内,目录如下:
2. 开启kafka环境:
在此文件夹中打开终端,先开启的是zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
然后再打开一个终端,这次开启的是kafka服务:
bin/kafka-server-start.sh config/server.properties
如果两条命令都没有报错,那么就可以认定kafka的运行环境启动成功了
3. 安装python运行kafka的标准库
官网上给的这一步,是基于命令行操作的kafka,在这我们要做的是利用python操作kafka。
所以,接下来要先安装kafka的python包,如下:
pip install kafka-python
这里需要说的是,我还没有找到可以像brew启动服务一样启动kafka服务,所以暂定以第2步这种比较麻烦的方式开启服务。当然如果读者有更好的,欢迎留言告诉我。
4. 一个kafka-python的demo
# kafka_demo.py
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
def producer_demo():
# 假设生产的消息是json格式
producer = KafkaProducer( # 实例化一个生产者
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
# 发送消息
for i in range(5):
future = producer.send(
'kafka_demo',
key='count_num', # 同一个key值,会被送至同一个分区
value=str(i))
print(f"send {i}")
try:
future.get(timeout=10) # 监控是否发送成功
except KafkaError as e: # 发送失败抛出KafkaError
print(e)
def consumer_demo():
consumer = KafkaConsumer( # 实例化一个消费者
'kafka_demo',
bootstrap_servers=':9092',
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
相信大家在其他地方已经学习了kafka的生产-消费模式,这里就不多说了。
代码中的两个方法要分开执行,可以进入python交互模式导入kafka_demo,然后依次执行两个方法。
待续.....