Flink Source Code Analysis: Checkpoint

Keywords

  • Checkpoint
  • Snapshot
  • Barrier
  • State
  • Savepoint

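How CheckpointCoordinator triggers checkpoints periodically

CheckpointCoordinator starts a scheduled task that triggers checkpoints.

  1. When the JobMaster starts, the job is scheduled through Execution.scheduleForExecution; once it switches to the RUNNING state, the periodic checkpoint task is started

  2. When CheckpointCoordinatorDeActivator.jobStatusChanges is called, it invokes CheckpointCoordinator.startCheckpointScheduler to start the coordinator's periodic task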

  3. CheckpointCoordinator.triggerCheckpoint is then invoked periodically by that scheduled task

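  4. checkpointID = checkpointIdCounter.getAndIncrement(); generates the next ID by incrementing a counter

  5. A timeout is set for each checkpoint. When it expires, the checkpoint is aborted, the previously scheduled task is cancelled, and checkpointing is rescheduled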

    // schedule the timer that will clean up the expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);
                // cancel the previously scheduled task and reschedule the checkpoint
                triggerQueuedRequests();
            }
        }
    };

  6. Trigger the checkpoint on the source operators

    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }

  7. The RPC call TaskExecutorGateway.triggerCheckpoint runs StreamTask.performCheckpoint, which creates the initial CheckpointBarrier

  8. When a downstream task receives the CheckpointBarrier, it calls StreamTask.triggerCheckpointOnBarrier, which runs StreamTask.performCheckpoint and forwards the CheckpointBarrier further downstream

  9. StreamTask.performCheckpoint eventually performs the actual checkpoint through CheckpointingOperation.executeCheckpointing
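
Steps 4 and 5 can be pictured with a small stand-alone model. The sketch below is not Flink code: MiniCoordinator, Pending, cancellerFor and complete are hypothetical names, and the timer is left out so that the canceller can be run by hand. It only illustrates how a strictly increasing ID is handed out and why the canceller becomes a no-op once the checkpoint has already completed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of steps 4 and 5. All names here are hypothetical, not Flink classes. */
final class MiniCoordinator {

    /** Stand-in for a pending checkpoint. */
    static final class Pending {
        final long id;
        boolean discarded;
        boolean expired;

        Pending(long id) {
            this.id = id;
        }
    }

    private final Object lock = new Object();
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    private final Map<Long, Pending> pendingCheckpoints = new LinkedHashMap<>();

    /** Step 4: allocate the next ID and register the pending checkpoint. */
    Pending triggerCheckpoint() {
        Pending checkpoint = new Pending(checkpointIdCounter.getAndIncrement());
        synchronized (lock) {
            pendingCheckpoints.put(checkpoint.id, checkpoint);
        }
        return checkpoint;
    }

    /** Step 5: the action a timer would run when the timeout elapses. */
    Runnable cancellerFor(Pending checkpoint) {
        return () -> {
            synchronized (lock) {
                // Only expire checkpoints that were not already completed (and thus discarded).
                if (!checkpoint.discarded) {
                    checkpoint.discarded = true;
                    checkpoint.expired = true;
                    pendingCheckpoints.remove(checkpoint.id);
                }
            }
        };
    }

    /** Completion discards the pending object, so a later canceller run does nothing. */
    void complete(Pending checkpoint) {
        synchronized (lock) {
            if (!checkpoint.discarded) {
                checkpoint.discarded = true;
                pendingCheckpoints.remove(checkpoint.id);
            }
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }

    public static void main(String[] args) {
        MiniCoordinator coordinator = new MiniCoordinator();
        Pending first = coordinator.triggerCheckpoint();
        Pending second = coordinator.triggerCheckpoint();
        coordinator.complete(first);
        coordinator.cancellerFor(first).run();  // no-op, already completed
        coordinator.cancellerFor(second).run(); // expires the second checkpoint
        System.out.println(first.id + " expired=" + first.expired);
        System.out.println(second.id + " expired=" + second.expired);
    }
}
```

In a real scheduler the Runnable would be handed to a timer with the checkpoint timeout as its delay; running it directly keeps the example deterministic.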

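How a downstream operator receives a CheckpointBarrier

  1. StreamOneInputProcessor.processInput fetches the next StreamElement

  2. StreamTaskNetworkInput.pollNextNullable calls CheckpointedInputGate.pollNext to fetch the next BufferOrEvent

  3. In CheckpointedInputGate.pollNext, if the internal buffer is not empty, the BufferOrEvent is taken from that buffer; otherwise it is taken from the InputGate

  4. It checks whether the channel that the BufferOrEvent came from is currently blocked; if so, the BufferOrEvent is added to the buffer

  5. If the BufferOrEvent is a Buffer, it is returned directly; a Buffer carries the regular data sent by the upstream task

  6. If it is a CheckpointBarrier, CheckpointBarrierAligner.processBarrier is called to handle the checkpoint event

  7. In CheckpointBarrierAligner.processBarrier, if the operator has only one upstream input channel, notifyCheckpoint is triggered directly

  8. Otherwise, barrier handling falls into the following cases:

    1. The barrier starts a new checkpoint: alignment begins for its channel (beginNewAlignment), which blocks processing of further elements from that channel
    2. The operator has already received barriers: if the barrier ID equals currentCheckpointId, the checkpoint currently being aligned, the channel is blocked via onBarrier
    3. If barrierId > currentCheckpointId, the checkpoint currentCheckpointId did not complete before a newer one started. The failure is reported and the operator's checkpoint state is reset
    4. If barrierId < currentCheckpointId, nothing is done
  9. Once barriers have been received from all channels, CheckpointBarrierHandler.notifyCheckpoint is executed

  10. notifyCheckpoint in turn calls StreamTask.triggerCheckpointOnBarrier

  11. StreamTask.performCheckpoint broadcasts the checkpoint barrier downstream via operatorChain.broadcastCheckpointBarrier and then takes the snapshot with checkpointState()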
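
The cases in step 8 can be sketched as a toy aligner. This is a simplified model under stated assumptions, not the actual CheckpointBarrierAligner: MiniBarrierAligner and its fields are hypothetical, buffering of data from blocked channels is omitted, and starting a new alignment right after aborting the older checkpoint is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy barrier aligner. Hypothetical names; data buffering is intentionally left out. */
final class MiniBarrierAligner {
    private final boolean[] blocked;
    private int numBarriersReceived;
    private long currentCheckpointId = -1L;

    /** Checkpoint IDs for which all barriers arrived. */
    final List<Long> notified = new ArrayList<>();
    /** Checkpoint IDs abandoned because a newer barrier arrived first. */
    final List<Long> aborted = new ArrayList<>();

    MiniBarrierAligner(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    boolean isBlocked(int channel) {
        return blocked[channel];
    }

    void processBarrier(long barrierId, int channel) {
        if (blocked.length == 1) {
            // Single input channel: nothing to align, notify right away.
            if (barrierId > currentCheckpointId) {
                currentCheckpointId = barrierId;
                notified.add(barrierId);
            }
            return;
        }
        if (numBarriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                onBarrier(channel);                    // case 2: same checkpoint, block channel
            } else if (barrierId > currentCheckpointId) {
                aborted.add(currentCheckpointId);      // case 3: older checkpoint did not finish
                releaseBlocks();
                beginNewAlignment(barrierId, channel);
            } else {
                return;                                // case 4: stale barrier, ignore
            }
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);     // case 1: first barrier of a new checkpoint
        } else {
            return;                                    // stale barrier, ignore
        }
        if (numBarriersReceived == blocked.length) {
            releaseBlocks();                           // step 9: all barriers are in
            notified.add(barrierId);
        }
    }

    private void beginNewAlignment(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        if (blocked[channel]) {
            throw new IllegalStateException("Barrier received twice on channel " + channel);
        }
        blocked[channel] = true;
        numBarriersReceived++;
    }

    private void releaseBlocks() {
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }

    public static void main(String[] args) {
        MiniBarrierAligner aligner = new MiniBarrierAligner(2);
        aligner.processBarrier(1, 0);
        aligner.processBarrier(1, 1);
        System.out.println("notified=" + aligner.notified + " aborted=" + aligner.aborted);
    }
}
```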

Snapshot operation

  1. CheckpointingOperation.executeCheckpointing calls CheckpointingOperation.checkpointStreamOperator on every operator associated with the current StreamTask, one by one

  2. AbstractStreamOperator.snapshotState takes a snapshot of each of the following kinds of state

    1. User operator state
    2. Managed keyed state
    3. Raw keyed state: snapshotContext.getKeyedStateStreamFuture()
    4. Managed operator state
    5. Raw operator state: snapshotContext.getOperatorStateStreamFuture()
  3. AbstractUdfStreamOperator.snapshotState calls StreamingFunctionUtils.snapshotFunctionState to snapshot the user function

  4. StreamingFunctionUtils.trySnapshotFunctionState is where the user-defined snapshotState method is actually invoked

  5. A user function is snapshotted only if it implements CheckpointedFunction or ListCheckpointed
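
The interface check in steps 4 and 5 can be sketched in plain Java. The code below is a rough model, not the Flink API: MiniCheckpointedFunction and MiniListCheckpointed are hypothetical stand-ins for the two interfaces, and a list of strings plays the role of the state backend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy dispatch on the interfaces a user function implements. Hypothetical names throughout. */
final class SnapshotDispatchSketch {

    /** Stand-in for a function that writes its state itself. */
    interface MiniCheckpointedFunction {
        void snapshotState(long checkpointId, List<String> stateSink);
    }

    /** Stand-in for a function that returns its state as a list. */
    interface MiniListCheckpointed {
        List<String> snapshotState(long checkpointId, long timestamp);
    }

    /** Returns true if the function exposed state through one of the two interfaces. */
    static boolean trySnapshot(Object userFunction, long checkpointId, long timestamp,
                               List<String> stateSink) {
        if (userFunction instanceof MiniCheckpointedFunction) {
            ((MiniCheckpointedFunction) userFunction).snapshotState(checkpointId, stateSink);
            return true;
        }
        if (userFunction instanceof MiniListCheckpointed) {
            stateSink.addAll(
                ((MiniListCheckpointed) userFunction).snapshotState(checkpointId, timestamp));
            return true;
        }
        // Functions implementing neither interface have nothing to snapshot.
        return false;
    }

    /** Example user function that counts records and snapshots the count. */
    static final class CountingFunction implements MiniCheckpointedFunction {
        long count;

        void process(String record) {
            count++;
        }

        @Override
        public void snapshotState(long checkpointId, List<String> stateSink) {
            stateSink.add("count=" + count);
        }
    }

    /** Example user function that returns its state as a list. */
    static final class LastValueFunction implements MiniListCheckpointed {
        String last = "";

        @Override
        public List<String> snapshotState(long checkpointId, long timestamp) {
            return Collections.singletonList("last=" + last);
        }
    }

    public static void main(String[] args) {
        CountingFunction counting = new CountingFunction();
        counting.process("a");
        List<String> sink = new ArrayList<>();
        System.out.println(trySnapshot(counting, 1L, 0L, sink) + " " + sink);
        System.out.println(trySnapshot(new Object(), 1L, 0L, sink));
    }
}
```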

Task sends the checkpoint ack

After a task finishes its snapshot, it replies with an ack message to the CheckpointCoordinator in the JobMaster.

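  1. CheckpointingOperation.executeCheckpointing (CheckpointingOperation is an inner class of StreamTask) calls the snapshotState method of every operator in the current task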

  2. The asynchronous part runs in AsyncCheckpointRunnable

  3. It waits for the snapshot results

  4. When they are done, reportCompletedSnapshotStates is called; it obtains the state manager with TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();

  5. taskStateManager.reportTaskStateSnapshots is called

  6. localStateStore.storeLocalState stores the local state

  7. RpcCheckpointResponder.acknowledgeCheckpoint sends an ack message to the CheckpointCoordinator over RPC, signalling that this task has finished the checkpoint
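
The "snapshot asynchronously, then ack" shape of steps 2 to 7 can be sketched with CompletableFuture. This is only an illustration under assumptions: AsyncAckSketch, checkpointAsync and the string-based "state" and "ack" are hypothetical, and the real AsyncCheckpointRunnable does considerably more (local state, statistics, error handling).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy version of "run snapshots asynchronously, ack when all are done". Hypothetical names. */
final class AsyncAckSketch {

    static CompletableFuture<Void> checkpointAsync(long checkpointId,
                                                   List<Supplier<String>> operatorSnapshots,
                                                   Consumer<String> ackSender) {
        // Start one asynchronous snapshot per operator.
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> snapshot : operatorSnapshots) {
            futures.add(CompletableFuture.supplyAsync(snapshot));
        }
        // Wait for all results, then report them in a single ack message.
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> {
                List<String> states = new ArrayList<>();
                for (CompletableFuture<String> future : futures) {
                    states.add(future.join()); // already completed, does not block
                }
                ackSender.accept("ack checkpoint " + checkpointId + " states=" + states);
            });
    }

    public static void main(String[] args) {
        List<Supplier<String>> operators = new ArrayList<>();
        operators.add(() -> "source-state");
        operators.add(() -> "map-state");
        checkpointAsync(1L, operators, System.out::println).join();
    }
}
```

The ack is sent only after every operator snapshot has completed, which mirrors the ordering described above: report the snapshot states first, acknowledge afterwards.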

CheckpointCoordinator handles the ack message

  1. CheckpointCoordinatorGateway is implemented by JobMaster

  2. After finishing its checkpoint, a task sends the ack by calling JobMaster.acknowledgeCheckpoint over RPC

  3. JobMaster calls SchedulerNG.acknowledgeCheckpoint

  4. ExecutionGraph.getCheckpointCoordinator() returns the corresponding CheckpointCoordinator

  5. The CheckpointCoordinator's handling of the message is submitted to a thread pool

    ioExecutor.execute(() -> {
        try {
            checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
        } catch (Throwable t) {
            log.warn("Error while processing checkpoint acknowledgement message", t);
        }
    });

  6. The CheckpointCoordinator looks up the pending checkpoint and validates the ack message as follows
    PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

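    1. The checkpoint exists and has not been discarded: the normal path runs, checkpoint.acknowledgeTask
    2. It exists but has been discarded: an exception is thrown
    3. It does not exist: the reported state is discarded with discardSubtaskState()
  7. PendingCheckpoint keeps both notYetAcknowledgedTasks and acknowledgedTasks

  8. PendingCheckpoint.acknowledgeTask processes the received ack, reports checkpoint statistics, and updates its internal state

    1. notYetAcknowledgedTasks: all subtasks whose ack has not been received yet
    2. acknowledgedTasks: the tasks whose ack has been received
    3. operatorStates: the operator states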
  9. PendingCheckpoint.acknowledgeTask returns one of four results: SUCCESS, DUPLICATE, UNKNOWN, or DISCARDED
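
How the two task sets lead to the four results can be sketched as follows. This is a minimal model, not the real PendingCheckpoint: MiniPendingCheckpoint and AckResult are hypothetical, tasks are identified by plain strings, and state handles and statistics are left out.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy ack bookkeeping with two task sets. Hypothetical names; no state handles or stats. */
final class MiniPendingCheckpoint {

    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private boolean discarded;

    MiniPendingCheckpoint(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAcknowledge);
    }

    synchronized AckResult acknowledgeTask(String taskId) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (notYetAcknowledgedTasks.remove(taskId)) {
            // First ack from an expected task: move it to the acknowledged set.
            acknowledgedTasks.add(taskId);
            return AckResult.SUCCESS;
        }
        // Either the task already acked, or it was never part of this checkpoint.
        return acknowledgedTasks.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    synchronized boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }

    synchronized void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        MiniPendingCheckpoint pending =
            new MiniPendingCheckpoint(new HashSet<>(Arrays.asList("source-0", "map-0")));
        System.out.println(pending.acknowledgeTask("source-0"));
        System.out.println(pending.acknowledgeTask("source-0"));
        System.out.println(pending.acknowledgeTask("sink-9"));
        System.out.println(pending.acknowledgeTask("map-0"));
        System.out.println(pending.isFullyAcknowledged());
    }
}
```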

When the ack result is SUCCESS

    // all ack messages have been received
    if (checkpoint.isFullyAcknowledged()) {
        // complete the checkpoint
        completePendingCheckpoint(checkpoint);
    }

  1. CompletedCheckpoint completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();

  2. When the checkpoint switches state, inside finalizeCheckpoint:

    try {
        // build the savepoint, which contains all state of this checkpoint
        // write out the metadata
        final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterState);
        final CompletedCheckpointStorageLocation finalizedLocation;
        // write it to the state backend
        try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
            Checkpoints.storeCheckpointMetadata(savepoint, out);
            finalizedLocation = out.closeAndFinalizeCheckpoint();
        }

        CompletedCheckpoint completed = new CompletedCheckpoint();
        ...
        // switch state
        // mark this pending checkpoint as disposed, but do NOT drop the state
        dispose(false);
        return completed;
    }

  3. triggerQueuedRequests() is called to trigger the next checkpoint

  4. dropSubsumedCheckpoints(checkpointId); discards the earlier checkpoints that are still pending

  5. Execution.TaskManagerGateway.notifyCheckpointComplete notifies all tasks that this checkpoint has completed

    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }

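Listening for checkpoint completion

Once an operator receives the checkpoint-complete notification, it can perform additional work, for example:

  1. Committing Kafka offsets
  2. TwoPhaseCommitSinkFunction committing its pre-committed transactions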
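
The second item follows a pre-commit on snapshot, commit on notification pattern, which can be sketched without any Flink dependency. The class below is a simplified model under assumptions: MiniTwoPhaseSink is a hypothetical name, a transaction is just a list of strings, and failure recovery is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy two-phase-commit sink driven by checkpoint callbacks. Hypothetical names. */
final class MiniTwoPhaseSink {
    private List<String> currentTransaction = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingCommit = new TreeMap<>();

    /** Records that became visible to the outside after a commit. */
    final List<String> committed = new ArrayList<>();

    /** Regular processing only writes into the open transaction. */
    void invoke(String record) {
        currentTransaction.add(record);
    }

    /** On snapshot: pre-commit the open transaction and start a new one. */
    void snapshotState(long checkpointId) {
        pendingCommit.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    /** On completion: commit every transaction pre-committed up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        // headMap returns a live view, so clearing it removes the entries from pendingCommit.
        NavigableMap<Long, List<String>> done = pendingCommit.headMap(checkpointId, true);
        for (List<String> transaction : done.values()) {
            committed.addAll(transaction);
        }
        done.clear();
    }

    public static void main(String[] args) {
        MiniTwoPhaseSink sink = new MiniTwoPhaseSink();
        sink.invoke("a");
        sink.snapshotState(1L);
        sink.invoke("b");
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.committed);
    }
}
```

Records written after the snapshot stay in the next transaction and are not committed until a later checkpoint completes, which is the property that makes the external writes line up with the checkpointed state.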
