1、什么是Flink中的转换算子
在使用 Flink DataStream API 开发流式计算任务时,可以将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。
2、常用的转换算子
Flink提供了功能各异的转换算子,Map,FlatMap,Filter,KeyBy,Reduce,Window,WindowAll...
通过操作各种转换算子,来获取新的DataStream及子类的实例,来完成计算需求。
Tips: 下面测试用例基于 Flink1.17.0、java1.8 编写
3、基本转换算子(map/ filter/ flatMap)
3.1 Map
功能说明:
DataStream[T] → DataStream[R]
输入一个元素同时输出一个元素,可以对元素的数据类型和内容做转换,好比SQL中的UDF函数
代码示例:
package com.baidu.datastream.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Map {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.使用 Map 算子
// 方式1:使用 Lambda表达式
env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠")
.map(value -> value + "_")
.print();
// 方式2:使用 MapFunction实现类
/*
* TODO MapFunction<T, O>
* 功能说明:
* 对元素做1:1映射转换
* 泛型说明:
* @T : 输入数据类型
* @O : 输出数据类型
* */
MapFunction<String, Integer> mapFunction = new MapFunction<String, Integer>() {
@Override
public Integer map(String value) throws Exception {
return value.length();
}
};
env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠")
.map(mapFunction)
.print();
// 3.触发程序执行
env.execute();
}
}
执行结果:
3.2 FlatMap
功能说明:
DataStream[T] → DataStream[R]
输入一个元素同时产生零个、一个或多个元素,好比SQL中的UDTF(1对多)函数
代码示例:
package com.baidu.datastream.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMap {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.使用 FlatMap 算子
// 方式:使用 flatMapFunction实现类
/*
* TODO flatMapFunction<T, O>
* 功能说明:
* 对输入元素做1:多的转换(好比SQL中的UDTF函数)
* 泛型说明:
* @T : 输入数据类型
* @O : 输出数据类型
* */
FlatMapFunction<String, String> flatMapFunction = new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String s : value.split("_")) {
out.collect(s);
}
}
};
env.fromElements("刘_备", "张_飞", "关_羽", "赵_云", "马_超", "黄_忠")
.flatMap(flatMapFunction)
.print();
// 3.触发程序执行
env.execute();
}
}
执行结果:
3.3 Filter
功能说明:
DataStream[T] → DataStream[T]
为每个元素执行一个逻辑判断,并保留那些判断为 true 的元素,好比SQL中的where
代码示例:
package com.baidu.datastream.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Filter {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.使用 Filter 算子
// 方式1:使用 Lambda表达式
env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠")
.filter(value -> value.equals("刘备"))
.print();
// 方式2:使用 FilterFunction实现类
/*
* TODO FilterFunction<T, O>
* 功能说明:
* 对元素过滤处理
* 泛型说明:
* @T : 输入数据类型
* */
FilterFunction<String> filterFunction = new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.equals("张飞");
}
};
env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠")
.filter(filterFunction)
.print();
// 3.触发程序执行
env.execute();
}
}
执行结果:
4、聚合算子
4.1 KeyBy(按键分区)
功能说明:
DataStream[T] → KeyedStream[T,K]
根据指定的字段(key),将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。
分区规则:
分区编号 = 指定字段(key) 的哈希值 % 分区个数(并行度)
思考:
1、哪些 数据类型 不能作为分区的key?
数组类型不能作为key
当key的类型为bean类型时,bean类必须要重写hashCode方法
代码示例:
package com.baidu.datastream.transform;
import com.baidu.bean.FlinkUser;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyBy {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
// 2.使用 KeyBy 算子
// 方式1:使用 Lambda表达式
// TODO key的类型为 String
KeyedStream<String, String> stringKeyedStream = env.fromElements("蜀_刘备", "蜀_关羽", "魏_曹操", "吴_孙权", "吴_孙坚", "吴_孙策").keyBy(value -> value.split("_")[0]);
stringKeyedStream.print();
// TODO key的类型为 bean (需重写hashCode方法)
KeyedStream<FlinkUser, FlinkUser> userKeyedStream = env.fromElements(new FlinkUser(1L, "x", 100L), new FlinkUser(2L, "x", 110L), new FlinkUser(3L, "y", 120L), new FlinkUser(4L, "y", 130L), new FlinkUser(5L, "z", 140L)).keyBy(user -> user);
// TODO key的类型为 数组(不支持)
// KeyedStream<String, String[]> arrayKeyedStream = env.fromElements("蜀_刘备", "蜀_关羽", "魏_曹操", "吴_孙权", "吴_孙坚", "吴_孙策")
// .keyBy(value -> value.split("_"));
// 方式2:使用 KeySelector实现类
/*
* TODO KeySelector<IN, KEY>
* 功能说明:
* 从输入的数据中提取key,然后根据 `key的hashcode%并行度` 进行分区
* 注意:这里的分区是逻辑分区
* 泛型说明:
* @IN : 输入数据类型
* @KEY : key的数据类型
* 重要提示:
* 什么类型的数据不能作为key呢?
* 1.当 POJO 类且没有重写 hashCode() 方法而是依赖依赖于 Object.hashCode() 实现时
* 2.任意类型的数组
* */
KeySelector<FlinkUser, String> keySelector = new KeySelector<FlinkUser, String>() {
@Override
public String getKey(FlinkUser value) throws Exception {
return value.name;
}
};
KeyedStream<FlinkUser, String> userNameKeyedStream = env.fromElements(new FlinkUser(1L, "x", 100L), new FlinkUser(2L, "x", 110L), new FlinkUser(3L, "y", 120L), new FlinkUser(4L, "y", 130L), new FlinkUser(5L, "z", 140L)).keyBy(keySelector);
// max("字段名称") pojo类一定要含有空参构造
//userNameKeyedStream.sum("id").print();
// 3.触发程序执行
env.execute();
}
}
执行结果:
4.2 Reduce
功能说明:
KeyedStream[T,K] → DataStream[T]
在相同key的数据流上`滚动`执行聚合操作。将当前元素与上次一次聚合后得到的值(保存的状态值)组合然后输出新值,并将这个值作为状态进行保存。
Reduce函数的弊端:
聚合前数据类型 = 聚合后数据类型,不能修改数据类型
不能提供初始值进行聚合操作,当只有一个元素时,不会触发reduce函数
代码示例:
package com.baidu.datastream.transform;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Reduce {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// 2.使用 Reduce 算子
/*
* TODO ReduceFunction<T>
* 功能说明:
* 对相同key中的元素进行聚合操作(依次聚合)
* 泛型说明:
* 输入数据和输出数据的类型
* 重要说明:
* 这种聚合方式不能修改value的数据类型
*
* */
ReduceFunction<Tuple2<String, Integer>> reduceFunction = new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2(value1.f0, value1.f1 + value2.f1);
}
};
// 统计每个国家出现的次数
env.fromElements("蜀_刘备", "蜀_关羽", "魏_曹操", "吴_孙权", "吴_孙坚", "吴_孙策")
.map(
new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2(value.split("_")[0], 1);
}
}
)
.keyBy(
new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}
)
.reduce(reduceFunction)
.print()
;
// 3.触发程序执行
env.execute();
}
}
运行结果:
4.3 sum、min、max、minBy、maxBy
功能说明:
KeyedStream[T,K] → DataStream[T]
在相同key的数据流上`滚动`执行相应聚合操作。
min、minBy的区别:
min:聚合状态中保存的是第一个元素的非聚合字段
minBy:聚合状态中保存的是当前元素的非聚合字段
代码示例:
package com.baidu.datastream.transform;
import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SumMinMaxMinByMaxBy {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KeyedStream<FlinkUser, String> userKeyedStream = env.fromElements(
new FlinkUser(1L, "x", 100L),
new FlinkUser(2L, "x", 110L),
new FlinkUser(3L, "x", 120L),
new FlinkUser(4L, "x", 130L),
new FlinkUser(5L, "y", 140L)
).keyBy(user -> user.name);
/*
* TODO max("")、max(num)
* 功能说明:
* 根据指定的字段,做聚合操作
* 怎样指定聚合字段:
* 当 value类型为 pojo时,通过 max("字段名称") 来指定字段
* 当 value类型为 tuple时,通过 max(num) 来指定字段
* 重点说明:
* 当 value类型为pojo时,必须实现空参构造方法,才能提取字段
* */
//userKeyedStream.max("id").print();
//userKeyedStream.min("id").print();
//userKeyedStream.sum("id").print();
//userKeyedStream.maxBy("id").print();
userKeyedStream.minBy("id").print();
env.execute();
}
}
5、物理分区算子
Flink提供了将数据重新分区的方法,当任务发生数据倾斜时,这个算子会很有用。
5.1 shuffle - 随机分区
功能说明:
DataStream[T] → DataStream[T]
将元素随机地均匀分配到下游分区
Tips:
因为是完全随机,当输入相同时,每次执行的结果可能会不同
代码示例:
package com.baidu.datastream.transform;
import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Shuffle {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
KeyedStream<FlinkUser, String> userKeyedStream = env.fromElements(
new FlinkUser(1L, "x", 100L),
new FlinkUser(2L, "x", 110L),
new FlinkUser(3L, "x", 120L),
new FlinkUser(4L, "x", 130L),
new FlinkUser(5L, "x", 140L),
new FlinkUser(6L, "x", 150L)).keyBy(user -> user.name);
/*
* TODO 问题:由于 keyBy 算子,导致数据倾斜(key相同,导致数据都被同一个并行子任务处理)
* 我们可以使用 shuffle 算子将数据均匀的在分配到其他并行子任务中去
* 重点提示:
* shuffle 算子只能操作 DataStream,不能操作 KeyedStream
* */
userKeyedStream.sum("id").shuffle().print();
env.execute();
}
}
运行结果:
5.2 rebalance - 轮询分区
功能说明:
DataStream[T] → DataStream[T]
使用Round-Robin负载均衡算法,将输入的数据平均的分配到下游分区中去。
代码示例:
package com.baidu.datastream.transform;
import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Rebalance {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
KeyedStream<FlinkUser, String> userKeyedStream = env.fromElements(
new FlinkUser(1L, "x", 100L),
new FlinkUser(2L, "x", 110L),
new FlinkUser(3L, "x", 120L),
new FlinkUser(4L, "x", 130L),
new FlinkUser(5L, "x", 140L),
new FlinkUser(6L, "x", 150L)).keyBy(user -> user.name);
/*
* TODO 问题:由于 keyBy 算子,导致数据倾斜(key相同,导致数据都被同一个并行子任务处理)
* 我们可以使用 rebalance 算子将数据均匀的在分配到其他并行子任务中去
* 重点提示:
* rebalance 算子只能操作 DataStream,不能操作 KeyedStream
* */
userKeyedStream.sum("id").rebalance().print();
env.execute();
}
}
运行结果:
5.3 rescale - 重缩分区
功能说明:
DataStream[T] → DataStream[T]
使用Round-Robin负载均衡算法,将以分区为单位将输入的数据平均的分配到下游分区中去。
和rebalance的区别:
rebalance将输入数据作为一个整体,根据数据输入的顺序随机分发到下游分区(涉及到了网络传输)
rescale将以上游分区为单位,随机的分配到下游分区中去
使用场景:
当source算子为可并发数据源时(如kafka,5个分区),设置5个Task来读取分别读取每个分区的数据
此时,可以使用rescale来分发到下游实现负载均衡,这样可以做到数据只在本地传输而不是网络传输
5.4 global - 全局分区
功能说明:
DataStream[T] → DataStream[T]
将元素分发到下游的一个分区中去
5.5 broadcast - 广播分区
功能说明:
DataStream[T] → DataStream[T]
将元素广播到下游的每个分区
Tips:
数据被广播后,会在下游算子的每个分区中都保留一份,可以将数据进行重复处理
代码示例:
package com.baidu.datastream.transform;
import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Broadcast {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
KeyedStream<FlinkUser, String> userKeyedStream = env.fromElements(
new FlinkUser(1L, "x", 100L),
new FlinkUser(2L, "x", 110L),
new FlinkUser(3L, "x", 120L),
new FlinkUser(4L, "x", 130L),
new FlinkUser(5L, "x", 140L),
new FlinkUser(6L, "x", 150L)).keyBy(user -> user.name);
userKeyedStream.sum("id").broadcast().print();
env.execute();
}
}
运行结果:
5.6 自定义分区
功能说明:
DataStream[T] → DataStream[T]
使用用户定义的 Partitioner 将元素分发到下游算子的分区中去
代码示例:
package com.baidu.datastream.transform;
import com.baidu.bean.FlinkUser;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionCustom {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
KeyedStream<FlinkUser, String> userKeyedStream = env.fromElements(
new FlinkUser(1L, "x", 100L),
new FlinkUser(2L, "x", 110L),
new FlinkUser(3L, "x", 120L),
new FlinkUser(4L, "x", 130L),
new FlinkUser(5L, "x", 140L),
new FlinkUser(6L, "x", 150L)).keyBy(user -> user.name);
/*
* TODO Partitioner<K>
* 功能说明:
* 自定义分区器,根据输入的数据获取分区编号
* 泛型说明:
* @K : key的数据类型
* */
Partitioner<Long> partitioner = new Partitioner<Long>() {
@Override
public int partition(Long key, int numPartitions) {
if (key == 1L || key == 2L) {
return 0;
} else if (key == 3L || key == 4L) {
return 1;
} else {
return 2;
}
}
};
/*
* TODO KeySelector<IN, KEY>
* 功能说明:
* key提取器,根据输入的数据,获取key
* 泛型说明:
* @IN : 输入数据类型
* @KEY : 输出数据类型(key)
* */
KeySelector<FlinkUser, Long> keySelector = new KeySelector<FlinkUser, Long>() {
@Override
public Long getKey(FlinkUser value) throws Exception {
return value.id;
}
};
userKeyedStream.sum("id").partitionCustom(partitioner, keySelector).print();
env.execute();
}
}
运行结果:
6、分流
在处理数据的时候,经常会将一条流或者一个表根据某些条件拆分成多条流或者多个表
flink中提供了分流的方式:1、使用filter算子分流 2、使用侧输出流分流
6.1 使用filter算子分流 - 不推荐
这种分流方式的弊端:
需要将原始流复制多份,并对每一份做一次判断,效率很低 (多次读取,多次判断)
代码示例:
// 通过 filter 分流
public static void ByFilter() throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 根据国家,将 totalStream 分为三股流
DataStreamSource<String> totalStream = env.fromElements("蜀_刘备", "蜀_关羽", "魏_曹操", "吴_孙权", "吴_孙坚", "吴_孙策");
SingleOutputStreamOperator<String> weiStream = totalStream.filter(e -> e.contains("魏"));
SingleOutputStreamOperator<String> shuStream = totalStream.filter(e -> e.contains("蜀"));
SingleOutputStreamOperator<String> wuStream = totalStream.filter(e -> e.contains("吴"));
weiStream.print();
shuStream.print();
wuStream.print();
// 3.触发程序执行
env.execute();
}
6.2 使用侧输出流分流 - 推荐
避免了使用filter算子的弊端,指定source读取一次,判断一次即可完成分流操作
代码示例:
// 通过 侧输入流 分流
public static void ByOutputTag() throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 根据国家,将 totalStream 分为三股流
DataStreamSource<String> totalStream = env.fromElements("蜀_刘备", "蜀_关羽", "魏_曹操", "吴_孙权", "吴_孙坚", "吴_孙策");
// 初始化侧输出流
OutputTag weiOutputTag = new OutputTag("wei", Types.STRING);
OutputTag shuOutputTag = new OutputTag("shu", Types.STRING);
OutputTag wuOutputTag = new OutputTag("wu", Types.STRING);
// 通过 ProcessFunction向 侧输出流发送数据
SingleOutputStreamOperator<String> process = totalStream.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
// 往侧输出流中发送数据
if (value.contains("魏")) {
ctx.output(weiOutputTag, value);
} else if (value.contains("蜀")) {
ctx.output(shuOutputTag, value);
} else if (value.contains("吴")) {
ctx.output(wuOutputTag, value);
}
}
}
);
SideOutputDataStream weiStream = process.getSideOutput(weiOutputTag);
SideOutputDataStream shuStream = process.getSideOutput(shuOutputTag);
SideOutputDataStream wuStream = process.getSideOutput(wuOutputTag);
weiStream.print();
shuStream.print();
wuStream.print();
// 3.触发程序执行
env.execute();
}
7、合并流