淘先锋技术网

首页 1 2 3 4 5 6 7

0、Debug样例

  Debug使用的算子样例,是后文debug截图的基础

DataGeneratorSource<Integer> src = new DataGeneratorSource<>(
        RandomGenerator.intGenerator(1, 9),1);

DataStreamSource source = env.addSource(src, TypeInformation.of(Integer.TYPE));
source.map(x->x.toString())
        .flatMap(new Tokenizer())
        .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}))
        .keyBy(0)
        .sum(1)
        .print();

1、基本原理

  Flink Checkpoint的基本原理是Chandy-Lamport Algorithm。
  Flink为每个配置Checkpoint的作业生成一个CheckpointCoordinator,CheckpointCoordinator周期性地向source算子发送barrier,barrier在整个作业图中流转。当算子接收barrier时,会制作快照并存储,然后向CheckpointCoordinator报告。CheckpointCoordinator收到所有算子的报告后,认为本次checkpoint成功;若规定时间未收到所有报告,则认为失败。

2、配置Checkpoint

  Flink 默认不开启Checkpoint,作业使用checkpoint需要在自己的业务代码中进行相关配置,基础启动配置如下,其他配置自行官网查看:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint
env.enableCheckpointing(1000);

3、构建CheckpointCoordinator

  CheckpointCoordinator构建是在JobManager构建executionGraph的时候,在ExecutionGraphBuilder.buildGraph当中,如下:

executionGraph.enableCheckpointing(
   chkConfig,
   triggerVertices,
   ackVertices,
   confirmVertices,
   hooks,
   checkpointIdCounter,
   completedCheckpoints,
   rootBackend,
   checkpointStatsTracker);

  executionGraph.enableCheckpointing中构建CheckpointCoordinator,如下:

checkpointCoordinator = new CheckpointCoordinator(
   jobInformation.getJobId(),
   chkConfig,
   tasksToTrigger,
   tasksToWaitFor,
   tasksToCommitTo,
   operatorCoordinators,
   checkpointIDCounter,
   checkpointStore,
   checkpointStateBackend,
   ioExecutor,
   new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
   SharedStateRegistry.DEFAULT_FACTORY,
   failureManager);

  根据配置,是否需要启用checkpoint。判断条件即是checkpoint是否为-1,-1是默认值,即默认不开启

// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
   // the periodic checkpoint scheduler is activated and deactivated as a result of
   // job status changes (running -> on, all other states -> off)
   registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

4、启动Checkpoint

  checkpoint启停是在CheckpointCoordinatorDeActivator,CheckpointCoordinatorDeActivator是一个JobStatusListener的实现,状态变更时会触发。当状态变为RUNNING时,会开启checkpoint

public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
   if (newJobStatus == JobStatus.RUNNING) {
      // start the checkpoint scheduler
      coordinator.startCheckpointScheduler();
   } else {
      // anything else should stop the trigger for now
      coordinator.stopCheckpointScheduler();
   }
}

  最终触发到的是CheckpointCoordinator的startTriggeringCheckpoint方法,其关键内容如下

4.1、触发任务列表和确认任务列表获取

  触发任务遍历tasksToTrigger列表,获取每个任务最新的Execution,如果存在非运行状态的Execution,即异常退出Checkpoint;否则将Execution列表返回。
  确认任务类似,只是遍历的是tasksToWaitFor列表。

final Execution[] executions = getTriggerExecutions();
final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();

  其中,Execution是ExecutionVertex的一次运行尝试(可能对应多个,即失败了之后可能产生新的);ExecutionVertex是物理图中的概念。
  tasksToTrigger和tasksToWaitFor根本来源是jobVertices,差异在于,tasksToTrigger是输入任务的合集,tasksToWaitFor是所有任务的合集。(注意此处指的都是一个链任务,而不是一个算子)

for (JobVertex vertex : jobVertices.values()) {
   if (vertex.isInputVertex()) {
      triggerVertices.add(vertex.getID());
   }
   commitVertices.add(vertex.getID());
   ackVertices.add(vertex.getID());
}

  tasksToTrigger和tasksToWaitFor是CheckpointCoordinator的成员,debug如下

在这里插入图片描述

4.2、Checkpoint触发

  此处采用CompletableFuture的异步链式调用方法,触发一个新的checkpoint过程,获取最终成功或失败的结果。

4.2.1、初始化设置

  首先递增checkpoint ID

  初始化checkpoint元数据目录,根据Backend的不同,使用相对应的方法初始化创建目录相关

CheckpointStorageLocation checkpointStorageLocation = props.isSavepoint() ?
   checkpointStorage
      .initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
   checkpointStorage.initializeLocationForCheckpoint(checkpointID);

4.2.2、创建待处理检查点

  此处创建PendingCheckpoint,设置checkpoint的callback,并根据配置的超时时间,设置checkpoint的取消调度器

ScheduledFuture<?> cancellerHandle = timer.schedule(
   new CheckpointCanceller(checkpoint),
   checkpointTimeout, TimeUnit.MILLISECONDS);

4.2.3、Coordinator checkpoint

  此处是针对OperatorCoordinator的checkpoint,OperatorCoordinator是Flink1.11新增的内容。

4.2.4、Master状态处理

  此处是checkpoint框架变更后产生的bug处理步骤 https://issues.apache.org/jira/browse/FLINK-18641

4.2.5、开始checkpoint

  先检查所有的前置状态,然后调用snapshotTaskState进行触发。

  触发方式即对所有的输入任务调用触发,executions即前文中介绍的executions:

for (Execution execution: executions) {
   if (props.isSynchronous()) {
      execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
   } else {
      execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
   }
}

  实际的操作在每个execution当中,关系函数为triggerCheckpointHelper,其中调用TaskManagerGateway的checkpoint触发

if (slot != null) {
   final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

   taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
   LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}

5、TaskManager执行–Source

  此处开始在TaskManager上执行,上一步TaskManagerGateway最终触发到TaskExecutor的triggerCheckpoint,之后触发任务执行checkpoint操作

  此处首先进入source的处理,触发source的checkpoint处理流程

if (task != null) {
   task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

   return CompletableFuture.completedFuture(Acknowledge.get());
}

  之后进入StreamTask的执行逻辑triggerCheckpointAsync

  initCheckpoint:此处有对齐和非对齐的两种处理模式

  执行checkpoint:实现位置为SubtaskCheckpointCoordinatorImpl的checkpointState方法

5.1、检查

  检查触发的checkpoint是否最新

if (lastCheckpointId >= metadata.getCheckpointId()) {
   LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
   channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
   checkAndClearAbortedStatus(metadata.getCheckpointId());
   return;
}

5.2、清除中断的checkpoint

  此步骤记录最新的checkpoint id,如果在abort列表,去除并且通知下游取消同步以防止背压

// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
   // broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
   operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
   LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
   return;
}

5.3、Checkpoint预处理

  此步骤遍历所有的operator并通知checkpoint准备,根据不同的算子实现,部分需要做工作,比如AbstractMapBundleOperator会把缓存的数据处理完

public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
   // go forward through the operator chain and tell each operator
   // to prepare the checkpoint
   for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
      if (!operatorWrapper.isClosed()) {
         operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
      }
   }
}

5.4、向下游发送checkpoint

  此步骤向该operator的所有输出边发送一个checkpoint事件

operatorChain.broadcastEvent(
   new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
   options.isUnalignedCheckpoint());
   
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
   for (RecordWriterOutput<?> streamOutput : streamOutputs) {
      streamOutput.broadcastEvent(event, isPriorityEvent);
   }
}

5.5、溢出缓冲区

  对于非对齐checkpoint,处理缓冲区

if (options.isUnalignedCheckpoint()) {
   prepareInflightDataSnapshot(metadata.getCheckpointId());
}

5.6、快照制作

  即制作当前步骤的checkpoint快照,此步骤通常是异步的,以防止干扰数据流。在SubtaskCheckpointCoordinatorImpl的takeSnapshotSync

Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
try {
   if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
      finishAndReportAsync(snapshotFutures, metadata, metrics, options);
   } else {
      cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
   }
} catch (Exception ex) {
   cleanup(snapshotFutures, metadata, metrics, ex);
   throw ex;
}

5.6.1、检查状态

  存在关闭的operator即异常结束

for (final StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
   if (operatorWrapper.isClosed()) {
      env.declineCheckpoint(checkpointMetaData.getCheckpointId(),
         new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
      return false;
   }
}

5.6.2、获取ChannelStateWriteResult

  此处有对齐和非对齐的区别

ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ?
                  channelStateWriter.getAndRemoveWriteResult(checkpointId) :
                  ChannelStateWriteResult.EMPTY;

5.6.3、获取checkpoint位置

5.6.4、buildOperatorSnapshotFutures -》streamOperator.snapshotState

  此步骤会进行checkpoint操作,连续调用到StreamOperatorStateHandler

  KeyGroupRange:本任务需要做checkpoint的key的范围,基于并行度计算。无状态的为空。计算方式如下

int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);

  自定义状态处理

streamOperator.snapshotState(snapshotContext);

/**
 * Custom state handling hooks to be invoked by {@link StreamOperatorStateHandler}.
 */
public interface CheckpointedStreamOperator {
   void initializeState(StateInitializationContext context) throws Exception;

   void snapshotState(StateSnapshotContext context) throws Exception;
}

  处理State元数据

  首先检测State数量,内存存储的数量有限,为short最大值

int numStates = registeredKVStates.size() + registeredPQStates.size();

Preconditions.checkState(numStates <= Short.MAX_VALUE,
   "Too many states: " + numStates +
      ". Currently at most " + Short.MAX_VALUE + " states are supported");

  记录元数据进内存

processSnapshotMetaInfoForAllStates(
   metaInfoSnapshots,
   cowStateStableSnapshots,
   stateNamesToId,
   registeredKVStates,
   StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);

processSnapshotMetaInfoForAllStates(
   metaInfoSnapshots,
   cowStateStableSnapshots,
   stateNamesToId,
   registeredPQStates,
   StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);

  设置序列化代理

final KeyedBackendSerializationProxy<K> serializationProxy =
   new KeyedBackendSerializationProxy<>(
      // TODO: this code assumes that writing a serializer is threadsafe, we should support to
      // get a serialized form already at state registration time in the future
      getKeySerializer(),
      metaInfoSnapshots,
      !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));

  状态数据写内存,主要使用DataOutputViewStreamWrapper和KeyedBackendSerializationProxy

final DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
serializationProxy.write(outView);

public void write(DataOutputView out) throws IOException {
   super.write(out);

   // write the compression format used to write each key-group
   out.writeBoolean(usingKeyGroupCompression);

   TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerSnapshot, keySerializer);

   // write individual registered keyed state metainfos
   out.writeShort(stateMetaInfoSnapshots.size());
   for (StateMetaInfoSnapshot metaInfoSnapshot : stateMetaInfoSnapshots) {
      StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);
   }
}

  CurrentWriterImpl实际对快照进行输出的地方,输出序列化的快照,会写State的数据信息

StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);

for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
   outputView.writeUTF(entry.getKey());
   outputView.writeUTF(entry.getValue());
}

  压缩方式写数据

OutputStream kgCompressionOut =
   keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
DataOutputViewStreamWrapper kgCompressionView =
   new DataOutputViewStreamWrapper(kgCompressionOut);
kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));
partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);

  operatorStateBackend、keyedStateBackend

   两个类型单独进行快照处理,根据backend的不同,会有不同的处理

  CheckpointStreamFactory

  不同的backend有不同的输出类,构建不同的CheckpointStateOutputStream

  CurrentWriterImpl

  实际对快照进行输出的地方,输出序列化的快照

6、TaskManager执行–StreamTask流任务处理

6.1、CheckpointedInputGate

  对于InputStreamTask,如OneInputStreamTask,初始化时会创建CheckpointedInputGate并将其和数据输入合并。以OneInputStreamTask为例,如下:

public void init() throws Exception {
   StreamConfig configuration = getConfiguration();
   int numberOfInputs = configuration.getNumberOfInputs();

   if (numberOfInputs > 0) {
      CheckpointedInputGate inputGate = createCheckpointedInputGate();
      DataOutput<IN> output = createDataOutput();
      StreamTaskInput<IN> input = createTaskInput(inputGate, output);
      inputProcessor = new StreamOneInputProcessor<>(
         input,
         output,
         operatorChain);
   }

  根据一致性的不同,对checkpoint会使用不同的Handler进行处理,Handler的选择也在这个输入合并的初始化过程当中:

switch (config.getCheckpointMode()) {
   case EXACTLY_ONCE:
      if (config.isUnalignedCheckpointsEnabled()) {
         return new AlternatingCheckpointBarrierHandler(
            new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
            new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
            toNotifyOnCheckpoint);
      }
      return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
   case AT_LEAST_ONCE:
      if (config.isUnalignedCheckpointsEnabled()) {
         throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
            "checkpointing mode");
      }
      int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
      return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
   default:
      throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
}