淘先锋技术网

首页 1 2 3 4 5 6 7

首先对kafka的科普,这里不会讲了,不过这有一篇文章,我感觉用来入门还是挺好。

深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列icon-default.png?t=M1L8https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html​​​​​​​

操作系统:macOS12

mac M1系统,已经不支持用brew直接安装使用kafka了,所以需要到官网下载安装包。

当然,首先需要在电脑上配置好java环境,这个就不在这说了

在终端输入 java -version 显示类似如下信息,说明就配置好java环境了

openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Zulu 8.60.0.21-CA-macos-aarch64) (build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Zulu 8.60.0.21-CA-macos-aarch64) (build 25.322-b06, mixed mode)

其实完全按照kafka官网给的指导就能顺利操作成功

1. 先下载下来安装包,按照官网建议,选择第一行的链接。

然后解压,进入到安装包内,目录如下:

 

 2. 开启kafka环境:

        在此文件夹中打开终端,先开启的是zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

        然后再打开一个终端,这次开启的是kafka服务:

bin/kafka-server-start.sh config/server.properties

如果两条命令都没有报错,那么就可以认定kafka的运行环境启动成功了

3. 安装python运行kafka的标准库

  官网上给的这一步,是基于命令行操作的kafka,在这我们要做的是利用python操作kafka。

  所以,接下来要先安装kafka的python包,如下:

pip install kafka-python

  这里需要说的是,我还没有找到可以像brew启动服务一样启动kafka服务,所以暂定以第2步这种比较麻烦的方式开启服务。当然如果读者有更好的,欢迎留言告诉我。

4. 一个kafka-python的demo

# kafka_demo.py

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json

def producer_demo():
    # 假设生产的消息是json格式
    producer = KafkaProducer(  # 实例化一个生产者
        bootstrap_servers=['localhost:9092'], 
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送消息
    for i in range(5):
        future = producer.send(
            'kafka_demo',
            key='count_num',  # 同一个key值,会被送至同一个分区
            value=str(i))
        print(f"send {i}")
        try:
            future.get(timeout=10) # 监控是否发送成功           
        except KafkaError as e:  # 发送失败抛出KafkaError
            print(e)
            
def consumer_demo():
    consumer = KafkaConsumer(  # 实例化一个消费者
        'kafka_demo', 
        bootstrap_servers=':9092',
        group_id='test'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
        

相信大家在其他地方已经学习了kafka的生产-消费模式,这里就不多说了。

代码中的两个方法要分开执行,可以进入python交互模式导入kafka_demo,然后依次执行两个方法。

待续.....