淘先锋技术网

首页 1 2 3 4 5 6 7

一 、 task执行入口 0:15

接 上期 回顾
★ ——7 》 TaskExecutor#submitTask()

第一个入口:Task 构造函数
——》Task 构造函数()

* 注释: 当前任务的 Task 信息
*/
this.taskInfo = new TaskInfo()
......
* 注释: 初始化 ResultPartition 和 ResultSubpartition 关于输出的抽象
* ResultSubpartition具体实现为 PipelinedSubpartition
*/
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
.createResultPartitionWriters()
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator
......
* 注释: 初始化 InputGate 输入的 对象,
inputchanle 从上游一个task节点拉取数据
*/
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates()
* 注释:创建 Task 的线程 但是不执行 run() 方法
*/
executingThread = new Thread(TASK_THREADS_GROUP,

第二个入口:task.startTaskThread(); 通过一个线程来运行 Task 0:45
—— 》Task #run()
—— ★》Task #dorun()

Task run过程

★ 重要步骤 反射实例化StreamTask实例

image.png

dorun:1 if(transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING))
dorun:2 setupPartitionsAndGates(consumableNotifyingPartitionWriters,
dorun:3 Environment env = new RuntimeEnvironment(jobId, vertexId,
dorun:4 invokable = loadAndInstantiateInvokable(userCodeClassLoader,
dorun: 5 invokable.invoke();

—— dorun:4》Task #loadAndInstantiateInvokable()

反射调用构造函数
statelessCtor = invokableClass.getConstructor(Environment.class);

//#1 SourceStreamTask 带RuntimeEnvvironment的构造函数
//#2 OneInputStreamTask 带RuntimeEnvvironment的构造函数

Constructor<? extends AbstractInvokable> statelessCtor;
try {
statelessCtor = invokableClass.getConstructor(Environment.class);
}

——#1 SourceStreamTask(Environment env) 构造函数
——#2 OneInputStreamTask (Environment env) 构造函数

//1,2最后都到 父类构造函数 StreamTask()  

    ——★  》 父类构造函数 StreamTask()
           【streamtask 截图或者笔记】
           4件大事

StreamTask:1 this.recordWriter = createRecordWriterDelegate(configuration,
StreamTask:2 this.mailboxProcessor = new MailboxProcessor(this::processInput,
StreamTask:3 this.stateBackend = createStateBackend();
StreamTask:4 this.subtaskCheckpointCoordinator = new

—— StreamTask:1 》 StreamTask. createRecordWriterDelegate 1:04
——》1》StreamTask.createRecordWriters()
——》1》StreamTask.createRecordWriter()
——》1》StreamTask.createRecordWriters

// TODO_MA 注释: 初始化一个 ArrayList 容器用来存放创建出来的 RecordWriter
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
// TODO_MA 注释: 获取该 StreamTask 的输出 StreamEdge 集合
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder
// TODO_MA 注释: 一个 out StreamEdge 来构建 一个 RecordWriter
// TODO_MA 注释: 大概率 createRecordWriter() 方法的返回值是:
ChannelSelectorRecordWriter
recordWriters.add(createRecordWriter(edge, i,
——》1》StreamTask.createRecordWriters()

  • 1、如果上游 StreamNode 和 下游 StreamNode 的并行度一样,则使用: ForwardPartitioner 数据分发策略
  • 2、如果上游 StreamNode 和 下游 StreamNode 的并行度不一样,则使用: RebalancePartitioner 数据分发策略 
    StreamPartitioner<OUT> outputPartitioner = null; 
    try {
        outputPartitioner = InstantiationUtil.clone(
    
    // TODO_MA 注释: 其实这个 output 就是负责帮您完成这个 StrewamTask 的所有数据的输出
    // TODO_MA 注释: 输出到 ResultPartition
    // TODO_MA 注释: 初始化输出 ChannelSelectorRecordWriter
    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output
    = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>() .setChannelSelector(outputPartitioner)
    .setTimeout(bufferTimeout)
    .setTaskName(taskName) // TODO_MA 注释: 构建一个 RecordWriter 返回
    .build(bufferWriter);
    ▲回到父类构造函数 StreamTask ▲

StreamTask:2——》 this.mailboxProcessor = new
MailboxProcessor(this::processInput,
//#1 SourceStreamTask 带RuntimeEnvvironment的构造函数
//#2 OneInputStreamTask 带RuntimeEnvvironment的构造函数
▲回到 父类构造函数 StreamTask ▲

StreamTask:3 ——》 this.stateBackend = createStateBackend(); 1:26 ~
state简介 部分

——》 StreamTask.createStateBackend() 1:36

* 注释: 根据配置获取 StateBackend
* 一般情况下,我们在生产环境中, 在 flink-conf.yaml 文件中进行配置:
* 1、state.backend: filesystem
* 2、state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
* 一般有三种方式:
* 1、state.backend: filesystem = FsStateBackend
* 2、state.backend: jobmanager = MemoryStateBackend
* 3、state.backend: rocksdb = RocksDBStateBackend
* 也可以在程序中,进行设置 这种方式会覆盖配置文件中的配置:
* StreamExecutionEnvironment.setStateBackend(StateBackend backend)
*/
return StateBackendLoader.fromApplicationOrConfigOrDefault
▲ 回 父类构造函数 StreamTask▲

******创建 CheckpointStorage
* 1、FsStateBackend = FsCheckpointStorage
*/
stateBackend.createCheckpointStorage(getEnvironment()

FsStateBackendcreateCheckpointStorage

****** 注释: 创建 Channel 的 IO 线程池
*/
this.channelIOExecutor = Executors.newSingleThreadExecutor
▲ 回 父类构造函数 StreamTask▲

▲回     ★》Task #dorun()下 的  dorun: 5   ▲

*** dorun: 5 invokable.invoke();
* 注释: 运行任务, 在流式应用程序中,都是 StreamTask 的子类
* AbstractInvokable 是 Task 执行的主要逻辑,也是所有被执行的任务的基类,包括 Streaming 模式和 Batch 模式。
* 在 Streaming 模式下,所有任务都继承自 StreamTask,
* 包括 StreamTask 的子类包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask,
* 以及用于迭代模式下的 StreamIterationHead 和 StreamIterationTail。
* 每一个 StreamNode 在添加到 StreamGraph 的时候都会有一个关联的 jobVertexClass 属性,
* 这个属性就是该 StreamNode 对应的 StreamTask 类型;对于一个 OperatorChain 而言,它所对应的
* StreamTask 就是其 head operator 对应的 StreamTask。
*/
// run the invokable
invokable.invoke();

——》StreamTask#invoke()

inv 1: beforeInvoke()
inv 2: runMailboxLoop();
inv 3: afterInvoke();
inv 4: cleanUpInvoke();

——inv 1 》StreamTask#beforeInvoke()

  • 注释: 构建 OperatorChain 对象,里面会做很多事情
    * 初始化 output 输出对象
    * 主要做三件事情:
    * 1、调用createStreamOutput()创建对应的下游输出RecordWriterOutput
    * 2、调用createOutputCollector()将优化逻辑计划当中Chain中的StreamConfig(也就是数据)写入到第三步创建的RecordWriterOutput中
    * 3、通过调用getChainedOutputs()输出结果RecordWriterOutput
    */
    operatorChain = new OperatorChain<>(this, recordWriter);

    ——inv 1 》OperatorChain 构造函数
                                                    ▼
      // TODO_MA 注释: 遍历每个输出边,给每个 outEdge 构造一个 
          RecordWriterOutput 实例
    

    for(int i = 0; i < outEdgesInOrder.size(); i++) {
    * 为每一个 Operator 构造 RecordWriterOutput
    RecordWriterOutput<?> streamOutput =
    createStreamOutput(recordWriterDelegate
    //放到map 里面
    streamOutputMap.put(outEdge, streamOutput);
    }

  • 注释: 为每一个 Operator 创建 OutputCollector
    */
    this.chainEntryPoint = createOutputCollector(

  • 注释: 创建 Operator
    Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService =

    • 注释: 创建 OperatorWrapper
      this.headOperatorWrapper = createOperatorWrapper(
      // TODO_MA 注释: 所有 OperatorWrapper 对象集合,把 headOperatorWrapper 放入到最后
      // TODO_MA 注释: 其实一个 OperatorChain 中,包含了多个 Operator,最终都被封装成 OperatorWrapper 放入这个集合中
      // add head operator to end of chain
      allOpWrappers.add(headOperatorWrapper);

// TODO_MA 注释: tailOperatorWrapper 在 ArrayList 的最前面
this.tailOperatorWrapper = allOpWrappers.get(0);

  • 注释: 以 forward topological order 链接全部的 operator wrappers为一张图
    */
    linkOperatorWrappers(allOpWrappers);

    operatorchain.png

    ▲OperatorChain 构造函数结束▲

    ▲回inv 1 》StreamTask#beforeInvoke()方法 里面
    

// TODO_MA 注释: 获取 OperatorChain 的第一个 Operator
// TODO_MA 注释: 可以认为 接收数据线程中,用到的 headOpeartor 终于被初始化
// TODO_MA 注释: 其实到此为止,可以认为,在当前 OperatorChain 中要用到的各种组件都已经创建好了,可以接收数据,然后开始流式处理了。
headOperator = operatorChain.getHeadOperator();

  • 注释: 执行 StreamTask 的初始化
  • 1、可能是 SourceStreamTask, 对于 SourceStreamTask 来说,只是注册一个 savepoint 钩子
  • 2、也可能是 OneInputStreamTask
    */
    init();
  • 注释: 状态恢复入口
    operatorChain.initializeStateAndOpenOperators(
  • 注释: 初始化 Mail
    * 这个地方主要是初始化 InputGate 等输入相关的细节
    */
    readRecoveredChannelState();

----------------------至此 所有准备工作完成, 准备真正执行task数据流处理到此为止,Task 初始化和预执行相关的,都基本到位了,然后就开始从我们的 SourceStreamTask 的HeadOperator 的数据接收线程,开始流式处理。-------------

中场休息

▲回到 inv 2: runMailboxLoop(); 开始 2:17

inv 2: runMailboxLoop();

——inv 2》MailboxProcessor.runMailboxLoop()

—— 跳转到父类入口 》SourceStreamTask#LegacySourceFunctionThread.run()
headOperator.run(lock,
——★ 》StreamSource#run()

  • 注释: 获取 Operator 的执行上下文对象
    */
    this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic

  • 注释: 真正运行用户的 Operator
    * 1、如果你使用:env.socketTextStream() 则调用:
    SocketTextStreamFunction
    * 2、如果你使用:Kafka数据源, 则调用: FlinkKafkaConsumerBase
    * ......
    * function --> transformation ---> streamOperator
    * headOperator.run();
    */
    userFunction.run(ctx);
    ↓ 有很多source ,选择SocketTextStreamFunction 为例
    —— 》SocketTextStreamFunction.run()

// TODO_MA 注释: 没有数据,则阻塞在这儿
// TODO_MA 注释: 在 SourceStreamTask 初始化的时候,SourceThread 的代码能执行到这儿
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
// TODO_MA 注释: 一直读到有分隔符
while (buffer.length() >= delimiter.length()
.......
***** 把读取到的数据传给ctx 上下文处理****
ctx.collect(record);
↓ 选择NonTimestamp - 时间戳为 processtime
StreamSourceContexts#NonTimestampContext#collect()

* output = CountingOutput
* -
* element 收到的 一条数据
* reuse 装一条数据,待序列化的容器
* collect 执行 collect 会对 reuse 对象执行序列化
*/
output.collect(reuse.replace(element));

—— 》 CountingOutput.collect()
* output = ChainingOutput
*/
output.collect(record);

—— 》OperatorChain#ChainingOutput.collect()
—— 》 pushToOperator(record);

  • 注释: 调用 Operator 的 processElement 来处理 castRecord 数据记录
    * 假设下一个算子是 keyBy, 则跳转到 : KeyedProcessOperator
    * 因为之后要 shuffle 了,所以之后就没有其他的 Operator 了
    * map() = StreamOperator = StreamMap = operator
    */
    operator.processElement(castRecord);

    —— 》StreamMap.processElement()

    • 注释: 通过 Output 收集处理之后的结果数据
      • 1、userFunction.map(element.getValue()) 这是用户自定义的 map 逻辑
      • 2、然后计算完的结果替换掉当前 Opeartor 中的成员变量
      • 3、然后被 StreamMap 这个 StreamOperator 继续收集
      • 记住:因为当前是 map 操作,所以下一步肯定不是 shuffle
      • 但是,如果是 shuffle 算子,则会执行输出了。
        */
        output.collect(element.replace(userFunction.map(element.getValue())));

        —— 》▲回到 OperatorChain#ChainingOutput..collect() 递归式链接调用
        // TODO_MA 注释: 跳转到下一个 Operator 来处理元素
        // TODO_MA 注释: StreamMap
        // TODO_MA 注释: keyBy
        pushToOperator(record);

        operator.processElement(castRecord);

        —— 》 KeyedProcessOperator.processElement(castRecord);
  • 注释: 处理元素
    userFunction.processElement(element.getValue(), context, collector);
    ↓ keyby就是聚合
    —— 》 GroupAggFunction.processElement()

  • 注释: 状态初始化 拿到初始值
    */
    RowData accumulators = accState.value();
    // 设置累加器到第一个
    function.setAccumulators(accumulators);
    // 得到上一个值
    RowData prevAggValue = function.getValue();
    // TODO_MA 注释: 累加 或者 缩回,其实都是聚合
    // update aggregate result and set to the newRow
    if (isAccumulateMsg(input)) {
    // accumulate input
    function.accumulate(input);
    } else {
    // retract input
    function.retract(input);
    }
    // TODO_MA 注释: 计算完毕,得到新的结果
    // get current aggregate result
    RowData newAggValue = function.getValue();
    // get accumulator
    accumulators = function.getAccumulators();
    // TODO_MA 注释: 讲计算得到的结果,替换到当前的 resultRow 成员变量中
    // prepare UPDATE_BEFORE message for previous row
    resultRow.replace(currentKey, prevAggValue)
    .setRowKind(RowKind.UPDATE_BEFORE);
    // TODO_MA 注释: 写出
    out.collect(resultRow);

    RecordWriterOutput.collect(resultRow)
    —— 》 pushToRecordWriter(StreamRecord<X> record)
    * 注释: 发送
    */
    recordWriter.emit(serializationDelegate);
    ↓ 一般不是广播
    —— 》 ChannelSelectorRecordWriter.emit()

    * 注释: channelSelector确定目标channel

    • channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一样的作用:
      • 用来决定 record 到底被分发到那个一个分区
      • channelSelector.selectChannel(record) = partitioner.getPartition()
        */
        emit(record, channelSelector.selectChannel(record));

        // TODO_MA 注释: 序列化
        serializer.serializeRecord(record);
    // TODO_MA 注释: 将序列化器中的序列化结果写入目标 channel      
    if (copyFromSerializerToTargetChannel(targetChannel)) {
    

—— 》 ChannelSelectorRecordWriter.copyFromSerializerToTargetChannel()
flushTargetPartition(targetChannel);
↓ 一般不是广播
—— 》ResultPartitionWriter#flushTargetPartition()
* 注释: flush 到对应的 ResultPartition 中
* targetChannel = InputChannel
* targetPartition = ResultPartition
*/
targetPartition.flush(targetChannel);

ResultPartition.flush()

PipelinedSubpartition.flush()
—— 》 PipelinedSubpartition# notifyDataAvailable();

// TODO_MA 注释: readView 是 ResultSubPartition 的消费者视图 对象
// TODO_MA 注释: 下游的一个Task 可能会消费上游的多个Task的某一个分区的数据。
/ TODO_MA 注释: 上游个任意一个Task的任意一个分区叫做: ResultSubPartition,
// TODO_MA 注释: 这个 ResultSubPartition 对应一个消费者:
PipelinedSubpartitionView

readView.notifyDataAvailable();

—— 》PipelinedSubpartitionView#notifyDataAvailable()

—— 》LocalInputChannel#notifyDataAvailable()
* 注释:
*/
inputGate.notifyChannelNonEmpty(this);
—— 》SingleInputGate.notifyChannelNonEmpty()
* 注释: 某个 channel 有可写入数了,该干活了。
*/
queueChannel(checkNotNull(channel));
—— 》SingleInputGate.queueChannel()

  • 注释: 加入队列中
    * 既然将 有数据可用的channel 加入到 inputChannelsWithData,
    * 那就证明,一定有其他的什么角色来从这个队列中获取 可用的channel 来消费数据
    */
    inputChannelsWithData.add(channel);

  • 注释: 发送信号!
    */
    if(availableChannels == 0) {
    // TODO_MA 注释: 如果之前队列中没有channel
    //,这个channel加入后,通知等待的线程
    inputChannelsWithData.notifyAll(); 2:50

以上线程会接到 steamTask启动时wait的 线程传到这个方法*
—— 》SingleInputGate.getChannel()
——主数据处理执行这个 》 StreamTaskNetworkInput.emitNext()
—— 输出》 OneInputStreamTask#StreamTaskNetworkOutput.emitRecord()