一 、 task执行入口 0:15
接 上期 回顾
★ ——7 》 TaskExecutor#submitTask()
第一个入口:Task 构造函数
——》Task 构造函数()
▼
* 注释: 当前任务的 Task 信息
*/
this.taskInfo = new TaskInfo()
......
* 注释: 初始化 ResultPartition 和 ResultSubpartition 关于输出的抽象
* ResultSubpartition具体实现为 PipelinedSubpartition
*/
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
.createResultPartitionWriters()
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator
......
* 注释: 初始化 InputGate 输入的 对象,
inputchanle 从上游一个task节点拉取数据
*/
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates()
* 注释:创建 Task 的线程 但是不执行 run() 方法
*/
executingThread = new Thread(TASK_THREADS_GROUP,
第二个入口:task.startTaskThread(); 通过一个线程来运行 Task 0:45
—— 》Task #run()
—— ★》Task #dorun()
▼
★ 重要步骤 反射实例化StreamTask实例
dorun:1 if(transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING))
dorun:2 setupPartitionsAndGates(consumableNotifyingPartitionWriters,
dorun:3 Environment env = new RuntimeEnvironment(jobId, vertexId,
dorun:4 invokable = loadAndInstantiateInvokable(userCodeClassLoader,
dorun: 5 invokable.invoke();
—— dorun:4》Task #loadAndInstantiateInvokable()
▼
反射调用构造函数
statelessCtor = invokableClass.getConstructor(Environment.class);
//#1 SourceStreamTask 带RuntimeEnvvironment的构造函数
//#2 OneInputStreamTask 带RuntimeEnvvironment的构造函数
Constructor<? extends AbstractInvokable> statelessCtor;
try {
statelessCtor = invokableClass.getConstructor(Environment.class);
}
——#1 SourceStreamTask(Environment env) 构造函数
——#2 OneInputStreamTask (Environment env) 构造函数
//1,2最后都到 父类构造函数 StreamTask()
——★ 》 父类构造函数 StreamTask()
【streamtask 截图或者笔记】
4件大事
StreamTask:1 this.recordWriter = createRecordWriterDelegate(configuration,
StreamTask:2 this.mailboxProcessor = new MailboxProcessor(this::processInput,
StreamTask:3 this.stateBackend = createStateBackend();
StreamTask:4 this.subtaskCheckpointCoordinator = new
—— StreamTask:1 》 StreamTask. createRecordWriterDelegate 1:04
——》1》StreamTask.createRecordWriters()
——》1》StreamTask.createRecordWriter()
——》1》StreamTask.createRecordWriters
▼
// TODO_MA 注释: 初始化一个 ArrayList 容器用来存放创建出来的 RecordWriter
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
// TODO_MA 注释: 获取该 StreamTask 的输出 StreamEdge 集合
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder
// TODO_MA 注释: 一个 out StreamEdge 来构建 一个 RecordWriter
// TODO_MA 注释: 大概率 createRecordWriter() 方法的返回值是:
ChannelSelectorRecordWriter
recordWriters.add(createRecordWriter(edge, i,
——》1》StreamTask.createRecordWriters()
▼
- 1、如果上游 StreamNode 和 下游 StreamNode 的并行度一样,则使用: ForwardPartitioner 数据分发策略
-
// TODO_MA 注释: 其实这个 output 就是负责帮您完成这个 StrewamTask 的所有数据的输出2、如果上游 StreamNode 和 下游 StreamNode 的并行度不一样,则使用: RebalancePartitioner 数据分发策略 StreamPartitioner<OUT> outputPartitioner = null; try { outputPartitioner = InstantiationUtil.clone(
// TODO_MA 注释: 输出到 ResultPartition
// TODO_MA 注释: 初始化输出 ChannelSelectorRecordWriter
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output
= new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>() .setChannelSelector(outputPartitioner)
.setTimeout(bufferTimeout)
.setTaskName(taskName) // TODO_MA 注释: 构建一个 RecordWriter 返回
.build(bufferWriter);
▲回到父类构造函数 StreamTask ▲
StreamTask:2——》 this.mailboxProcessor = new
MailboxProcessor(this::processInput,
//#1 SourceStreamTask 带RuntimeEnvvironment的构造函数
//#2 OneInputStreamTask 带RuntimeEnvvironment的构造函数
▲回到 父类构造函数 StreamTask ▲
StreamTask:3 ——》 this.stateBackend = createStateBackend(); 1:26 ~
state简介 部分
——》 StreamTask.createStateBackend() 1:36
▼
* 注释: 根据配置获取 StateBackend
* 一般情况下,我们在生产环境中, 在 flink-conf.yaml 文件中进行配置:
* 1、state.backend: filesystem
* 2、state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
* 一般有三种方式:
* 1、state.backend: filesystem = FsStateBackend
* 2、state.backend: jobmanager = MemoryStateBackend
* 3、state.backend: rocksdb = RocksDBStateBackend
* 也可以在程序中,进行设置 这种方式会覆盖配置文件中的配置:
* StreamExecutionEnvironment.setStateBackend(StateBackend backend)
*/
return StateBackendLoader.fromApplicationOrConfigOrDefault
▲ 回 父类构造函数 StreamTask▲
******创建 CheckpointStorage
* 1、FsStateBackend = FsCheckpointStorage
*/
stateBackend.createCheckpointStorage(getEnvironment()
↓
FsStateBackendcreateCheckpointStorage
****** 注释: 创建 Channel 的 IO 线程池
*/
this.channelIOExecutor = Executors.newSingleThreadExecutor
▲ 回 父类构造函数 StreamTask▲
▲回 ★》Task #dorun()下 的 dorun: 5 ▲
*** dorun: 5 invokable.invoke();
* 注释: 运行任务, 在流式应用程序中,都是 StreamTask 的子类
* AbstractInvokable 是 Task 执行的主要逻辑,也是所有被执行的任务的基类,包括 Streaming 模式和 Batch 模式。
* 在 Streaming 模式下,所有任务都继承自 StreamTask,
* 包括 StreamTask 的子类包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask,
* 以及用于迭代模式下的 StreamIterationHead 和 StreamIterationTail。
* 每一个 StreamNode 在添加到 StreamGraph 的时候都会有一个关联的 jobVertexClass 属性,
* 这个属性就是该 StreamNode 对应的 StreamTask 类型;对于一个 OperatorChain 而言,它所对应的
* StreamTask 就是其 head operator 对应的 StreamTask。
*/
// run the invokable
invokable.invoke();
↓
——》StreamTask#invoke()
▼
inv 1: beforeInvoke()
inv 2: runMailboxLoop();
inv 3: afterInvoke();
inv 4: cleanUpInvoke();
——inv 1 》StreamTask#beforeInvoke()
▼
-
注释: 构建 OperatorChain 对象,里面会做很多事情
* 初始化 output 输出对象
* 主要做三件事情:
* 1、调用createStreamOutput()创建对应的下游输出RecordWriterOutput
* 2、调用createOutputCollector()将优化逻辑计划当中Chain中的StreamConfig(也就是数据)写入到第三步创建的RecordWriterOutput中
* 3、通过调用getChainedOutputs()输出结果RecordWriterOutput
*/
operatorChain = new OperatorChain<>(this, recordWriter);——inv 1 》OperatorChain 构造函数 ▼ // TODO_MA 注释: 遍历每个输出边,给每个 outEdge 构造一个 RecordWriterOutput 实例
for(int i = 0; i < outEdgesInOrder.size(); i++) {
* 为每一个 Operator 构造 RecordWriterOutput
RecordWriterOutput<?> streamOutput =
createStreamOutput(recordWriterDelegate
//放到map 里面
streamOutputMap.put(outEdge, streamOutput);
} 注释: 为每一个 Operator 创建 OutputCollector
*/
this.chainEntryPoint = createOutputCollector(-
注释: 创建 Operator
Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService =- 注释: 创建 OperatorWrapper
this.headOperatorWrapper = createOperatorWrapper(
// TODO_MA 注释: 所有 OperatorWrapper 对象集合,把 headOperatorWrapper 放入到最后
// TODO_MA 注释: 其实一个 OperatorChain 中,包含了多个 Operator,最终都被封装成 OperatorWrapper 放入这个集合中
// add head operator to end of chain
allOpWrappers.add(headOperatorWrapper);
- 注释: 创建 OperatorWrapper
// TODO_MA 注释: tailOperatorWrapper 在 ArrayList 的最前面
this.tailOperatorWrapper = allOpWrappers.get(0);
-
注释: 以 forward topological order 链接全部的 operator wrappers为一张图
*/
linkOperatorWrappers(allOpWrappers);▲OperatorChain 构造函数结束▲
▲回inv 1 》StreamTask#beforeInvoke()方法 里面
// TODO_MA 注释: 获取 OperatorChain 的第一个 Operator
// TODO_MA 注释: 可以认为 接收数据线程中,用到的 headOpeartor 终于被初始化
// TODO_MA 注释: 其实到此为止,可以认为,在当前 OperatorChain 中要用到的各种组件都已经创建好了,可以接收数据,然后开始流式处理了。
headOperator = operatorChain.getHeadOperator();
- 注释: 执行 StreamTask 的初始化
- 1、可能是 SourceStreamTask, 对于 SourceStreamTask 来说,只是注册一个 savepoint 钩子
- 2、也可能是 OneInputStreamTask
*/
init(); - 注释: 状态恢复入口
operatorChain.initializeStateAndOpenOperators( - 注释: 初始化 Mail
* 这个地方主要是初始化 InputGate 等输入相关的细节
*/
readRecoveredChannelState();
----------------------至此 所有准备工作完成, 准备真正执行task数据流处理到此为止,Task 初始化和预执行相关的,都基本到位了,然后就开始从我们的 SourceStreamTask 的HeadOperator 的数据接收线程,开始流式处理。-------------
中场休息
▲回到 inv 2: runMailboxLoop(); 开始 2:17
inv 2: runMailboxLoop();
——inv 2》MailboxProcessor.runMailboxLoop()
—— 跳转到父类入口 》SourceStreamTask#LegacySourceFunctionThread.run()
headOperator.run(lock,
——★ 》StreamSource#run()
▼
注释: 获取 Operator 的执行上下文对象
*/
this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic注释: 真正运行用户的 Operator
* 1、如果你使用:env.socketTextStream() 则调用:
SocketTextStreamFunction
* 2、如果你使用:Kafka数据源, 则调用: FlinkKafkaConsumerBase
* ......
* function --> transformation ---> streamOperator
* headOperator.run();
*/
userFunction.run(ctx);
↓ 有很多source ,选择SocketTextStreamFunction 为例
—— 》SocketTextStreamFunction.run()
// TODO_MA 注释: 没有数据,则阻塞在这儿
// TODO_MA 注释: 在 SourceStreamTask 初始化的时候,SourceThread 的代码能执行到这儿
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
// TODO_MA 注释: 一直读到有分隔符
while (buffer.length() >= delimiter.length()
.......
***** 把读取到的数据传给ctx 上下文处理****
ctx.collect(record);
↓ 选择NonTimestamp - 时间戳为 processtime
StreamSourceContexts#NonTimestampContext#collect()
▼
* output = CountingOutput
* -
* element 收到的 一条数据
* reuse 装一条数据,待序列化的容器
* collect 执行 collect 会对 reuse 对象执行序列化
*/
output.collect(reuse.replace(element));
↓
—— 》 CountingOutput.collect()
* output = ChainingOutput
*/
output.collect(record);
↓
—— 》OperatorChain#ChainingOutput.collect()
—— 》 pushToOperator(record);
▼
-
注释: 调用 Operator 的 processElement 来处理 castRecord 数据记录
* 假设下一个算子是 keyBy, 则跳转到 : KeyedProcessOperator
* 因为之后要 shuffle 了,所以之后就没有其他的 Operator 了
* map() = StreamOperator = StreamMap = operator
*/
operator.processElement(castRecord);
↓
—— 》StreamMap.processElement()
▼- 注释: 通过 Output 收集处理之后的结果数据
- 1、userFunction.map(element.getValue()) 这是用户自定义的 map 逻辑
- 2、然后计算完的结果替换掉当前 Opeartor 中的成员变量
- 3、然后被 StreamMap 这个 StreamOperator 继续收集
-
- 记住:因为当前是 map 操作,所以下一步肯定不是 shuffle
- 但是,如果是 shuffle 算子,则会执行输出了。
*/
output.collect(element.replace(userFunction.map(element.getValue())));
↓
—— 》▲回到 OperatorChain#ChainingOutput..collect() 递归式链接调用
// TODO_MA 注释: 跳转到下一个 Operator 来处理元素
// TODO_MA 注释: StreamMap
// TODO_MA 注释: keyBy
pushToOperator(record);
▼
operator.processElement(castRecord);
↓
—— 》 KeyedProcessOperator.processElement(castRecord);
▼
- 注释: 通过 Output 收集处理之后的结果数据
注释: 处理元素
userFunction.processElement(element.getValue(), context, collector);
↓ keyby就是聚合
—— 》 GroupAggFunction.processElement()
▼-
注释: 状态初始化 拿到初始值
*/
RowData accumulators = accState.value();
// 设置累加器到第一个
function.setAccumulators(accumulators);
// 得到上一个值
RowData prevAggValue = function.getValue();
// TODO_MA 注释: 累加 或者 缩回,其实都是聚合
// update aggregate result and set to the newRow
if (isAccumulateMsg(input)) {
// accumulate input
function.accumulate(input);
} else {
// retract input
function.retract(input);
}
// TODO_MA 注释: 计算完毕,得到新的结果
// get current aggregate result
RowData newAggValue = function.getValue();
// get accumulator
accumulators = function.getAccumulators();
// TODO_MA 注释: 讲计算得到的结果,替换到当前的 resultRow 成员变量中
// prepare UPDATE_BEFORE message for previous row
resultRow.replace(currentKey, prevAggValue)
.setRowKind(RowKind.UPDATE_BEFORE);
// TODO_MA 注释: 写出
out.collect(resultRow);
↓
RecordWriterOutput.collect(resultRow)
—— 》 pushToRecordWriter(StreamRecord<X> record)
* 注释: 发送
*/
recordWriter.emit(serializationDelegate);
↓ 一般不是广播
—— 》 ChannelSelectorRecordWriter.emit()
▼
* 注释: channelSelector确定目标channel- channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一样的作用:
- 用来决定 record 到底被分发到那个一个分区
- channelSelector.selectChannel(record) = partitioner.getPartition()
*/
emit(record, channelSelector.selectChannel(record));
▼
// TODO_MA 注释: 序列化
serializer.serializeRecord(record);
// TODO_MA 注释: 将序列化器中的序列化结果写入目标 channel if (copyFromSerializerToTargetChannel(targetChannel)) {
- channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一样的作用:
—— 》 ChannelSelectorRecordWriter.copyFromSerializerToTargetChannel()
flushTargetPartition(targetChannel);
↓ 一般不是广播
—— 》ResultPartitionWriter#flushTargetPartition()
* 注释: flush 到对应的 ResultPartition 中
* targetChannel = InputChannel
* targetPartition = ResultPartition
*/
targetPartition.flush(targetChannel);
↓
ResultPartition.flush()
↓
PipelinedSubpartition.flush()
—— 》 PipelinedSubpartition# notifyDataAvailable();
▼
// TODO_MA 注释: readView 是 ResultSubPartition 的消费者视图 对象
// TODO_MA 注释: 下游的一个Task 可能会消费上游的多个Task的某一个分区的数据。
/ TODO_MA 注释: 上游个任意一个Task的任意一个分区叫做: ResultSubPartition,
// TODO_MA 注释: 这个 ResultSubPartition 对应一个消费者:
PipelinedSubpartitionView
readView.notifyDataAvailable();
—— 》PipelinedSubpartitionView#notifyDataAvailable()
↓
—— 》LocalInputChannel#notifyDataAvailable()
* 注释:
*/
inputGate.notifyChannelNonEmpty(this);
—— 》SingleInputGate.notifyChannelNonEmpty()
* 注释: 某个 channel 有可写入数了,该干活了。
*/
queueChannel(checkNotNull(channel));
—— 》SingleInputGate.queueChannel()
▼
注释: 加入队列中
* 既然将 有数据可用的channel 加入到 inputChannelsWithData,
* 那就证明,一定有其他的什么角色来从这个队列中获取 可用的channel 来消费数据
*/
inputChannelsWithData.add(channel);注释: 发送信号!
*/
if(availableChannels == 0) {
// TODO_MA 注释: 如果之前队列中没有channel
//,这个channel加入后,通知等待的线程
inputChannelsWithData.notifyAll(); 2:50
以上线程会接到 steamTask启动时wait的 线程传到这个方法*
—— 》SingleInputGate.getChannel()
——主数据处理执行这个 》 StreamTaskNetworkInput.emitNext()
—— 输出》 OneInputStreamTask#StreamTaskNetworkOutput.emitRecord()