淘先锋技术网

首页 1 2 3 4 5 6 7

1、API说明

非并行数据源:
        def fromElements[T: TypeInformation](data: T*): DataStream[T]
        def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] 
        def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] 

并行数据源:
        def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T])

使用场景:

        常用来调试代码使用


2、这是一个完整的入门案例

开发语言:Java1.8

Flink版本:flink1.17.0

package com.baidu.datastream.source;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

import java.util.Arrays;
import java.util.List;

// --------------------------------------------------------------------------------------------
//  TODO 从集合中读取数据
// --------------------------------------------------------------------------------------------

/*
 *  TODO 通过`读取Java集合中数据`来创建 DataStreamSource
 *
 *  方法1:fromCollection
 *        Collection、Iterator -> DataStreamSource
 *  方法2:fromElements
 *        OUT... data -> DataStreamSource
 *  方法3:fromParallelCollection
 *        SplittableIterator -> DataStreamSource
 *  重要提示:
 *       fromCollection、fromElements 创建的是非并行source算子(并行度只能为1)
 *       fromParallelCollection 创建的是并行算子(并行度>=1)
 * */

public class ReadCollection {
    public static void main(String[] args) throws Exception {
        fromCollection();
        //fromElements();
        //fromParallelCollection();
    }

    public static void fromCollection() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.读取Java集合数据
        List<String> list = Arrays.asList("刘备", "张飞", "关羽", "赵云", "马超", "黄忠");
        env.fromCollection(list).print();

        // 3.触发程序执行
        env.execute();
    }

    public static void fromElements() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.读取给定的对象序列
        env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠").print();

        // 3.触发程序执行
        env.execute();
    }

    public static void fromParallelCollection() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2.读取给定的对象序列
        NumberSequenceIterator numberSequenceIterator = new NumberSequenceIterator(1, 10);

        env.fromParallelCollection(numberSequenceIterator, Long.class).print();
        /*
         * 注意: fromParallelCollection生成的source为并行算子
         *       集合中的数据会被平均分配到并行子任务中去
         * */

        // 3.触发程序执行
        env.execute();
    }
}