目录
创建执行环境
编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是
StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的
方式,就是调用这个类的静态方法,具体有以下三种。
1. getExecutionEnvironment
最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文
直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar
包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方
法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。
2. createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果
不传入,则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment localEnv =StreamExecutionEnvironment.createLocalEnvironment();
3. createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定
要在集群中运行的 Jar 包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包);
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程
序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。关于时间语义和容错
机制,我们会在后续的章节介绍。
执行模式(Execution Mode)
上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment ,顾名思义它应该是
做流处理的。那对于批处理,又应该怎么获取执行环境呢?
在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment
的静态方法,返回它的对象:
// 批处理环境ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();// 流处理环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet ;对应的调用的一整
套转换方法,就是 DataSet API 。这些我们在第二章的批处理 word count 程序中已经有了基本
了解。
而从 1.12.0 版本起, Flink 实现了 API 上的流批统一。 DataStream API 新增了一个重要特
性:可以支持不同的“执行模式”( execution mode ),通过简单的设置就可以让一段 Flink 程序
在流处理和批处理之间切换。这样一来, DataSet API 也就没有存在的必要了。
流执行模式(STREAMING)
这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。
批执行模式(BATCH)
专门用于批处理的执行模式, 这种模式下, Flink 处理作业的方式类似于 MapReduce 框架。
对于不会持续计算的有界数据,我们用这种模式处理会更方便。
自动模式(AUTOMATIC)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
1. BATCH 模式的配置方法
由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。
主要有两种方式:
( 1 )通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH 。
(2)通过代码配置
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。
建议: 不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指
定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在
代码中硬编码( hard code )的方式可扩展性比较差,一般都不推荐。
2. 什么时候选择 BATCH 模式
我们知道,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界
流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的;而 BATCH
模式仅能用于有界数据。
看起来 BATCH 模式似乎被 STREAMING 模式全覆盖了,那还有必要存在吗?我们能不
能所有情况下都用流处理模式呢?
当然是可以的,但是这样有时不够高效。
我们可以仔细回忆一下 word count 程序中, 批处理和流处理输出的不同:在 STREAMING
模式下,每来一条数据,就会输出一次结果(即使输入数据是有界的);而 BATCH 模式下,
只有数据全部处理完之后,才会一次性输出结果。 最终的结果两者是一致的,但是流处理模式
会将更多的中间结果输出。在本来输入有界、只希望通过批处理得到最终的结果的场景下,
STREAMING 模式的逐个输出结果就没有必要了。
所以总结起来,一个简单的原则就是:用 BATCH 模式处理批量数据,用 STREAMING
模式处理流式数据。因为数据有界的时候,直接输出结果会更加高效;而当数据无界的时候 , 我
们没得选择——只有 STREAMING 模式才能处理持续的数据流.
当然,在后面的示例代码中,即使是有界的数据源,我们也会统一用 STREAMING 模式
处理。这是因为我们的主要目标还是构建实时处理流数据的程序,有界数据源也只是我们用来
测试的手段。
触发程序执行
有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种
转换操作,最后输出结果到外部系统。
需要注意的是,写完输出(sink )操作并不代表程序已经结束。因为当 main() 方法被调用
时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据
——因为数据可能还没来。 Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,
这也被称为“延迟执行”或“懒执行”( lazy execution )。
所以我们需要显式地调用执行环境的 execute() 方法,来触发程序执行。 execute() 方法将一
直等待作业完成,然后返回一个执行结果( JobExecutionResult )。
env.execute();
数据源操作
读取kafka数据源操作
Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传
输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式
数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输, Flink 进行分
析计算,这样的架构已经成为众多企业的首选,如图 5-3 所示。
略微遗憾的是,与 Kafka 的连接比较复杂, Flink 内部并没有提供预实现的方法。所以我
们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。
好在Kafka 与 Flink 确实是非常契合,所以 Flink 官方提供了连接工具 flink-connector-kafka ,
直接帮我们实现了一个消费者 FlinkKafkaConsumer ,它就是用来读取 Kafka 数据的
SourceFunction 。
所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。 Flink 官
方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本
只支持 0.10.0 版本以上的 Kafka ,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依
赖版本。这里我们需要导入的依赖如下。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
然后调用 env.addSource() ,传入 FlinkKafkaConsumer 的对象实例就可以了。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class SourceKafkaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(new
FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
stream.print("Kafka");
env.execute();
}
}
创建 FlinkKafkaConsumer 时需要传入三个参数:
第一个参数 topic ,定义了从哪些主题中读取数据。可以是一个 topic ,也可以是 topic
列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据
时, Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema 。 Kafka 消
息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中
使用的 SimpleStringSchema ,是一个内置的 DeserializationSchema ,它只是将字节数
组简单地反序列化成字符串。 DeserializationSchema 和 KeyedDeserializationSchema 是
公共接口,所以我们也可以自定义反序列化逻辑。
第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
自定义Source
大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,
我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,
又该怎么办呢?
那就只好自定义实现 SourceFunction 了。
接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:
run() 和 cancel() 。
run() 方法:使用运行时上下文对象( SourceContext )向下游发送数据;
⚫ cancel() 方法:通过标识位控制退出循环,来达到中断数据源的效果。
代码如下:
我们先来自定义一下数据源:
package com.atmk.stream.app;
import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
/**
* @author:lss
* @date:2022/11/3 17:18
* @description:some
*/
public class ClickSource implements SourceFunction<Event> {
//声明一个变量,作为控制数据生成的标识位
private Boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
//在指定数据集中随机选取数据
Random random = new Random();
String[] users = {"Mary","Bob","Alice","Cary"};
String[] urls = {"./home","./cart","./fav","./prod?id=1"};
while (running){
ctx.collect(new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
));
//隔一秒生成一个点击事件,方面观测
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到 ClickSource() 数据源,使
用上面的代码就可以了。
下面的代码我们来读取一下自定义的数据源。有了自定义的 source function ,接下来只要
调用 addSource() 就可以了:
env.addSource(new ClickSource())
下面是完整的代码:
package com.atmk.stream.app;
import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author:lss
* @date:2022/11/3 17:26
* @description:some
*/
public class SourceCustom {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用自定义的source function,调用addSource方法
DataStreamSource<Event> stream = env.addSource(new ClickSource());
stream.print("SourceCustom");
env.execute();
}
}
这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1 ,如果数据源设
置为大于 1 的并行度,则会抛出异常。如下程序所示:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class SourceThrowException {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new ClickSource()).setParallelism(2).print();
env.execute();
}
}
输出的异常如下:
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism
of non parallel operator must be 1.
所以如果我们想要自定义并行的数据源的话,需要使用 ParallelSourceFunction ,示例程序
如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class ParallelSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CustomSource()).setParallelism(2).print();
env.execute();
}
public static class CustomSource implements ParallelSourceFunction<Integer>
{
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (running) {
sourceContext.collect(random.nextInt());
}
}
@Override
public void cancel() {
running = false;
}
}
}
输出结果如下:
2> -6861690472> 4295153972> -2235162882> 11379073122> -3801657302> 2082090389