淘先锋技术网

首页 1 2 3 4 5 6 7

Kafka Streams

1、概述

Kafka一直被认为是一个强大的消息中间件,它实现了高吞吐、高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源。目前通用的一些流式处理框架如Apache Spark、Apache Flink、Apache Storm等都可以将Kafka作为可靠的数据来源。但遗憾的是,在0.l0.x版本之前,Kafka还并不具备任何数据处理的能力,但在此之后,Kafka Streams应运而生。

Kafka Streams是一个用于处理和分析数据的客户端库。它先把存储在Kafka中的数据进行处理和分析,然后将最终所得的数据结果回写到Kafka或发送到外部系统。它建立在一些非常重要的流式处理概念之上,例如适当区分事件时间和处理时间、窗口支持,以及应用程序状态的简单(高效)管理。同时,它也基于Kafka中的许多概念,例如通过划分主题进行扩展。此外,由于这个原因,它作为一个轻量级的库可以集成到应用程序中。这个应用程序可以根据需要独立运行、在应用程序服务器中运行、作为Docker容器,或者通过资源管理器(如Mesos)进行操作。

Kafka Streams直接解决了流式处理中的很多问题:

  • 毫秒级延迟的逐个事件处理。
  • 有状态的处理,包括连接(join)和聚合类操作。
  • 提供了必要的流处理原语,包括高级流处理DSL和低级处理器API。高级流处理DSL提供了常用流处理变换操作,低级处理器API支持客户端自定义处理器并与状态仓库交互。
  • 使用类似DataFlow的模型对无序数据进行窗口化处理。
  • 具有快速故障切换的分布式处理和容错能力。
  • 无停机滚动部署。

2、单词统计

单词统计是流式处理领域中最常见的示例,这里我们同样使用它来演示一下Kafka Streams的用法。在Kafka的代码中就包含了一个单词统计的示例程序,即org.apache.kafka.streams. examples.wordcount.WordCountDemo,这个示例中以硬编码的形式用到了两个主题:
streams-plaintext-inputstreams-wordcount-output。为了能够使示例程序正常运行,我们需要预先准备好这两个主题:

./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-plaintext-input --replication-factor 1 --partitions 1

./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-wordcount-output --replication-factor 1 --partitions 1

这两个主题的详细信息如下:

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic streams-plaintext-input,streams-wordcount-output

在这里插入图片描述

之后就可以运行WordCountDemo这个示例:

./kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

这个示例程序将从主题streams-plaintext-input中读取消息,然后对读取的消息执行单词统计,并将结果持续写入主题streams-wordcount-output。

使用Kafka自带的console producer来生产一些输入数据供WordCount程序消费:

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

再运行console consumer脚本来验证WordCount程序的计算结果:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

输入一些单词:
在这里插入图片描述
统计结果:输出结果中的第一列是消息的key,这里表示被计数的单词,第二列是消息的value,这里表示该单词的最新计数。
在这里插入图片描述

下面我们通过WordCountDemo程序来了解一下Kafka Streams的开发方式,WordCountDemo程序如下所示:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
</dependency>
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: acton_zhang
 * @Date: 2023/8/17 11:41 下午
 * @Version 1.0
 * 单词统计
 */
public class WordCountDemo {
    public static void main(String[] args) {
        //构建Kafka Streams的配置
        Properties props = new Properties();
        //每个Kafka Streams应用程序必须要有一个application.id,这个applicationID用于协调应用实例
        //也用于命名内部的本地存储和相关主题。在整个Kafka集群中,applicationId必须是唯一的
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        //bootstrap.servers配置的是Kafka集群的地址,必填
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //最大缓冲字节数
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        //default.key.serde设置key的序列化器
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //default.value.serde设置value的序列化器
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        //创建StreamBuilder实例
        StreamsBuilder builder = new StreamsBuilder();
        //创建一个KStream实例,并设定了输入主题streams-plaintext-input
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        //具体的单词统计逻辑
        //KStream是一个由键值对构成的抽象记录流,每个键值对是一个独立单元,即使相同的key也不会被覆盖,类似数据库的插入操作
        //KTable可以理解成一个基于表主键的日志更新流,相同key的每条记录只保存最新的一条记录,类似数据库中基于主键的更新
        //无论记录流(用KStream定义),还是更新日志流(用KTable定义),都可以从一个或多个Kafka主题数据源来创建。
        //一个KStream可以与另一个KStream或KTable进行Join操作,或者聚合成一个KTable。同样,一个KTable也可以转换成一个KStream。
        //KStream和KTable都提供了一系列转换操作,每个转换操作都可以转化为一个KStream或KTable对象,将这些转换操作连接在一起就构成了一个处理器拓扑。
        KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(
                value.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, value) -> value)
                .count();
        //toStream().to()将单词统计结果写入输出主题streams-wordcount-output,key是String类型,value是Long类型
        counts.toStream().to("streams-wordcount-output",
                Produced.with(Serdes.String(), Serdes.Long()));
        //基于拓扑和配置来订阅一个KafkaStreams对象
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        //启动KafkaStreams引擎
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}