Flink中的分流
在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。
拆分流数据的方式
- Split,已经废弃,不推荐使用
- Fliter
- SideOut,推荐使用
Fliter分流的Java实现
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指标明细
DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env)
.map((MapFunction<String, String>) kafkaMessage -> {
JSONObject jsonobject = null;
try {
jsonobject = JSONObject.parseObject(kafkaMessage);
} catch (Exception e) {
LOG.warn("报文格式错误:{}", kafkaMessage);
}
if (null == jsonobject || jsonobject.isEmpty()) {
LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));
} else {
if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));
}
}
return JSONObject.toJSONString(jsonobject);
});
// 将原始流中包含demo的数据筛选出来
DataStream<String> diagnosisMessages = detailMessage
.filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo")))
.map((MapFunction<String, String>) sparkMessage -> {
// 为达到实验效果,进行日志输出
LOG.info("[is demo message]:{}", sparkMessage);
return sparkMessage;
});
env.execute("Flink Streaming Java API Skeleton");
}
SideOut分流的Java实现
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("【SideOutputDemo】");
// 指标明细
DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env)
.map((MapFunction<String, String>) kafkaMessage -> {
JSONObject jsonobject = null;
try {
jsonobject = JSONObject.parseObject(kafkaMessage);
} catch (Exception e) {
LOG.warn("报文格式错误:{}", kafkaMessage);
}
if (null == jsonobject || jsonobject.isEmpty()) {
LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));
} else {
if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
&& !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));
}
}
return JSONObject.toJSONString(jsonobject);
});
// 定义一个切分(旁路输出)
final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {
};
SingleOutputStreamOperator<String> sp = mainMessage
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(
String s
, Context context
, Collector<String> collector) throws Exception {
// 向常规流(主流)中添加数据
collector.collect(s);
// 向旁路输出流中添加数据
if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {
context.output(outputTag, s);
}
}
});
sp.map((MapFunction<String, String>) sparkMessage -> {
LOG.info("主流的数据: {}", sparkMessage);
return sparkMessage;
});
DataStream<String> tag = sp.getSideOutput(outputTag);
tag.map((MapFunction<String, String>) sparkMessage -> {
LOG.info("旁路[{}]的数据: {}", outputTag.getId(), sparkMessage);
return sparkMessage;
});
env.execute("Flink Streaming Java API Skeleton");
}
SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:
-
为每个分支流定义一个 SideOutPut。
-
为定义好的 SideOutPut发出数据。只有以下特定的函数才能通过Context上下文对象,向旁路输出的SideOutPut发送数据。
- ProcessFunction:处理函数,单流输入函数
- KeyedProcessFunction:处理函数,单流输入函数
- CoProcessFunction:处理函数,双流流输入函数
- KeyedCoProcessFunction:处理函数,双流流输入函数
- ProcessWindowFunction:窗口函数,全量计算函数
- ProcessAllWindowFunction:窗口函数,全量计算函数,它与 ProcessWindowFunction 类似,但是它会对窗口中的所有数据进行处理,而不是仅处理触发窗口计算的数据。
例子中使用ProcessFunction实现流拆分。
-
根据SideOutPut 的ID标识获取旁路输出流,进行数据继续处理。
拆分方式 | 对比 |
---|---|
Split | 不支持链式拆分,切分得到的流,是不能进行再次切分的 |
Fliter | 多分支流,需要多次遍历原始流进行筛选。浪费集群的资源 |
SideOut | 以多次进行拆分的,支持链式拆分。 |