Flink Source Code Analysis: Checkpoint
Keywords
- Checkpoint
- Snapshot
- Barrier
- State
- Savepoint
CheckpointCoordinator: periodic checkpoints
The CheckpointCoordinator starts a periodic timer task that triggers checkpoints.
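To make the moving parts concrete before walking through the Flink classes, here is a minimal plain-Java sketch of a periodic trigger that hands out increasing checkpoint IDs and expires checkpoints that do not finish in time. `MiniCheckpointScheduler` and all of its methods are hypothetical names chosen for illustration; only the field name `checkpointIdCounter` is borrowed from the source, and none of this is Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified stand-in for the coordinator's timer logic (not Flink code). */
class MiniCheckpointScheduler {
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    // checkpoint ID -> timer task that expires the checkpoint if it does not complete in time
    private final Map<Long, ScheduledFuture<?>> pendingCheckpoints = new HashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long timeoutMillis;

    MiniCheckpointScheduler(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Starts the periodic task that triggers one checkpoint per interval. */
    ScheduledFuture<?> start(long intervalMillis) {
        return timer.scheduleAtFixedRate(
                () -> { triggerCheckpoint(); }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Allocates the next ID and registers a canceller that fires after the timeout. */
    synchronized long triggerCheckpoint() {
        long checkpointId = checkpointIdCounter.getAndIncrement();
        ScheduledFuture<?> canceller = timer.schedule(
                () -> { expire(checkpointId); }, timeoutMillis, TimeUnit.MILLISECONDS);
        pendingCheckpoints.put(checkpointId, canceller);
        return checkpointId;
    }

    /** Drops a checkpoint that timed out; returns false if it was no longer pending. */
    synchronized boolean expire(long checkpointId) {
        return pendingCheckpoints.remove(checkpointId) != null;
    }

    /** Marks a checkpoint as completed and cancels its pending timeout. */
    synchronized boolean complete(long checkpointId) {
        ScheduledFuture<?> canceller = pendingCheckpoints.remove(checkpointId);
        if (canceller == null) {
            return false;
        }
        canceller.cancel(false);
        return true;
    }

    synchronized int pendingCount() {
        return pendingCheckpoints.size();
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

In this sketch, `triggerCheckpoint()` can also be called directly, which keeps the ID and timeout logic testable without waiting for the timer; `start(intervalMillis)` only adds the periodic part.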
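When the JobMaster starts, it calls `Execution.scheduleForExecution`; the checkpoint timer task is started once the job switches to the RUNNING state.
When `CheckpointCoordinatorDeActivator.jobStatusChanges` is called, `CheckpointCoordinator.startCheckpointScheduler` starts the CheckpointCoordinator's periodic task.
`CheckpointCoordinator.triggerCheckpoint` is then invoked periodically.
`checkpointID = checkpointIdCounter.getAndIncrement();` generates a monotonically increasing checkpoint ID.
A timeout is set for each checkpoint. If the checkpoint has not completed when the timeout fires, it is abandoned, the previously scheduled task is cancelled, and checkpointing is rescheduled: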
```
// schedule the timer that will clean up the expired checkpoints
final Runnable canceller = () -> {
    synchronized (lock) {
        // only do the work if the checkpoint is not discarded anyways
        // note that checkpoint completion discards the pending checkpoint object
        if (!checkpoint.isDiscarded()) {
            LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
            failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
            pendingCheckpoints.remove(checkpointID);
            rememberRecentCheckpointId(checkpointID);
            // cancel the previously scheduled task and reschedule the checkpoint
            triggerQueuedRequests();
        }
    }
};
```
The checkpoint is then triggered on the source operators:
```
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
```
The RPC call `TaskExecutorGateway.triggerCheckpoint` executes `StreamTask.performCheckpoint`, which generates the initial CheckpointBarrier.
When a downstream task receives the CheckpointBarrier, it runs `StreamTask.triggerCheckpointOnBarrier`, which executes `StreamTask.performCheckpoint` and forwards the CheckpointBarrier further downstream.
`StreamTask.performCheckpoint` ultimately performs the actual checkpoint through `CheckpointingOperation.executeCheckpointing`.
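How a downstream operator receives the CheckpointBarrier
`StreamOneInputProcessor.processInput` fetches the next StreamElement.
`StreamTaskNetworkInput.pollNextNullable` calls `CheckpointedInputGate.pollNext` to fetch the next BufferOrEvent.
`CheckpointedInputGate.pollNext` takes the BufferOrEvent from its internal buffer if that buffer is not empty; otherwise it reads from the InputGate.
If the channel the BufferOrEvent came from is currently blocked, the BufferOrEvent is added to the buffer.
If the BufferOrEvent is a Buffer, it is returned directly; a Buffer carries the regular data sent by the upstream task.
If it is a CheckpointBarrier, `CheckpointBarrierAligner.processBarrier` is called to handle the checkpoint event.
In `CheckpointBarrierAligner.processBarrier`, if the current operator has only one upstream input, `notifyCheckpoint` is triggered directly.
Otherwise, barrier handling falls into the following cases:
- The barrier belongs to a new checkpoint: alignment is started for the channel via `beginNewAlignment`, which blocks further element processing on that channel.
- The operator has already received barriers and the barrier ID equals the `currentCheckpointId` being processed: the channel is blocked via `onBarrier`.
- `barrierId > currentCheckpointId`: the checkpoint `currentCheckpointId` did not complete normally. The failure is reported and the operator's checkpoint state is reset.
- `barrierId < currentCheckpointId`: the barrier is ignored.
Once barriers have been received from all channels, `CheckpointBarrierHandler.notifyCheckpoint` is executed, which calls `StreamTask.triggerCheckpointOnBarrier`.
`StreamTask.performCheckpoint` then broadcasts the barrier downstream with `operatorChain.broadcastCheckpointBarrier` and takes the snapshot with `checkpointState()`.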
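The alignment cases described above can be sketched in plain Java. This is a simplified model under stated assumptions, not the Flink class: `MiniBarrierAligner`, its fields, and the `triggered`/`aborted` lists are hypothetical, channels are plain integer indexes, and repeated barriers on an already blocked channel are silently ignored instead of raising an error.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of barrier alignment (not the Flink class). */
class MiniBarrierAligner {
    private final boolean[] blocked;          // blocked[i] == true: channel i waits for alignment
    private int numBarriersReceived;
    private long currentCheckpointId = -1L;
    final List<Long> triggered = new ArrayList<>(); // checkpoints for which a snapshot was triggered
    final List<Long> aborted = new ArrayList<>();   // checkpoints abandoned during alignment

    MiniBarrierAligner(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    boolean isBlocked(int channel) {
        return blocked[channel];
    }

    void processBarrier(int channel, long barrierId) {
        if (blocked.length == 1) {
            // single input: nothing to align, trigger immediately
            if (barrierId > currentCheckpointId) {
                currentCheckpointId = barrierId;
                triggered.add(barrierId);
            }
            return;
        }
        if (numBarriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                onBarrier(channel);                     // regular case: block this channel too
            } else if (barrierId > currentCheckpointId) {
                aborted.add(currentCheckpointId);       // current checkpoint did not complete
                releaseBlocks();
                beginNewAlignment(barrierId, channel);
            } else {
                return;                                 // barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);      // first barrier of a new checkpoint
        } else {
            return;                                     // late barrier, already handled
        }
        if (numBarriersReceived == blocked.length) {
            releaseBlocks();                            // alignment finished on all channels
            triggered.add(barrierId);
        }
    }

    private void beginNewAlignment(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        if (!blocked[channel]) {
            blocked[channel] = true;
            numBarriersReceived++;
        }
    }

    private void releaseBlocks() {
        java.util.Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }
}
```

With two channels, a barrier for checkpoint 1 on channel 0 blocks that channel; the matching barrier on channel 1 completes the alignment, unblocks both channels, and records checkpoint 1 in `triggered`.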
Snapshot operation
`CheckpointingOperation.executeCheckpointing` runs `CheckpointingOperation.checkpointStreamOperator` on every Operator associated with the current StreamTask, one by one.
`AbstractStreamOperator.snapshotState` snapshots each of the following kinds of state:
- state of the user-defined operator
- managed keyed state
- raw keyed state: `snapshotContext.getKeyedStateStreamFuture()`
- managed operator state
- raw operator state: `snapshotContext.getOperatorStateStreamFuture()`
`AbstractUdfStreamOperator.snapshotState` calls `StreamingFunctionUtils.snapshotFunctionState` to snapshot the user function.
`StreamingFunctionUtils.trySnapshotFunctionState` is where the user-defined `snapshotState` method is actually invoked.
A user function is snapshotted only if it implements `CheckpointedFunction` or `ListCheckpointed`.
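The "snapshot only if the function implements the right interface" rule can be illustrated with a small plain-Java sketch. The interface `SnapshotAware` is a hypothetical stand-in and deliberately not the Flink `CheckpointedFunction` or `ListCheckpointed` API; `MiniSnapshotUtil`, `CountingFunction`, and `StatelessFunction` are likewise made up for this example. Only the method name `trySnapshotFunctionState` mirrors the one mentioned above.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a checkpoint-aware user function interface (not the Flink API). */
interface SnapshotAware {
    List<Long> snapshotState(long checkpointId);
}

/** A user function that keeps a counter and exposes it for snapshots. */
class CountingFunction implements SnapshotAware {
    private long count;

    void process(String record) {
        count++;
    }

    @Override
    public List<Long> snapshotState(long checkpointId) {
        return Collections.singletonList(count);
    }
}

/** A user function without any snapshot support. */
class StatelessFunction {
    void process(String record) {
        // nothing to remember
    }
}

class MiniSnapshotUtil {
    /** Snapshots the function only if it opted in by implementing SnapshotAware. */
    static Optional<List<Long>> trySnapshotFunctionState(Object userFunction, long checkpointId) {
        if (userFunction instanceof SnapshotAware) {
            return Optional.of(((SnapshotAware) userFunction).snapshotState(checkpointId));
        }
        return Optional.empty();
    }
}
```

The design point is that the framework side never needs to know the concrete function type: a single `instanceof` check decides whether user state takes part in the checkpoint.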
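Task sends the checkpoint ack
After finishing its snapshot, a Task replies with an ack message to the CheckpointCoordinator in the JobMaster.
`StreamTask.executeCheckpointing` calls the `snapshotState` method of every Operator in the current Task.
The asynchronous part runs in `AsyncCheckpointRunnable`, which waits for the snapshot results.
When they are complete, it calls `reportCompletedSnapshotStates`, which obtains the TaskStateManager:
`TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();`
`taskStateManager.reportTaskStateSnapshots` stores the local state via `localStateStore.storeLocalState`.
`RpcCheckpointResponder.acknowledgeCheckpoint` sends the ack message to the CheckpointCoordinator over RPC, indicating that this task has finished its part of the checkpoint.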
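CheckpointCoordinator handles the ack message
The implementing class of CheckpointCoordinatorGateway is JobMaster.
After completing the checkpoint, the task sends its ack by calling `JobMaster.acknowledgeCheckpoint` over RPC.
JobMaster calls `SchedulerNG.acknowledgeCheckpoint`.
`ExecutionGraph.getCheckpointCoordinator()` returns the corresponding CheckpointCoordinator.
The CheckpointCoordinator's handling of the message is handed off to a thread pool: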
```
ioExecutor.execute(() -> {
    try {
        checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
    } catch (Throwable t) {
        log.warn("Error while processing checkpoint acknowledgement message", t);
    }
});
```
The CheckpointCoordinator validates the ack message as follows:
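`PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);`
- The checkpoint exists and has not been discarded: run the normal handling logic, `checkpoint.acknowledgeTask`.
- The checkpoint exists but has been discarded: throw an exception.
- The checkpoint does not exist: discard the reported state with `discardSubtaskState()`.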
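PendingCheckpoint keeps both `notYetAcknowledgedTasks` and `acknowledgedTasks`.
`PendingCheckpoint.acknowledgeTask` processes a received ack message, reports checkpoint statistics, and updates the internal state:
- `notYetAcknowledgedTasks`: all subtasks whose ack has not been received yet
- `acknowledgedTasks`: the tasks whose ack has been received
- `operatorStates`: the operator states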
`PendingCheckpoint.acknowledgeTask` returns one of four results, depending on the state of the pending checkpoint and of the acknowledging task:
SUCCESS
DUPLICATE
UNKNOWN
DISCARDED
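A compact way to see how the two task sets and the four results fit together is the following plain-Java sketch. It is an illustration under simplifying assumptions, not the Flink class: `MiniPendingCheckpoint` and `AckResult` are hypothetical names, tasks are identified by strings, and the reported state is a plain string stored per task.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of ack bookkeeping for one pending checkpoint. */
class MiniPendingCheckpoint {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private final Map<String, String> operatorStates = new HashMap<>();
    private boolean discarded;

    MiniPendingCheckpoint(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAcknowledge);
    }

    synchronized AckResult acknowledgeTask(String taskId, String reportedState) {
        if (discarded) {
            return AckResult.DISCARDED;              // checkpoint was already given up
        }
        if (notYetAcknowledgedTasks.remove(taskId)) {
            acknowledgedTasks.add(taskId);           // move the task to the acknowledged set
            operatorStates.put(taskId, reportedState);
            return AckResult.SUCCESS;
        }
        // not pending: either acknowledged before, or not part of this checkpoint at all
        return acknowledgedTasks.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    synchronized boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }

    synchronized void discard() {
        discarded = true;
    }
}
```

In this model, the coordinator would finalize the checkpoint only after an ack returns `SUCCESS` and `isFullyAcknowledged()` reports that no task is left in `notYetAcknowledgedTasks`.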
When the ack result is SUCCESS:
1. Check that all ack messages have been received.
2. `CompletedCheckpoint completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();` finalizes the checkpoint. While the checkpoint switches state:
```
try {
    // build the savepoint object, which contains all state of this checkpoint
    // write out the metadata
    final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterState);
    final CompletedCheckpointStorageLocation finalizedLocation;
    // write to the state backend
    try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
        Checkpoints.storeCheckpointMetadata(savepoint, out);
        finalizedLocation = out.closeAndFinalizeCheckpoint();
    }
    CompletedCheckpoint completed = new CompletedCheckpoint();
    ...
    // switch state
    // mark this pending checkpoint as disposed, but do NOT drop the state
    dispose(false);
    return completed;
}
```
3. Call `triggerQueuedRequests()` to trigger the next checkpoint.
4. `dropSubsumedCheckpoints(checkpointId);` discards earlier checkpoints that are still pending.
5. `Execution.TaskManagerGateway.notifyCheckpointComplete` notifies all Tasks that this checkpoint has completed.
```
for (ExecutionVertex ev : tasksToCommitTo) {
    Execution ee = ev.getCurrentExecutionAttempt();
    if (ee != null) {
        ee.notifyCheckpointComplete(checkpointId, timestamp);
    }
}
```
Listening for checkpoint completion
Once an operator receives the checkpoint-complete notification, it can perform additional work, for example:
- committing Kafka offsets
- TwoPhaseCommitSinkFunction
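The idea behind committing on the completion notification can be sketched without any Flink dependency. The class below is a hypothetical illustration of the two-phase pattern, not the `TwoPhaseCommitSinkFunction` API: `MiniTwoPhaseSink`, its method names, and the use of string records are assumptions made for this example. Records are buffered in a transaction, the transaction is sealed when a snapshot is taken, and it is committed only after the matching checkpoint is reported complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a sink that commits only on checkpoint completion. */
class MiniTwoPhaseSink {
    private List<String> currentTransaction = new ArrayList<>();
    // checkpoint ID -> sealed (pre-committed) transaction waiting for the completion notification
    private final TreeMap<Long, List<String>> pendingCommit = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    /** Buffers a record in the currently open transaction. */
    void invoke(String record) {
        currentTransaction.add(record);
    }

    /** Phase 1: seal the open transaction under this checkpoint ID and open a new one. */
    void snapshotState(long checkpointId) {
        pendingCommit.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    /** Phase 2: commit every sealed transaction up to and including the completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = pendingCommit.headMap(checkpointId, true);
        for (List<String> transaction : ready.values()) {
            committed.addAll(transaction);
        }
        ready.clear(); // the view is backed by pendingCommit, so this removes the committed entries
    }
}
```

Committing everything up to the notified ID, rather than only the exact ID, covers the case where a notification for an earlier checkpoint never arrived.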