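0. Debug example
The operator pipeline below is the example used for debugging. The debugging walkthrough in the rest of this article is based on it.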
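// Tokenizer: presumably the FlatMapFunction<String, Tuple2<String, Integer>> from Flink's WordCount example
DataGeneratorSource<Integer> src = new DataGeneratorSource<>(
RandomGenerator.intGenerator(1, 9), 1); // random ints between 1 and 9, 1 record per second
DataStreamSource<Integer> source = env.addSource(src, TypeInformation.of(Integer.class));
source.map(x -> x.toString())
.flatMap(new Tokenizer())
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}))
.keyBy(0) // key by the word (tuple field 0)
.sum(1)   // running count per word (tuple field 1)
.print();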
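1. Basic principle
Flink checkpointing is based on the Chandy-Lamport algorithm (more precisely, on a variant of it known as asynchronous barrier snapshotting).
For every job that has checkpointing configured, Flink creates a CheckpointCoordinator. The CheckpointCoordinator periodically triggers the source tasks, which inject barriers into the stream, and the barriers then flow through the entire job graph. When an operator receives a barrier, it takes a snapshot of its state, stores it, and reports (acknowledges) back to the CheckpointCoordinator. Once the CheckpointCoordinator has received the reports of all tasks, it considers the checkpoint successful; if not all reports arrive within the configured timeout, the checkpoint is considered failed.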
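To make the barrier mechanism concrete, the following is a minimal single-threaded sketch, not Flink code: records and barriers flow into one running-sum operator, which snapshots its sum whenever a barrier arrives and records it in a map that stands in for the acknowledgements sent to the CheckpointCoordinator. The class BarrierDemo and its Event type are hypothetical. Each snapshot covers exactly the records that arrived before its barrier.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Minimal sketch (not Flink code) of barrier-based snapshotting for one operator. */
class BarrierDemo {
    /** An event is either a data record or a barrier carrying a checkpoint ID. */
    static final class Event {
        final boolean barrier;
        final long value; // record value, or the checkpoint ID for a barrier

        private Event(boolean barrier, long value) {
            this.barrier = barrier;
            this.value = value;
        }

        static Event data(long v) { return new Event(false, v); }
        static Event barrier(long checkpointId) { return new Event(true, checkpointId); }
    }

    /**
     * Runs a running-sum operator over the stream. On each barrier the operator
     * snapshots its current sum and "acknowledges" it by putting it into the
     * returned map (checkpoint ID -> snapshotted state).
     */
    static Map<Long, Long> run(List<Event> stream) {
        Map<Long, Long> acknowledged = new TreeMap<>();
        long sum = 0;
        for (Event e : stream) {
            if (e.barrier) {
                // The snapshot contains exactly the records seen before this barrier.
                acknowledged.put(e.value, sum);
            } else {
                sum += e.value;
            }
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
                Event.data(1), Event.data(2), Event.barrier(1),
                Event.data(3), Event.data(4), Event.barrier(2));
        System.out.println(run(stream)); // prints {1=3, 2=10}
    }
}
```

Records 3 and 4 arrive after barrier 1, so they only show up in checkpoint 2; on recovery from checkpoint 1, those records would be replayed from the source.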
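2. Configuring checkpoints
Flink does not enable checkpointing by default. A job that needs checkpoints must configure them in its own code. The basic configuration to turn checkpointing on is shown below; see the official documentation for the other options: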
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing, with one checkpoint every 1000 ms
env.enableCheckpointing(1000);
3. Building the CheckpointCoordinator
The CheckpointCoordinator is built while the JobManager builds the ExecutionGraph, in ExecutionGraphBuilder.buildGraph:
executionGraph.enableCheckpointing(
chkConfig,
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpoints,
rootBackend,
checkpointStatsTracker);
executionGraph.enableCheckpointing then constructs the CheckpointCoordinator:
checkpointCoordinator = new CheckpointCoordinator(
jobInformation.getJobId(),
chkConfig,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
operatorCoordinators,
checkpointIDCounter,
checkpointStore,
checkpointStateBackend,
ioExecutor,
new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
SharedStateRegistry.DEFAULT_FACTORY,
failureManager);
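Whether periodic checkpointing is activated depends on the configured interval. The default interval in CheckpointConfig is -1, so checkpointing is off by default; when the JobGraph is generated, an unset interval is replaced by Long.MAX_VALUE, which is why the code below compares against Long.MAX_VALUE rather than -1: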
// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}
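4. Starting checkpoints
Checkpoints are started and stopped by CheckpointCoordinatorDeActivator, an implementation of JobStatusListener that is called whenever the job status changes. When the status becomes RUNNING, the checkpoint scheduler is started; any other status stops it: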
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
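For illustration, here is a sketch of what starting and stopping the scheduler amounts to, assuming a ScheduledExecutorService-based periodic trigger. DemoJobStatus, DemoPeriodicTrigger, and its methods are hypothetical names, and the "checkpoint" triggered here only increments a counter.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical subset of job states, for illustration only. */
enum DemoJobStatus { CREATED, RUNNING, FAILING, CANCELED, FINISHED }

/** Sketch of a periodic checkpoint trigger switched on and off by job status changes. */
class DemoPeriodicTrigger {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong triggered = new AtomicLong();
    private final long intervalMillis;
    private ScheduledFuture<?> currentPeriodicTrigger; // null while stopped

    DemoPeriodicTrigger(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Counterpart of jobStatusChanges: RUNNING starts the trigger, anything else stops it. */
    synchronized void jobStatusChanges(DemoJobStatus newStatus) {
        if (newStatus == DemoJobStatus.RUNNING) {
            if (currentPeriodicTrigger == null) {
                currentPeriodicTrigger = timer.scheduleAtFixedRate(
                        this::triggerCheckpoint, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            }
        } else if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }

    synchronized boolean isActive() { return currentPeriodicTrigger != null; }

    long triggeredCount() { return triggered.get(); }

    void shutdown() { timer.shutdownNow(); }

    private void triggerCheckpoint() {
        // A real coordinator would start a checkpoint here; the sketch only counts.
        triggered.incrementAndGet();
    }
}
```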
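Each periodic trigger eventually reaches CheckpointCoordinator's startTriggeringCheckpoint method. Its key steps are as follows.
4.1 Obtaining the trigger and acknowledge task lists
For the trigger tasks, the coordinator iterates over tasksToTrigger and fetches the latest Execution of each task. If any Execution is not in the RUNNING state, the checkpoint is aborted with an exception; otherwise the array of Executions is returned.
The acknowledge tasks are obtained the same way, except that tasksToWaitFor is iterated instead: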
final Execution[] executions = getTriggerExecutions();
final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
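Here an Execution is a single execution attempt of an ExecutionVertex (one vertex can have several attempts, because a new attempt is created after a failure), and ExecutionVertex is a concept of the physical execution graph.
tasksToTrigger and tasksToWaitFor are both ultimately derived from jobVertices. The difference is that tasksToTrigger contains only the input (source) tasks, whereas tasksToWaitFor contains all tasks; commitVertices, the tasks notified when a checkpoint completes, also contains all tasks. (Note that each entry is a chained task, not a single operator.)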
for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}
tasksToTrigger and tasksToWaitFor are both fields of CheckpointCoordinator.
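The derivation of the three lists can be modeled as below. Vertex and DemoVertexSelection are hypothetical stand-ins for JobVertex and the builder loop above. For the debug example, the job graph would presumably contain two chained vertices (the source chain and the keyed-sum chain), giving one trigger vertex and two ack and commit vertices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of how the trigger/ack/commit vertex lists are derived (hypothetical types). */
class DemoVertexSelection {
    /** Hypothetical stand-in for a JobVertex: an ID plus whether it is an input vertex. */
    static final class Vertex {
        final String id;
        final boolean isInput;

        Vertex(String id, boolean isInput) {
            this.id = id;
            this.isInput = isInput;
        }
    }

    final List<String> triggerVertices = new ArrayList<>();
    final List<String> ackVertices = new ArrayList<>();
    final List<String> commitVertices = new ArrayList<>();

    DemoVertexSelection(Map<String, Vertex> jobVertices) {
        for (Vertex vertex : jobVertices.values()) {
            if (vertex.isInput) {
                triggerVertices.add(vertex.id); // only input vertices are triggered
            }
            commitVertices.add(vertex.id); // every vertex is notified on completion
            ackVertices.add(vertex.id);    // every vertex must acknowledge
        }
    }
}
```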
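4.2 Triggering the checkpoint
The trigger is implemented as an asynchronous CompletableFuture chain that starts a new checkpoint and eventually yields its success or failure.
4.2.1 Initialization
First, the checkpoint ID is incremented.
Then the storage location for the checkpoint metadata is initialized. Each state backend's checkpoint storage has its own way of doing this (for example, creating the checkpoint directory), and savepoints and regular checkpoints use different methods: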
CheckpointStorageLocation checkpointStorageLocation = props.isSavepoint() ?
checkpointStorage
.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
checkpointStorage.initializeLocationForCheckpoint(checkpointID);
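A sketch of these two steps, assuming filesystem-style checkpoint storage where checkpoint n lives in a chk-<n> directory under the job's checkpoint directory. DemoCheckpointInit is hypothetical and uses plain strings instead of Flink's Path and CheckpointStorageLocation; the savepoint-<id> naming is a simplification, since real savepoint directory names differ.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of 4.2.1: increment the checkpoint ID, then pick a location. */
class DemoCheckpointInit {
    private final AtomicLong checkpointIdCounter = new AtomicLong(1); // first ID is 1 in this sketch
    private final String jobCheckpointDir; // assumed to be "<checkpoint-dir>/<job-id>"

    DemoCheckpointInit(String jobCheckpointDir) {
        this.jobCheckpointDir = jobCheckpointDir;
    }

    long nextCheckpointId() {
        return checkpointIdCounter.getAndIncrement();
    }

    /** Savepoints go under their target directory; checkpoints under chk-<id>. */
    String initializeLocation(long checkpointId, boolean isSavepoint, String savepointTargetDir) {
        if (isSavepoint) {
            // Simplified naming; Flink's real savepoint directory names differ.
            return savepointTargetDir + "/savepoint-" + checkpointId;
        }
        return jobCheckpointDir + "/chk-" + checkpointId;
    }
}
```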
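4.2.2 Creating the PendingCheckpoint
A PendingCheckpoint is created and its completion callback is set. A canceller is then scheduled with the configured checkpoint timeout, so that the checkpoint is aborted if it has not completed in time: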
ScheduledFuture<?> cancellerHandle = timer.schedule(
new CheckpointCanceller(checkpoint),
checkpointTimeout, TimeUnit.MILLISECONDS);
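The PendingCheckpoint plus canceller pattern can be sketched with a CompletableFuture and a ScheduledExecutorService: the future completes once every expected task has acknowledged, or fails with a TimeoutException if the canceller fires first. DemoPendingCheckpoint is a hypothetical simplification, not Flink's PendingCheckpoint.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical pending checkpoint: completes on all acks, fails on timeout. */
class DemoPendingCheckpoint {
    final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ScheduledFuture<?> cancellerHandle;

    DemoPendingCheckpoint(long checkpointId, Set<String> tasksToWaitFor,
                          ScheduledExecutorService timer, long timeoutMillis) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
        // Counterpart of scheduling a CheckpointCanceller: fail the checkpoint on timeout.
        this.cancellerHandle = timer.schedule(
                () -> completion.completeExceptionally(
                        new TimeoutException("Checkpoint " + checkpointId + " expired")),
                timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Records an acknowledgement and completes the checkpoint once all tasks have acked. */
    synchronized void acknowledge(String taskId) {
        if (notYetAcknowledged.remove(taskId) && notYetAcknowledged.isEmpty()) {
            cancellerHandle.cancel(false); // the timeout is no longer needed
            completion.complete(checkpointId);
        }
    }

    CompletableFuture<Long> result() {
        return completion;
    }
}
```

Whichever happens first wins: once the future has failed, a late acknowledgement can no longer complete it.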
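4.2.3 Coordinator checkpoint
This step checkpoints the OperatorCoordinators, a mechanism introduced in Flink 1.11.
4.2.4 Master state handling
This step handles the state of the master hooks. It exists to fix a bug that appeared after the checkpoint framework was changed: https://issues.apache.org/jira/browse/FLINK-18641
4.2.5 Starting the checkpoint
After all preconditions are checked, snapshotTaskState is called to trigger the tasks.
Triggering means calling trigger on every input task; executions is the array obtained in 4.1: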
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
The actual work happens in each Execution, in the key method triggerCheckpointHelper, which asks the TaskManagerGateway of the execution's slot to trigger the checkpoint:
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}
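5. Execution on the TaskManager: Source
From here on, the code runs on the TaskManager. The TaskManagerGateway call from the previous step ends up in TaskExecutor.triggerCheckpoint, which hands the checkpoint over to the task.
Because only input tasks are triggered, the first task to process the checkpoint is the source, which starts the source-side checkpoint flow: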
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
return CompletableFuture.completedFuture(Acknowledge.get());
}
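Execution then enters StreamTask.triggerCheckpointAsync, which consists of two parts:
initCheckpoint: prepares the checkpoint; there are two modes here, aligned and unaligned.
Performing the checkpoint: implemented in SubtaskCheckpointCoordinatorImpl.checkpointState, whose steps are described below.
5.1 Checking
Check whether the triggered checkpoint is newer than the last one; an out-of-order checkpoint is aborted: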
if (lastCheckpointId >= metadata.getCheckpointId()) {
LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
checkAndClearAbortedStatus(metadata.getCheckpointId());
return;
}
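5.2 Clearing aborted checkpoints
This step records the latest checkpoint ID. If that ID is in the set of aborted checkpoints, it is removed from the set and a CancelCheckpointMarker is broadcast downstream, so that downstream tasks do not keep aligning on a barrier that will never arrive, which would cause back-pressure: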
// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
// broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
return;
}
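The two checks in 5.1 and 5.2 can be modeled as a small state machine. DemoBarrierGuard and its Decision values are hypothetical; the sketch assumes the aborted checkpoints are kept as a plain set of IDs.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the task-side checks in 5.1 and 5.2. */
class DemoBarrierGuard {
    enum Decision { IGNORE_OUT_OF_ORDER, CANCEL_DOWNSTREAM, PROCEED }

    private long lastCheckpointId = -1;
    private final Set<Long> abortedCheckpointIds = new HashSet<>();

    /** Called when the coordinator notifies that a checkpoint was aborted. */
    void notifyAborted(long checkpointId) {
        abortedCheckpointIds.add(checkpointId);
    }

    Decision onTrigger(long checkpointId) {
        // 5.1: a checkpoint that is not newer than the last one is stale.
        if (lastCheckpointId >= checkpointId) {
            abortedCheckpointIds.remove(checkpointId);
            return Decision.IGNORE_OUT_OF_ORDER;
        }
        // 5.2: remember it; if it was already aborted, downstream must stop aligning.
        lastCheckpointId = checkpointId;
        if (abortedCheckpointIds.remove(checkpointId)) {
            return Decision.CANCEL_DOWNSTREAM;
        }
        return Decision.PROCEED;
    }
}
```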
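5.3 Checkpoint preparation
This step walks through all operators in the chain and tells each one to prepare for the checkpoint. Whether any work is needed depends on the operator; for example, AbstractMapBundleOperator finishes processing its buffered data: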
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// go forward through the operator chain and tell each operator
// to prepare the checkpoint
for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
if (!operatorWrapper.isClosed()) {
operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
}
}
}
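5.4 Sending the checkpoint barrier downstream
This step broadcasts a CheckpointBarrier event on every output edge of the operator chain; for unaligned checkpoints it is sent as a priority event: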
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
options.isUnalignedCheckpoint());
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(event, isPriorityEvent);
}
}
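The order of steps 5.3, 5.4, and 5.6 matters: buffered data is flushed before the barrier, so it belongs to this checkpoint, and the barrier is sent before the snapshot is taken, presumably so that downstream tasks can start on the checkpoint without waiting. The hypothetical DemoChainCheckpoint below only logs the order of the calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the step order 5.3 -> 5.4 -> 5.6 for one operator chain. */
class DemoChainCheckpoint {
    final List<String> log = new ArrayList<>();
    private final List<String> operators;
    private final List<String> outputs;

    DemoChainCheckpoint(List<String> operators, List<String> outputs) {
        this.operators = operators;
        this.outputs = outputs;
    }

    void checkpoint(long checkpointId) {
        // 5.3: every operator flushes buffered data, which is emitted ahead of the barrier.
        for (String op : operators) {
            log.add("prepare " + op + " " + checkpointId);
        }
        // 5.4: broadcast the barrier on every output of the chain.
        for (String out : outputs) {
            log.add("barrier -> " + out + " " + checkpointId);
        }
        // 5.6: only then snapshot each operator's state.
        for (String op : operators) {
            log.add("snapshot " + op + " " + checkpointId);
        }
    }
}
```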
5.5 In-flight data
For unaligned checkpoints, the in-flight data in the buffers is also prepared for the snapshot:
if (options.isUnalignedCheckpoint()) {
prepareInflightDataSnapshot(metadata.getCheckpointId());
}
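5.6 Taking the snapshot
This step takes the snapshot for the current checkpoint. The synchronous part runs in SubtaskCheckpointCoordinatorImpl.takeSnapshotSync; the remaining work is completed and reported asynchronously by finishAndReportAsync, so that it does not stall record processing: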
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
try {
if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
finishAndReportAsync(snapshotFutures, metadata, metrics, options);
} else {
cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
}
} catch (Exception ex) {
cleanup(snapshotFutures, metadata, metrics, ex);
throw ex;
}
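The split between a synchronous and an asynchronous phase can be sketched as follows: the synchronous phase captures a consistent view of the state, and the asynchronous phase writes that view out while the task keeps changing its live state. DemoTwoPhaseSnapshot is hypothetical and uses a full copy for simplicity, whereas the heap backend relies on copy-on-write snapshots (note the cowStateStableSnapshots used further below).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Hypothetical two-phase snapshot: sync copy, then async write. */
class DemoTwoPhaseSnapshot {
    final Map<String, Integer> liveState = new HashMap<>();

    /** Sync phase: capture an immutable view of the current state. */
    Map<String, Integer> snapshotSync() {
        return Map.copyOf(liveState);
    }

    /** Async phase: "serialize" the captured view off the task thread. */
    CompletableFuture<String> writeAsync(Map<String, Integer> snapshot, ExecutorService executor) {
        // TreeMap gives a deterministic key order for the serialized form.
        return CompletableFuture.supplyAsync(() -> new TreeMap<>(snapshot).toString(), executor);
    }
}
```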
5.6.1 Checking operator status
If any operator in the chain has already been closed, the checkpoint is declined:
for (final StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
if (operatorWrapper.isClosed()) {
env.declineCheckpoint(checkpointMetaData.getCheckpointId(),
new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
return false;
}
}
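5.6.2 Obtaining the ChannelStateWriteResult
Aligned and unaligned checkpoints differ here: only unaligned checkpoints have channel (in-flight) state, so aligned checkpoints use an empty result: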
ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ?
channelStateWriter.getAndRemoveWriteResult(checkpointId) :
ChannelStateWriteResult.EMPTY;
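5.6.3 Obtaining the checkpoint storage location
5.6.4 buildOperatorSnapshotFutures -> streamOperator.snapshotState
This step performs the actual snapshot of each operator, eventually calling into StreamOperatorStateHandler.
KeyGroupRange: the range of key groups this subtask must checkpoint, computed from the operator (subtask) index, the parallelism, and the max parallelism; it is empty for stateless operators. It is computed as follows: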
int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);
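The formulas above can be checked with a small sketch. DemoKeyGroups is hypothetical; in Flink the corresponding logic lives in KeyGroupRangeAssignment (computeKeyGroupRangeForOperatorIndex and computeOperatorIndexForKeyGroup). The second method is the inverse mapping from a key group to the subtask that owns it; because the start formula is a ceiling division, every key group is owned by exactly one subtask.

```java
/** Hypothetical sketch of key-group range assignment using the formulas above. */
class DemoKeyGroups {
    /** Inclusive range {start, end} of key groups owned by one subtask. */
    static int[] rangeForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism; // ceiling
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Inverse mapping: which subtask owns a given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            int[] r = rangeForOperatorIndex(128, 3, i);
            System.out.println("subtask " + i + ": [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

With maxParallelism 128 and parallelism 3, main prints the ranges [0, 42], [43, 85], and [86, 127].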
Custom state handling (the operator's own snapshotState hook, declared in CheckpointedStreamOperator):
streamOperator.snapshotState(snapshotContext);
/**
* Custom state handling hooks to be invoked by {@link StreamOperatorStateHandler}.
*/
public interface CheckpointedStreamOperator {
void initializeState(StateInitializationContext context) throws Exception;
void snapshotState(StateSnapshotContext context) throws Exception;
}
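Processing state metadata
First the number of states is checked. At most Short.MAX_VALUE states are supported, because both the number of states and each state's ID are serialized as shorts: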
int numStates = registeredKVStates.size() + registeredPQStates.size();
Preconditions.checkState(numStates <= Short.MAX_VALUE,
"Too many states: " + numStates +
". Currently at most " + Short.MAX_VALUE + " states are supported");
Record the metadata in memory, separately for key/value states and priority-queue states:
processSnapshotMetaInfoForAllStates(
metaInfoSnapshots,
cowStateStableSnapshots,
stateNamesToId,
registeredKVStates,
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
processSnapshotMetaInfoForAllStates(
metaInfoSnapshots,
cowStateStableSnapshots,
stateNamesToId,
registeredPQStates,
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
Create the serialization proxy:
final KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
// TODO: this code assumes that writing a serializer is threadsafe, we should support to
// get a serialized form already at state registration time in the future
getKeySerializer(),
metaInfoSnapshots,
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));
Write the state metadata to the output stream, mainly via DataOutputViewStreamWrapper and KeyedBackendSerializationProxy:
final DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
serializationProxy.write(outView);
public void write(DataOutputView out) throws IOException {
super.write(out);
// write the compression format used to write each key-group
out.writeBoolean(usingKeyGroupCompression);
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerSnapshot, keySerializer);
// write individual registered keyed state metainfos
out.writeShort(stateMetaInfoSnapshots.size());
for (StateMetaInfoSnapshot metaInfoSnapshot : stateMetaInfoSnapshots) {
StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);
}
}
CurrentWriterImpl is where each state's meta info snapshot is actually serialized; among other things, it writes the state's options:
StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);
for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
outputView.writeUTF(entry.getKey());
outputView.writeUTF(entry.getValue());
}
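To see what writing metadata as shorts and UTF strings looks like, here is a round-trip sketch of a simplified, hypothetical layout modeled on the write() method above: a compression flag, the number of states as a short, then each state's name and its options as UTF strings. DemoMetaInfoFormat is not Flink's actual format, which contains more fields (such as serializer snapshots).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified state metadata layout (not Flink's wire format). */
class DemoMetaInfoFormat {
    static byte[] write(boolean compressed, Map<String, Map<String, String>> states) {
        if (states.size() > Short.MAX_VALUE) {
            throw new IllegalStateException("Too many states: " + states.size());
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(compressed);  // compression flag first
            out.writeShort(states.size()); // number of states as a short
            for (Map.Entry<String, Map<String, String>> state : states.entrySet()) {
                out.writeUTF(state.getKey());
                out.writeShort(state.getValue().size());
                for (Map.Entry<String, String> option : state.getValue().entrySet()) {
                    out.writeUTF(option.getKey());
                    out.writeUTF(option.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Map<String, String>> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readBoolean(); // compression flag, not needed by this reader
            int numStates = in.readShort();
            Map<String, Map<String, String>> states = new LinkedHashMap<>();
            for (int i = 0; i < numStates; i++) {
                String name = in.readUTF();
                int numOptions = in.readShort();
                Map<String, String> options = new LinkedHashMap<>();
                for (int j = 0; j < numOptions; j++) {
                    options.put(in.readUTF(), in.readUTF()); // key is read before value
                }
                states.put(name, options);
            }
            return states;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```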
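Write the state data of each key group, optionally compressed:
// excerpt: runs once per state inside the loop over key groups
try (OutputStream kgCompressionOut =
keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
DataOutputViewStreamWrapper kgCompressionView =
new DataOutputViewStreamWrapper(kgCompressionOut);
// each state's data in this key group is prefixed with its short state ID
kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));
partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);
}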
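operatorStateBackend and keyedStateBackend
The two kinds of state are snapshotted separately, and their handling differs depending on the state backend.
CheckpointStreamFactory
Each backend has its own output class and builds its own CheckpointStateOutputStream.
CurrentWriterImpl
The place where the serialized snapshot metadata is actually written out.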
6. Execution on the TaskManager: StreamTask processing
6.1 CheckpointedInputGate
For tasks with inputs, such as OneInputStreamTask, initialization creates a CheckpointedInputGate and combines it with the data input. Taking OneInputStreamTask as an example:
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
CheckpointedInputGate inputGate = createCheckpointedInputGate();
DataOutput<IN> output = createDataOutput();
StreamTaskInput<IN> input = createTaskInput(inputGate, output);
inputProcessor = new StreamOneInputProcessor<>(
input,
output,
operatorChain);
}
}
Depending on the consistency mode, a different handler processes the checkpoint barriers. The handler is also chosen during this input initialization:
switch (config.getCheckpointMode()) {
case EXACTLY_ONCE:
if (config.isUnalignedCheckpointsEnabled()) {
return new AlternatingCheckpointBarrierHandler(
new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
toNotifyOnCheckpoint);
}
return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
case AT_LEAST_ONCE:
if (config.isUnalignedCheckpointsEnabled()) {
throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
"checkpointing mode");
}
int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
default:
throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
}
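The behavioral difference between the aligned handler (CheckpointBarrierAligner) and the at-least-once tracker (CheckpointBarrierTracker) can be sketched as follows. Both classes are hypothetical simplifications: the aligner blocks each channel that has delivered the barrier until all channels have, while the tracker only counts barriers and never blocks, so records arriving after a barrier may be processed before the checkpoint is taken and can be replayed again on recovery. The unaligned handler is not modeled; it avoids blocking by snapshotting in-flight buffers instead.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of aligned (EXACTLY_ONCE) vs. tracking (AT_LEAST_ONCE) handling. */
class DemoBarrierHandlers {
    /** Aligned: a channel that delivered the barrier is blocked until all channels have. */
    static final class Aligner {
        private final int numChannels;
        private final BitSet blocked = new BitSet();
        private long currentCheckpointId = -1;

        Aligner(int numChannels) {
            this.numChannels = numChannels;
        }

        /** Returns true if this barrier completes alignment, i.e. the checkpoint triggers. */
        boolean onBarrier(int channel, long checkpointId) {
            if (checkpointId > currentCheckpointId) {
                currentCheckpointId = checkpointId; // a newer checkpoint restarts alignment
                blocked.clear();
            } else if (checkpointId < currentCheckpointId) {
                return false; // stale barrier, ignored in this sketch
            }
            blocked.set(channel);
            if (blocked.cardinality() == numChannels) {
                blocked.clear(); // alignment done: unblock every channel
                return true;
            }
            return false;
        }

        /** Records from a blocked channel must be buffered instead of processed. */
        boolean isBlocked(int channel) {
            return blocked.get(channel);
        }
    }

    /** At-least-once: barriers are only counted; no channel is ever blocked. */
    static final class Tracker {
        private final int numChannels;
        private final Map<Long, Integer> barrierCounts = new HashMap<>();

        Tracker(int numChannels) {
            this.numChannels = numChannels;
        }

        boolean onBarrier(long checkpointId) {
            int seen = barrierCounts.merge(checkpointId, 1, Integer::sum);
            if (seen == numChannels) {
                barrierCounts.remove(checkpointId);
                return true;
            }
            return false;
        }
    }
}
```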