介绍
Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。
Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。
先决条件
- Apache Kafka 0.9.x
- 吉特
- Maven 3.x或更高版本
创建您的Flink流项目
第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,该原型包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。
在此应用程序中,我们将创建两个作业:
-
WriteToKafka
:生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。 -
ReadFromKafka
:读取相同的主题,并使用Kafka Flink连接器及其使用方在标准输出中显示消息。 API。
完整项目可在GitHub上找到:
让我们使用Apache Maven创建项目:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink\
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.1.2 \
-DgroupId=com.grallandco.demos \
-DartifactId=kafka-flink-101 \
-Dversion=1.0-SNAPSHOT \
-DinteractiveMode=false
Maven将创建以下结构:
tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── grallandco
│ └── demos
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
7 directories, 6 files
该项目被配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行该文件所需的所有依赖项。
该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。
添加Kafka连接器
打开pom.xml
并将以下依赖项添加到您的项目中:
第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:
您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>
元素中添加以下条目:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
现在,Flink项目已准备就绪,可以通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。
安装并启动Kafka
下载Kafka,在终端中输入以下命令:
curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0
Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:
./bin/zookeeper-server-start.sh config/zookeeper.properties
通过在新终端中运行以下命令来启动Kafka代理:
./bin/kafka-server-start.sh config/server.properties
在另一个终端中,运行以下命令来创建一个名为flink-demo
的Kafka主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo
使用Kafka工具将消息发布和使用到flink-demo
主题。
制片人
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo
消费者
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning
在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。
编写您的Flink应用程序
现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。
制片人
生产者使用SimpleStringGenerator()
类生成消息,并将该字符串发送到flink-demo
主题。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", “localhost:9092");
DataStream<String> stream = env.addSource(new SimpleStringGenerator());
stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));
env.execute();
}
SimpleStringGenerator()
方法代码在此处可用。
主要步骤是:
- 在任何Flink应用程序的基础上创建一个新的
StreamExecutionEnvironment
- 在应用程序环境中创建一个新的
DataStream
时,SimpleStringGenerator
类将Flink中所有流数据源的Source接口实现SourceFunction 。 - 将
FlinkKafkaProducer09
器添加到主题。
消费者
使用者只需从flink-demo
主题中读取消息,然后将它们打印到控制台中即可。
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", “localhost:9092");
properties.setProperty("group.id", "flink_consumer");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>(
"flink-demo", new SimpleStringSchema(), properties) );
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}
}).print();
env.execute();
}
主要步骤是:
- 在任何Flink应用程序的基础上创建一个新的
StreamExecutionEnvironment
- 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者
group.id
。 - 使用
FlinkKafkaConsumer09
从主题flink-demo
获取消息
生成并运行应用程序
让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。
1-建立专案:
$ mvn clean package
2-运行Flink生产者作业
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka
3-运行Flink消费者工作
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka
在终端中,您应该看到生产者生成的消息
现在,您可以在Flink群集上部署并执行此作业。
结论
在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。
翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html