1、API说明(以下为 Scala API 的方法签名,Java API 中对应的方法同名)
非并行数据源:
def fromElements[T: TypeInformation](data: T*): DataStream[T]
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T]
并行数据源:
def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]): DataStream[T]
使用场景:
常用于调试代码
2、这是一个完整的入门案例
开发语言:Java 1.8
Flink版本:Flink 1.17.0
package com.baidu.datastream.source;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;
import java.util.Arrays;
import java.util.List;
// --------------------------------------------------------------------------------------------
// Reading data from collections
// --------------------------------------------------------------------------------------------
/*
 * Creates a DataStreamSource by reading data held in Java collections.
 *
 * Option 1: fromCollection
 *   Collection / Iterator -> DataStreamSource
 * Option 2: fromElements
 *   OUT... data -> DataStreamSource
 * Option 3: fromParallelCollection
 *   SplittableIterator -> DataStreamSource
 * Important:
 *   fromCollection and fromElements create a non-parallel source operator (parallelism must be 1).
 *   fromParallelCollection creates a parallel source operator (parallelism >= 1).
 */
/**
 * Demonstrates three ways of building a Flink stream from in-memory data:
 * {@code fromCollection}, {@code fromElements} and {@code fromParallelCollection}.
 *
 * <p>Each demo method builds its own execution environment, attaches a {@code print()}
 * sink to the source and then executes the job.
 */
public class ReadCollection {

    /** Parallelism configured on the environment of every demo. */
    private static final int DEMO_PARALLELISM = 3;

    /**
     * Entry point. Runs the {@code fromCollection} demo; replace the call with
     * {@code fromElements()} or {@code fromParallelCollection()} to try the other sources.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        fromCollection();
    }

    /**
     * Reads the elements of a Java {@link List} as a stream and prints them.
     *
     * @throws Exception if the job execution fails
     */
    public static void fromCollection() throws Exception {
        StreamExecutionEnvironment environment = newEnvironment();

        // Source: a fixed Java list.
        List<String> generals = Arrays.asList("刘备", "张飞", "关羽", "赵云", "马超", "黄忠");
        environment.fromCollection(generals).print();

        // Nothing runs until execute() is called.
        environment.execute();
    }

    /**
     * Reads an explicitly given sequence of objects as a stream and prints them.
     *
     * @throws Exception if the job execution fails
     */
    public static void fromElements() throws Exception {
        StreamExecutionEnvironment environment = newEnvironment();

        // Source: the elements passed as varargs.
        environment.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠").print();

        // Nothing runs until execute() is called.
        environment.execute();
    }

    /**
     * Reads the numbers 1 to 10 from a splittable iterator as a stream and prints them.
     *
     * <p>Note: the source created by {@code fromParallelCollection} is a parallel operator,
     * so the data is distributed across the parallel subtasks.
     *
     * @throws Exception if the job execution fails
     */
    public static void fromParallelCollection() throws Exception {
        StreamExecutionEnvironment environment = newEnvironment();

        // Source: a splittable number sequence; the element class is passed explicitly.
        environment.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class).print();

        // Nothing runs until execute() is called.
        environment.execute();
    }

    /** Obtains the execution environment and applies the demo parallelism to it. */
    private static StreamExecutionEnvironment newEnvironment() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(DEMO_PARALLELISM);
        return environment;
    }
}