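1. Checkpoint scheduling on the JobManager side
After the Dispatcher hands off a job, it starts the corresponding JobMaster. While the JobMaster is being constructed, the JobGraph is converted into an ExecutionGraph. The source is as follows:
// JobMaster class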
public JobMaster(
RpcService rpcService,
JobMasterConfiguration jobMasterConfiguration,
...)
throws Exception {
...
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
this.jobStatusListener = null;
...
}
// SchedulerBase class
public SchedulerBase(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
...)
throws Exception {
...
this.executionGraph =
createAndRestoreExecutionGraph(
jobManagerJobMetricGroup,
checkNotNull(shuffleMaster),
checkNotNull(partitionTracker),
checkNotNull(executionDeploymentTracker),
initializationTimestamp);
...
}
private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp)
throws JobExecutionException, JobException {
...
return ExecutionGraphBuilder.buildGraph(null,
jobGraph,
jobMasterConfiguration,
...);
...
}
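createAndRestoreExecutionGraph() calls createExecutionGraph(), which in turn uses ExecutionGraphBuilder to build the ExecutionGraph.
While the ExecutionGraph is being built (in ExecutionGraphBuilder.buildGraph()), ExecutionGraph.enableCheckpointing() is called whenever the JobGraph carries checkpointing settings (snapshotSettings != null in the code below), regardless of whether the job actually enabled periodic checkpointing. enableCheckpointing() creates the CheckpointCoordinator, the core class responsible for checkpointing, and also adds a job status listener, CheckpointCoordinatorDeActivator, to the job (this listener is registered only when checkpointing is configured). CheckpointCoordinatorDeActivator is responsible for starting and stopping the checkpoint scheduler. The source is as follows:
// ExecutionGraphBuilder class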
public static ExecutionGraph buildGraph(
@Nullable ExecutionGraph prior,
JobGraph jobGraph,
Configuration jobManagerConfig,
...)
throws JobExecutionException, JobException {
...
// configure the state checkpointing
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
if (snapshotSettings != null) {
List<ExecutionJobVertex> triggerVertices =
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
List<ExecutionJobVertex> ackVertices =
idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
List<ExecutionJobVertex> confirmVertices =
idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
// a series of checkpoint settings, including the state backend, user-defined hooks, checkpointIdCounter, etc.
...
executionGraph.enableCheckpointing(
chkConfig,
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpoints,
rootBackend,
checkpointStatsTracker);
...
}
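Building the graph determines three sets of vertices: triggerVertices (used to trigger a checkpoint), ackVertices (which must report that they have completed the checkpoint), and confirmVertices (which are notified once the checkpoint has completed).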
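The three vertex roles can be illustrated with a minimal sketch. It is not Flink's JobGraph API: the job is modeled as a map from a hypothetical vertex name to its number of inputs. It assumes that trigger vertices are the vertices without inputs (as a comment in startTriggeringCheckpoint() further down states) and, as a simplifying assumption of this sketch, that every vertex both acknowledges and is confirmed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Flink's JobGraph API: vertex name -> number of inputs.
final class CheckpointVertexRolesSketch {

    /** Vertices without inputs (sources) are the ones the coordinator triggers. */
    static List<String> triggerVertices(Map<String, Integer> inputCountByVertex) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : inputCountByVertex.entrySet()) {
            if (entry.getValue() == 0) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Assumption of this sketch: every vertex acknowledges and is later confirmed. */
    static List<String> ackAndConfirmVertices(Map<String, Integer> inputCountByVertex) {
        return new ArrayList<>(inputCountByVertex.keySet());
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("kafka-source", 0); // hypothetical vertex names
        job.put("map", 1);
        job.put("sink", 1);
        System.out.println("trigger: " + triggerVertices(job));
        System.out.println("ack/confirm: " + ackAndConfirmVertices(job));
    }
}
```

Only the sources need an explicit trigger because every other vertex starts its checkpoint when the barrier reaches it.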
executionGraph.enableCheckpointing() initializes the checkpoint-related classes and registers the job status listener. When the JobManager starts scheduling the job, the job status changes from CREATED to RUNNING; this is implemented in transitionState(). During this transition, the previously registered listener CheckpointCoordinatorDeActivator starts the periodic checkpoint task. The call chain is ExecutionGraph.transitionToRunning() -> transitionState() -> notifyJobStatusChange() -> CheckpointCoordinatorDeActivator.jobStatusChanges() -> CheckpointCoordinator.startCheckpointScheduler(). The source is as follows:
public void transitionToRunning() {
if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
throw new IllegalStateException(
"Job may only be scheduled from state " + JobStatus.CREATED);
}
}
private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
...
if (state == current) {
notifyJobStatusChange(newState, error);
return true;
}
...
}
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListeners.size() > 0) {
final long timestamp = System.currentTimeMillis();
final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
for (JobStatusListener listener : jobStatusListeners) {
try {
listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
} catch (Throwable t) {
LOG.warn("Error while notifying JobStatusListener", t);
}
}
}
}
// CheckpointCoordinatorDeActivator class
@Override
public void jobStatusChanges(
JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
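The listener pattern in the call chain above can be sketched without any Flink dependency. All types below are hypothetical stand-ins (the real ones are ExecutionGraph, JobStatusListener and CheckpointCoordinator); the sketch only shows how a status transition switches periodic triggering on and off.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone model of the activator/deactivator listener; not Flink's classes.
final class ActivatorSketch {
    enum JobStatus { CREATED, RUNNING, FAILING, FINISHED }

    interface JobStatusListener {
        void jobStatusChanges(JobStatus newStatus);
    }

    /** Stand-in for the coordinator: only tracks whether periodic triggering is on. */
    static final class Coordinator {
        boolean periodicScheduling;
        void startCheckpointScheduler() { periodicScheduling = true; }
        void stopCheckpointScheduler() { periodicScheduling = false; }
    }

    /** Stand-in for the execution graph: holds the state and notifies listeners. */
    static final class Job {
        private JobStatus state = JobStatus.CREATED;
        private final List<JobStatusListener> listeners = new ArrayList<>();

        void register(JobStatusListener listener) { listeners.add(listener); }

        boolean transitionState(JobStatus expected, JobStatus next) {
            if (state != expected) {
                return false; // transition refused, nobody is notified
            }
            state = next;
            for (JobStatusListener listener : listeners) {
                listener.jobStatusChanges(next);
            }
            return true;
        }
    }

    /** RUNNING starts the scheduler; any other status stops it. */
    static JobStatusListener activatorFor(Coordinator coordinator) {
        return status -> {
            if (status == JobStatus.RUNNING) {
                coordinator.startCheckpointScheduler();
            } else {
                coordinator.stopCheckpointScheduler();
            }
        };
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        Job job = new Job();
        job.register(activatorFor(coordinator));
        job.transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        System.out.println("scheduling after RUNNING: " + coordinator.periodicScheduling);
        job.transitionState(JobStatus.RUNNING, JobStatus.FAILING);
        System.out.println("scheduling after FAILING: " + coordinator.periodicScheduling);
    }
}
```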
CheckpointCoordinator schedules a timer task, the ScheduledTrigger class, that periodically triggers checkpoints; each run ends up in triggerCheckpoint():
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
// make sure all prior timers are cancelled
stopCheckpointScheduler();
periodicScheduling = true;
currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
}
}
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
return timer.scheduleAtFixedRate(
new ScheduledTrigger(), initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic) {
if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
&& !(props.isSynchronous() && props.isSavepoint())) {
return FutureUtils.completedExceptionally(
new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
CheckpointTriggerRequest request =
new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic);
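// run the precondition checks first to decide whether a checkpoint may be triggered, mainly the maximum number of concurrent checkpoints and the interval between checkpoints
// then pick one request from the queued requests (if any) to execute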
chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
return request.onCompletionPromise;
}
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
try {
synchronized (lock) {
preCheckGlobalState(request.isPeriodic);
}
// collect the tasks that must receive the trigger message (tasksToTrigger, determined when the JobGraph is generated and made up of all vertices without inputs) into executions
// Check if all tasks that we need to trigger are running. If not, abort the checkpoint.
final Execution[] executions = getTriggerExecutions();
// collect the tasks that must send back a checkpoint ack into ackTasks, which is later passed to the PendingCheckpoint constructor
// Check if all tasks that need to acknowledge the checkpoint are running. If not, abort the checkpoint
final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
// create the PendingCheckpoint, fire user-defined hooks, ...
...
// no exception, no discarding, everything is OK
final long checkpointId =
checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
executions);
coordinatorsToCheckpoint.forEach(
(ctx) ->
ctx.afterSourceBarrierInjection(
checkpointId));
...
} catch (Throwable throwable) {
onTriggerFailure(request, throwable);
}
}
private void snapshotTaskState(
long timestamp,
long checkpointID,
CheckpointStorageLocation checkpointStorageLocation,
CheckpointProperties props,
Execution[] executions) {
...
// send the messages to the tasks that trigger their checkpoint
for (Execution execution : executions) {
// both calls end up in the same underlying method; they differ only in semantics
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
}
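The precondition checks mentioned in the comments of triggerCheckpoint() can be sketched as a small gate. This is a simplified model under stated assumptions, not the coordinator's actual logic: it rejects a request instead of queueing it, and the names TriggerPreCheckSketch, tryTrigger and onCompleted are hypothetical.

```java
// Simplified model of the trigger preconditions; not CheckpointCoordinator itself.
final class TriggerPreCheckSketch {
    private final int maxConcurrentCheckpoints;
    private final long minPauseMillis;
    private int pendingCheckpoints;
    private boolean anyCompleted;
    private long lastCompletionMillis;

    TriggerPreCheckSketch(int maxConcurrentCheckpoints, long minPauseMillis) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        this.minPauseMillis = minPauseMillis;
    }

    /** Returns true and counts a new pending checkpoint if both checks pass. */
    boolean tryTrigger(long nowMillis) {
        if (pendingCheckpoints >= maxConcurrentCheckpoints) {
            return false; // too many checkpoints in flight
        }
        if (anyCompleted && nowMillis - lastCompletionMillis < minPauseMillis) {
            return false; // minimum pause since the last completion not yet elapsed
        }
        pendingCheckpoints++;
        return true;
    }

    /** Called when a pending checkpoint completes. */
    void onCompleted(long nowMillis) {
        pendingCheckpoints--;
        anyCompleted = true;
        lastCompletionMillis = nowMillis;
    }

    public static void main(String[] args) {
        TriggerPreCheckSketch gate = new TriggerPreCheckSketch(1, 500);
        System.out.println("t=0:   " + gate.tryTrigger(0));
        System.out.println("t=100: " + gate.tryTrigger(100));
        gate.onCompleted(200);
        System.out.println("t=300: " + gate.tryTrigger(300));
        System.out.println("t=700: " + gate.tryTrigger(700));
    }
}
```

Time is passed in explicitly so the behaviour is deterministic; a real scheduler would read a clock.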
Execution.triggerCheckpoint() is a remote call to the TaskManager's triggerCheckpoint() method:
private void triggerCheckpointHelper(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
...
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(
attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug(
"The execution has no slot assigned. This indicates that the execution is no longer running.");
}
}
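2. Checkpoint execution in SourceStreamTask
The TaskManager's triggerCheckpoint() method first looks up the source task (a SourceStreamTask) and calls Task.triggerCheckpointBarrier(). triggerCheckpointBarrier() does not run the checkpoint inline; it hands the request to the task asynchronously through triggerCheckpointAsync(), and the source task then performs the checkpoint.
// TaskExecutor class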
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug(
"Trigger checkpoint {}@{} for {}.",
checkpointId,
checkpointTimestamp,
executionAttemptID);
...
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
...
}
}
// Task class
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
...
try {
// invokable is in fact a StreamTask, which in turn delegates to more specific classes, all the way down to user code
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
}
...
}
The invokable then triggers the checkpoint. Because the tasks to trigger are all source operator chains, execution enters SourceStreamTask:
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
if (!externallyInducedCheckpoints) {
return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
} else {
// we do not trigger checkpoints here, we simply state whether we can trigger them
synchronized (lock) {
return CompletableFuture.completedFuture(isRunning());
}
}
}
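The core of the checkpoint is StreamTask.performCheckpoint(), which in the code below delegates to SubtaskCheckpointCoordinatorImpl.checkpointState(). It consists of three main steps:
1) Do some preparation before the checkpoint; operators usually do nothing in this phase.
2) Immediately broadcast the CheckpointBarrier downstream, so that downstream tasks receive it promptly and start their own checkpoints.
3) Take the state snapshot, i.e. the checkpoint itself.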
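The ordering of these three steps can be shown with a minimal sketch. The Operator interface and the log are hypothetical and only record the order of calls: the barrier is emitted before any state is snapshotted, so downstream tasks are not held up by this task's snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Records the order of the three steps; the types are stand-ins, not Flink's.
final class PerformCheckpointOrderSketch {
    interface Operator {
        void prepareSnapshotPreBarrier(long checkpointId);
        void snapshotState(long checkpointId);
    }

    static Operator loggingOperator(String name, List<String> log) {
        return new Operator() {
            @Override
            public void prepareSnapshotPreBarrier(long checkpointId) {
                log.add(name + ": pre-barrier " + checkpointId);
            }

            @Override
            public void snapshotState(long checkpointId) {
                log.add(name + ": snapshot " + checkpointId);
            }
        };
    }

    static void performCheckpoint(long checkpointId, List<Operator> chain, List<String> log) {
        // Step 1: give operators a chance to do pre-barrier work.
        for (Operator operator : chain) {
            operator.prepareSnapshotPreBarrier(checkpointId);
        }
        // Step 2: emit the barrier downstream before snapshotting anything.
        log.add("broadcast barrier " + checkpointId);
        // Step 3: snapshot the state of every operator in the chain.
        for (Operator operator : chain) {
            operator.snapshotState(checkpointId);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        performCheckpoint(7L, Collections.singletonList(loggingOperator("map", log)), log);
        log.forEach(System.out::println);
    }
}
```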
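While performCheckpoint() runs, the task thread cannot process records, because checkpointing and record processing are serialized on the same lock (in the code below, by running the checkpoint inside the task's mailbox):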
public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
...
triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions));
...
}
private boolean triggerCheckpointAsyncInMailbox(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
throws Exception {
...
subtaskCheckpointCoordinator.initCheckpoint(
checkpointMetaData.getCheckpointId(), checkpointOptions);
boolean success =
performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
if (!success) {
declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;
...
}
// SubtaskCheckpointCoordinatorImpl class
public void checkpointState(
CheckpointMetaData metadata,
CheckpointOptions options,
CheckpointMetricsBuilder metrics,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
throws Exception {
// check whether the checkpoint has to be aborted
// Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
// if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
// broadcast cancel checkpoint marker to avoid downstream back-pressure due to
// checkpoint barrier align.
operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info(
"Checkpoint {} has been notified as aborted, would not trigger any checkpoint.",
metadata.getCheckpointId());
return;
}
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
// usually a no-op
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
// Step (2): Send the checkpoint barrier downstream
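// wrap the barrier in a priority buffer and add it to the PrioritizedDeque of the ResultSubpartition, updating the buffer and backlog counts
// when notifyDataAvailable is true, notify the downstream consumer
// the downstream CheckpointedInputGate receives the buffer, recognizes it as a checkpoint event, and reacts accordingly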
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
options.isUnalignedCheckpoint());
// Step (3): Prepare to spill the in-flight buffers for input and output
// skipped entirely for aligned checkpoints
if (options.isUnalignedCheckpoint()) {
// output data already written while broadcasting event
channelStateWriter.finishOutput(metadata.getCheckpointId());
}
// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
// progress of the
// streaming topology
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
new HashMap<>(operatorChain.getNumberOfOperators());
try {
// takeSnapshotSync is the entry point of the core checkpoint logic
if (takeSnapshotSync(
snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
// finishAndReportAsync reports to the JobMaster once the snapshot has finished
finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
} else {
cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
}
} catch (Exception ex) {
cleanup(snapshotFutures, metadata, metrics, ex);
throw ex;
}
}
private boolean takeSnapshotSync(
Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
CheckpointMetaData checkpointMetaData,
CheckpointMetricsBuilder checkpointMetrics,
CheckpointOptions checkpointOptions,
OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
throws Exception {
...
// where the checkpoint is stored (Memory/FS/RocksDB)
CheckpointStreamFactory storage =
checkpointStorage.resolveCheckpointStorageLocation(
checkpointId, checkpointOptions.getTargetLocation());
try {
for (StreamOperatorWrapper<?, ?> operatorWrapper :
operatorChain.getAllOperators(true)) {
if (!operatorWrapper.isClosed()) {
operatorSnapshotsInProgress.put(
operatorWrapper.getStreamOperator().getOperatorID(),
// entry point of the per-operator snapshot
buildOperatorSnapshotFutures(
checkpointMetaData,
checkpointOptions,
operatorChain,
operatorWrapper.getStreamOperator(),
isRunning,
channelStateWriteResult,
storage));
}
}
} finally {
checkpointStorage.clearCacheFor(checkpointId);
}
...
}
// StreamOperatorStateHandler class
void snapshotState(
CheckpointedStreamOperator streamOperator,
Optional<InternalTimeServiceManager<?>> timeServiceManager,
String operatorName,
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory,
OperatorSnapshotFutures snapshotInProgress,
StateSnapshotContextSynchronousImpl snapshotContext,
boolean isUsingCustomRawKeyedState)
throws CheckpointException {
try {
...
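// run the operator's own snapshot logic for state that needs to be persisted
// e.g. a map transformation produces a StreamMap, a subclass of AbstractUdfStreamOperator, which wraps the snapshotState logic; if the user function implements no checkpoint interface, this step does nothing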
streamOperator.snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(
snapshotContext.getOperatorStateStreamFuture());
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
}
...
}
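If the user function implements a checkpoint interface, its state is persisted to the configured state backend; otherwise this step is skipped.
Take AbstractUdfStreamOperator as an example (operators such as map and filter all extend this abstract class):
// AbstractUdfStreamOperator class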
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
// check whether userFunction is an instance of CheckpointedFunction or ListCheckpointed
// if so, call the user's snapshotState implementation
// for example, FlinkKafkaConsumerBase implements the CheckpointedFunction interface itself
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), userFunction);
}
// StreamingFunctionUtils class
public static void snapshotFunctionState(
StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)
throws Exception {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(backend);
while (true) {
// check whether the user function has custom checkpoint logic and, if so, run it
if (trySnapshotFunctionState(context, backend, userFunction)) {
break;
}
// inspect if the user function is wrapped, then unwrap and try again if we can snapshot
// the inner function
if (userFunction instanceof WrappingFunction) {
userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
} else {
break;
}
}
}
private static boolean trySnapshotFunctionState(
StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)
throws Exception {
// check whether the user function implements the CheckpointedFunction interface
if (userFunction instanceof CheckpointedFunction) {
// run the user-defined snapshot logic
((CheckpointedFunction) userFunction).snapshotState(context);
return true;
}
if (userFunction instanceof ListCheckpointed) {
// ListCheckpointed is deprecated and not covered here
...
}
return false;
}
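The unwrap loop in snapshotFunctionState() can be reproduced in isolation. The interfaces below are hypothetical stand-ins for Function, CheckpointedFunction and WrappingFunction, and, unlike the original, this sketch returns whether a snapshot hook was found so that the result can be observed.

```java
// Stand-alone model of the unwrap-and-snapshot loop; the interfaces are stand-ins.
final class UnwrapSketch {
    interface UserFunction {}

    interface Checkpointed extends UserFunction {
        void snapshotState(long checkpointId);
    }

    interface Wrapping extends UserFunction {
        UserFunction getWrappedFunction();
    }

    /** Walks through wrappers until a Checkpointed function is found or none is left. */
    static boolean snapshotFunctionState(long checkpointId, UserFunction userFunction) {
        while (true) {
            if (userFunction instanceof Checkpointed) {
                ((Checkpointed) userFunction).snapshotState(checkpointId);
                return true;
            }
            if (userFunction instanceof Wrapping) {
                // unwrap one level and try again
                userFunction = ((Wrapping) userFunction).getWrappedFunction();
            } else {
                return false; // plain function without checkpoint logic
            }
        }
    }

    public static void main(String[] args) {
        Checkpointed inner = id -> System.out.println("snapshot for checkpoint " + id);
        Wrapping outer = () -> inner;
        System.out.println("wrapped: " + snapshotFunctionState(3L, outer));
        System.out.println("plain: " + snapshotFunctionState(4L, new UserFunction() {}));
    }
}
```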
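3. Tasks report checkpoint information
After takeSnapshotSync returns, Flink calls finishAndReportAsync, which runs an AsyncCheckpointRunnable on a thread pool; once the snapshot has finished, a completion report is sent to the JobMaster: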
private void finishAndReportAsync(
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures,
CheckpointMetaData metadata,
CheckpointMetricsBuilder metrics,
Supplier<Boolean> isRunning) {
// we are transferring ownership over snapshotInProgressList for cleanup to the thread,
// active on submit
asyncOperationsThreadPool.execute(
new AsyncCheckpointRunnable(
snapshotFutures,
metadata,
metrics,
System.nanoTime(),
taskName,
registerConsumer(),
unregisterConsumer(),
env,
asyncExceptionHandler,
isRunning));
}
The report eventually reaches CheckpointCoordinator.receiveAcknowledgeMessage():
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo)
throws CheckpointException {
...
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDisposed()) {
switch (checkpoint.acknowledgeTask(
message.getTaskExecutionId(),
message.getSubtaskState(),
message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug(
"Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
...
}
...
}
...
}
}
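The bookkeeping behind acknowledgeTask() and isFullyAcknowledged() can be modeled with two sets. This is a simplified sketch, not Flink's PendingCheckpoint: tasks are identified by plain strings and no state handles or metrics are stored.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified model of ack tracking for one pending checkpoint.
final class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();

    PendingCheckpointSketch(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    /** Moves the task from the outstanding set to the acknowledged set. */
    AckResult acknowledgeTask(String taskId) {
        if (notYetAcknowledged.remove(taskId)) {
            acknowledged.add(taskId);
            return AckResult.SUCCESS;
        }
        return acknowledged.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    /** The checkpoint can be completed once no ack is outstanding. */
    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending =
                new PendingCheckpointSketch(new HashSet<>(Arrays.asList("source-0", "sink-0")));
        System.out.println(pending.acknowledgeTask("source-0") + " " + pending.isFullyAcknowledged());
        System.out.println(pending.acknowledgeTask("sink-0") + " " + pending.isFullyAcknowledged());
    }
}
```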
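Once the JobManager has received acks from all tasks, it does the following:
1) Converts the PendingCheckpoint into a CompletedCheckpoint, which marks the checkpoint as complete. The CompletedCheckpoint holds the checkpoint metadata, including the checkpoint path and the size of the state data. The metadata is persisted, and expired checkpoint data is deleted.
2) Notifies all tasks to commit.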
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint){
...
// create the CompletedCheckpoint
completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint(
checkpointsCleaner, this::scheduleTriggerRequest, executor);
// add the completed checkpoint to the CompletedCheckpointStore
completedCheckpointStore.addCheckpoint(
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
...
// notify the tasks that the checkpoint has completed
// send the "notify complete" call to all vertices, coordinators, etc.
sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
}
private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
// commit tasks
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}
// commit coordinators
for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
coordinatorContext.notifyCheckpointComplete(checkpointId);
}
}
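4. JobManager notifies tasks to commit
When a task receives the message, it calls Task.notifyCheckpointComplete(), which eventually calls StreamOperator.notifyCheckpointComplete(). Most operators do nothing here, but operators such as AbstractUdfStreamOperator may have additional work to do:
// TaskExecutor class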
public CompletableFuture<Acknowledge> confirmCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
...
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.notifyCheckpointComplete(checkpointId);
return CompletableFuture.completedFuture(Acknowledge.get());
}
...
}
// Task class
public void notifyCheckpointComplete(final long checkpointID) {
// invokable is in fact a StreamTask, which in turn delegates to more specific classes, all the way down to user code
final AbstractInvokable invokable = this.invokable;
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
invokable.notifyCheckpointCompleteAsync(checkpointID);
}...
}
}
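AbstractUdfStreamOperator is the operator for user-defined functions, such as StreamMap and StreamSource. If the user's function implements the CheckpointListener interface, extra work is done at this point. For example, FlinkKafkaConsumerBase commits its consumed offsets to Kafka, and TwoPhaseCommitSinkFunction implementations such as FlinkKafkaProducer commit their transactions. One caveat: with exactly-once enabled, data written to Kafka is committed only when the checkpoint completes. If a Kafka consumer reading that data uses the read_committed isolation level, it sees the data only after the checkpoint has completed, so consumption is delayed by anywhere between 0 and checkpoint_interval.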
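The commit-on-notification behaviour described above can be sketched as a toy two-phase-commit sink. This is not TwoPhaseCommitSinkFunction: transactions are plain lists, and the "external system" is an in-memory list that only contains committed records, standing in for what a read_committed consumer would see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy two-phase-commit sink: pre-commit on snapshot, commit on checkpoint completion.
final class TwoPhaseCommitSketch {
    private List<String> currentTransaction = new ArrayList<>();
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>();
    private final List<String> visibleToReaders = new ArrayList<>();

    /** Records are written into the open transaction and are not yet visible. */
    void invoke(String record) {
        currentTransaction.add(record);
    }

    /** Phase 1: close the open transaction under the checkpoint id and open a new one. */
    void snapshotState(long checkpointId) {
        preCommitted.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    /** Phase 2: commit every transaction pre-committed up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> ready = preCommitted.headMap(checkpointId, true);
        for (List<String> transaction : ready.values()) {
            visibleToReaders.addAll(transaction);
        }
        ready.clear(); // removes the committed entries from preCommitted as well
    }

    List<String> visibleToReaders() {
        return visibleToReaders;
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.invoke("a");
        sink.snapshotState(1L);
        sink.invoke("b");
        System.out.println("before completion: " + sink.visibleToReaders());
        sink.notifyCheckpointComplete(1L);
        System.out.println("after completion: " + sink.visibleToReaders());
    }
}
```

Record "b" stays invisible until a later checkpoint completes, which is where the delay of up to one checkpoint interval comes from.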
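5. Checkpoint implementation for non-source StreamTasks
The above covers the checkpoint of a source task, which is triggered by the JobManager. The source task broadcasts CheckpointBarriers downstream, and for a downstream task, receiving a CheckpointBarrier is where its checkpoint starts. A non-source task reads messages from upstream in a loop. For each message it checks the type; if the message is a CheckpointBarrier, it further decides whether it still has to align or can take the checkpoint. This logic lives in CheckpointedInputGate#pollNext():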
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
Optional<BufferOrEvent> next = inputGate.pollNext();
if (!next.isPresent()) {
return handleEmptyBuffer();
}
BufferOrEvent bufferOrEvent = next.get();
if (bufferOrEvent.isEvent()) {
return handleEvent(bufferOrEvent);
} else if (bufferOrEvent.isBuffer()) {
barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
}
return next;
}
private Optional<BufferOrEvent> handleEvent(BufferOrEvent bufferOrEvent)
throws IOException, InterruptedException {
Class<? extends AbstractEvent> eventClass = bufferOrEvent.getEvent().getClass();
if (eventClass == CheckpointBarrier.class) {
CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
// handle the barrier
barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
}
...
}
// SingleCheckpointBarrierHandler class: barrier alignment (CheckpointBarrierTracker is the at-least-once variant)
public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo)
throws IOException {
long barrierId = barrier.getId();
LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);
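// the barrier is stale: a newer checkpoint is already in progress, or this checkpoint has timed out and is no longer pending; discard it and release the channel for consumption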
if (currentCheckpointId > barrierId
|| (currentCheckpointId == barrierId && !isCheckpointPending())) {
controller.obsoleteBarrierReceived(channelInfo, barrier);
return;
}
// the current checkpoint is behind: a barrier of a newer checkpoint arrived, so start the new checkpoint
if (currentCheckpointId < barrierId) {
if (isCheckpointPending()) {
// cancel the old checkpoint
cancelSubsumedCheckpoint(barrierId);
}
if (getNumOpenChannels() == 1) {
// only one upstream channel: trigger the checkpoint right away
markAlignmentStartAndEnd(barrierId, barrier.getTimestamp());
} else {
// several upstream channels: start alignment
markAlignmentStart(barrierId, barrier.getTimestamp());
}
currentCheckpointId = barrierId;
numBarriersReceived = 0;
allBarriersReceivedFuture = new CompletableFuture<>();
try {
// handle the first barrier
if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
LOG.debug(
"{}: Triggering checkpoint {} on the first barrier at {}.",
taskName,
barrier.getId(),
barrier.getTimestamp());
notifyCheckpoint(barrier);
}
} catch (CheckpointException e) {
abortInternal(barrier.getId(), e);
return;
}
}
// register the barrier and block the corresponding channel
controller.barrierReceived(channelInfo, barrier);
if (currentCheckpointId == barrierId) {
if (++numBarriersReceived == numOpenChannels) {
if (getNumOpenChannels() > 1) {
markAlignmentEnd();
}
numBarriersReceived = 0;
// all barriers have arrived: handle the last one
if (controller.postProcessLastBarrier(channelInfo, barrier)) {
LOG.debug(
"{}: Triggering checkpoint {} on the last barrier at {}.",
taskName,
barrier.getId(),
barrier.getTimestamp());
// start the checkpoint; this ends up in the same StreamTask.performCheckpoint() as before, and the rest matches the source task flow above
notifyCheckpoint(barrier);
}
allBarriersReceivedFuture.complete(null);
}
}
}
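The alignment logic of processBarrier() can be condensed into a small stand-alone sketch. It is a simplified model, not SingleCheckpointBarrierHandler: channels are plain integers, unaligned checkpoints and the single-channel shortcut are left out, and the method simply returns true when the last barrier arrives and the checkpoint should be taken.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified aligned-barrier handling: block a channel on its barrier,
// take the checkpoint when every channel has delivered the barrier.
final class BarrierAlignmentSketch {
    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentCheckpointId = -1L;
    private boolean pending;

    BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true if this barrier completes the alignment for its checkpoint. */
    boolean processBarrier(long barrierId, int channel) {
        // Stale barrier: a newer checkpoint is in progress or this one is already done.
        if (barrierId < currentCheckpointId || (barrierId == currentCheckpointId && !pending)) {
            return false;
        }
        // Barrier of a newer checkpoint: drop the unfinished alignment and start over.
        if (barrierId > currentCheckpointId) {
            currentCheckpointId = barrierId;
            blockedChannels.clear();
            pending = true;
        }
        blockedChannels.add(channel);
        if (blockedChannels.size() == numChannels) {
            blockedChannels.clear(); // alignment finished, all channels resume
            pending = false;
            return true;
        }
        return false;
    }

    boolean isBlocked(int channel) {
        return blockedChannels.contains(channel);
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch handler = new BarrierAlignmentSketch(2);
        System.out.println("barrier 1 on channel 0 -> " + handler.processBarrier(1L, 0));
        System.out.println("channel 0 blocked: " + handler.isBlocked(0));
        System.out.println("barrier 1 on channel 1 -> " + handler.processBarrier(1L, 1));
    }
}
```

Blocking the channels that already delivered the barrier keeps records belonging to the next checkpoint out of the current snapshot.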
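Summary: a checkpoint is started by a change in job status. The coordinator then triggers the source tasks, which take their own checkpoints and send barriers downstream to tell the downstream operators to start theirs; each operator reports back when it is done. The overall process is fairly clear, although the details are not examined closely here. This is only a brief analysis of the barrier flow: the actual state persistence is not covered, and neither are unaligned barriers. Interested readers can look into them on their own, and follow-up articles may cover them.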