淘先锋技术网

首页 1 2 3 4 5 6 7

Flink中的分流

在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。

拆分流数据的方式

  • Split,已经废弃,不推荐使用
  • Fliter
  • SideOut,推荐使用

Fliter分流的Java实现

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 指标明细
        DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env)
                .map((MapFunction<String, String>) kafkaMessage -> {
                    JSONObject jsonobject = null;
                    try {
                        jsonobject = JSONObject.parseObject(kafkaMessage);
                    } catch (Exception e) {
                        LOG.warn("报文格式错误:{}", kafkaMessage);
                    }
                    if (null == jsonobject || jsonobject.isEmpty()) {
                        LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));
                    } else {
                        if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
                                && !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
                            LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));
                        }
                    }
                    return JSONObject.toJSONString(jsonobject);
                });
        // 将原始流中包含demo的数据筛选出来
        DataStream<String> diagnosisMessages = detailMessage
                .filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo")))
                .map((MapFunction<String, String>) sparkMessage -> {
                    // 为达到实验效果,进行日志输出
                    LOG.info("[is demo message]:{}", sparkMessage);
                    return sparkMessage;
                });

        env.execute("Flink Streaming Java API Skeleton");
    }

SideOut分流的Java实现

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("【SideOutputDemo】");
        
        // 指标明细
        DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env)
                .map((MapFunction<String, String>) kafkaMessage -> {
                    JSONObject jsonobject = null;
                    try {
                        jsonobject = JSONObject.parseObject(kafkaMessage);
                    } catch (Exception e) {
                        LOG.warn("报文格式错误:{}", kafkaMessage);
                    }
                    if (null == jsonobject || jsonobject.isEmpty()) {
                        LOG.warn("报文内容不合法:{}", JSONObject.toJSONString(jsonobject));
                    } else {
                        if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
                                && !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
                            LOG.warn("报文所属服务不存在:{}", JSONObject.toJSONString(jsonobject));
                        }
                    }
                    return JSONObject.toJSONString(jsonobject);
                });

        // 定义一个切分(旁路输出)
        final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {
        };

        SingleOutputStreamOperator<String> sp = mainMessage
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(
                            String s
                            , Context context
                            , Collector<String> collector) throws Exception {
                        // 向常规流(主流)中添加数据
                        collector.collect(s);
                        // 向旁路输出流中添加数据
                        if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {
                            context.output(outputTag, s);
                        }
                    }
                });
        sp.map((MapFunction<String, String>) sparkMessage -> {
            LOG.info("主流的数据: {}", sparkMessage);
            return sparkMessage;
        });

        DataStream<String> tag = sp.getSideOutput(outputTag);
        tag.map((MapFunction<String, String>) sparkMessage -> {
            LOG.info("旁路[{}]的数据: {}", outputTag.getId(), sparkMessage);
            return sparkMessage;
        });

        env.execute("Flink Streaming Java API Skeleton");
    }

SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:

  1. 为每个分支流定义一个 SideOutPut。

  2. 为定义好的 SideOutPut发出数据。只有以下特定的函数才能通过Context上下文对象,向旁路输出的SideOutPut发送数据。

    1. ProcessFunction:处理函数,单流输入函数
    2. KeyedProcessFunction:处理函数,单流输入函数
    3. CoProcessFunction:处理函数,双流流输入函数
    4. KeyedCoProcessFunction:处理函数,双流流输入函数
    5. ProcessWindowFunction:窗口函数,全量计算函数
    6. ProcessAllWindowFunction:窗口函数,全量计算函数,它与 ProcessWindowFunction 类似,但是它会对窗口中的所有数据进行处理,而不是仅处理触发窗口计算的数据。

    例子中使用ProcessFunction实现流拆分。

  3. 根据SideOutPut 的ID标识获取旁路输出流,进行数据继续处理。

拆分方式对比
Split不支持链式拆分,切分得到的流,是不能进行再次切分的
Fliter多分支流,需要多次遍历原始流进行筛选。浪费集群的资源
SideOut以多次进行拆分的,支持链式拆分。