Flink Source Code Analysis: State

In Flink, both fault tolerance and consistency semantics are built on State.

State only takes effect in combination with checkpoints and snapshots.

State can be classified along two dimensions:

By type:

  1. Keyed State
  2. Operator State

By data format:

  1. Managed (List, Value, Map)
  2. Raw (a format defined by the user)

A State Backend is where State is stored. There are currently three kinds:

  1. MemoryStateBackend (for testing only)
  2. FsStateBackend
  3. RocksDBStateBackend

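Getting a state:

When a state is requested from the StateBackend, the backend first looks it up by its name: if a state with that name already exists, it is returned; otherwise a new one is created.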
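The get-or-create lookup can be illustrated in plain Java. This is a minimal sketch, assuming a hypothetical `SimpleStateRegistry` that keys list states by name in a `HashMap`; it is not a Flink class. Flink's real backends also validate a repeated request against the state already registered under that name, which is left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry illustrating get-or-create by state name (not a Flink class).
class SimpleStateRegistry {
    private final Map<String, List<Long>> registeredStates = new HashMap<>();
    private int creations = 0;

    // Returns the state registered under `name`; creates an empty one on first access.
    List<Long> getListState(String name) {
        return registeredStates.computeIfAbsent(name, n -> {
            creations++;
            return new ArrayList<>();
        });
    }

    int creations() {
        return creations;
    }

    public static void main(String[] args) {
        SimpleStateRegistry registry = new SimpleStateRegistry();
        registry.getListState("offsets").add(42L);
        // Same name again: the existing state (and its contents) comes back, nothing new is created.
        System.out.println(registry.getListState("offsets")); // [42]
        System.out.println(registry.creations());              // 1
    }
}
```

Because the lookup is by name only, two calls with the same name share one state object, which is why state names must be unique within an operator.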

When a checkpoint triggers an operator snapshot, the operator's State is written to the StateBackend. The steps are:

  1. CheckpointingOperation.executeCheckpointing: called when a task starts taking a snapshot

    public void executeCheckpointing() throws Exception {
        ...
        try {
            // Snapshot every operator that belongs to this task
            for (StreamOperator<?> op : allOperators) {
                // Creates a snapshot future for each kind of state, e.g. keyedStateManagedFuture and
                // operatorStateManagedFuture. The RunnableFutures that will write to the StateBackend
                // are all collected in operatorSnapshotsInProgress
                checkpointStreamOperator(op);
            }

            // This runnable is what actually writes the state to the StateBackend
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress, // the key parameter
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

            owner.cancelables.registerCloseable(asyncCheckpointRunnable);
            // Run the write asynchronously
            owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
            ...
        } catch (Exception ex) {
            ...
        }
    }
  2. AbstractStreamOperator.snapshotState

    // factory creates the output streams that actually persist the State
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        // Holds checkpointId and timestamp
        StateSnapshotContextSynchronousImpl snapshotContext = ...;

        try {
            // Calls AbstractUdfStreamOperator.snapshotState, which in turn calls the userFunction's snapshotState().
            // If the user function implements ListCheckpointed, a PartitionableListState is requested from the
            // OperatorStateBackend and kept in OperatorStateBackend.registeredOperatorStates
            snapshotState(snapshotContext);

            // Create the raw-state RunnableFutures. The future only holds keyedStateCheckpointOutputStream,
            // which the userFunction writes to with out.write
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                // operatorStateBackend.snapshot (DefaultOperatorStateBackendSnapshotStrategy) returns a runnable
                // that writes the managed operator state to the StateBackend.
                // For a synchronous snapshot this call becomes synchronous and runs the AsyncSnapshotCallable directly
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            ...
        }

        return snapshotInProgress;
    }
  3. StreamTask.asyncOperationsThreadPool executes AsyncCheckpointRunnable.run, which persists the state to the StateBackend asynchronously and then sends an ack to the JobMaster to signal that the checkpoint has completed

    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {
        // Snapshot results that will be sent to the JobMaster
        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = ...;
        TaskStateSnapshot localTaskOperatorSubtaskStates = ...;

        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {

            OperatorID operatorID = entry.getKey();
            // The asynchronous snapshot futures of this operator
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();

            // For each future, check whether it has already run (synchronously, on the same thread as the
            // userFunction); if not, run it now on this thread, which makes the snapshot asynchronous.
            // OperatorSnapshotFinalizer.jobManagerOwnedState holds the results for all state types
            OperatorSnapshotFinalizer finalizedSnapshots =
                new OperatorSnapshotFinalizer(snapshotInProgress);

            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getJobManagerOwnedState());

            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getTaskLocalState());
        }

        ...

        // Snapshot finished
        if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
            // Report the snapshot results to the JobMaster
            reportCompletedSnapshotStates(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                asyncDurationMillis);

        } else {
            ...
        }
        ...
    }
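  4. DefaultOperatorStateBackendSnapshotStrategy.snapshot, called from AbstractStreamOperator.snapshotState:

    1. Collect the operator states and broadcast states registered by the user
    2. Create an AsyncSnapshotCallable instance
    3. If asynchronousSnapshots == false (a synchronous snapshot), run the AsyncSnapshotCallable immediately
    4. Return the FutureTask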
  5. When AsyncCheckpointRunnable.run constructs the OperatorSnapshotFinalizer, each RunnableFuture in OperatorSnapshotFutures is run in turn (unless it has already run):

    • keyedStateManagedFuture is backed by HeapSnapshotStrategy.AsyncSnapshotCallable (for the heap-based keyed state backend)
    • operatorStateManagedFuture is backed by DefaultOperatorStateBackendSnapshotStrategy.AsyncSnapshotCallable
  6. Persisting operator state: DefaultOperatorStateBackendSnapshotStrategy.AsyncSnapshotCallable

    // Create the output stream; depending on the backend it writes to a file or to memory
    CheckpointStreamFactory.CheckpointStateOutputStream localOut = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
    snapshotCloseableRegistry.registerCloseable(localOut);

    // get the registered operator state infos ...
    List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots = ...;
    // ... get the registered broadcast operator state infos ...
    List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots = ...;
    // ... write them all in the checkpoint stream ...
    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

    OperatorBackendSerializationProxy backendSerializationProxy =
        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
    // Write the state meta information; the state data itself is written after this
    backendSerializationProxy.write(dov);

    // we put BOTH normal and broadcast state metadata here
    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
        new HashMap<>(initialMapCapacity);

    // ... (elided: write the partitions of every operator and broadcast state to localOut
    // and record their offsets in writtenStatesMetaData) ...

    // ... and, finally, create the state handle.
    OperatorStateHandle retValue = null;

    if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
        // Close the stream (flushing it) and obtain a handle to the written data
        StreamStateHandle stateHandle = localOut.closeAndGetHandle();

        if (stateHandle != null) {
            retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
        }
        // This result is eventually sent to the JobMaster
        return SnapshotResult.of(retValue);
    } else {
        throw new IOException("Stream was already unregistered.");
    }
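The whole path described in the steps above (snapshot futures created synchronously on the task thread, the asynchronousSnapshots switch in the snapshot strategy, AsyncCheckpointRunnable finishing any future that has not run yet, and the RUNNING to COMPLETED compare-and-set before reporting) can be modelled with plain `java.util.concurrent`. This is a minimal sketch under heavy simplification: the hypothetical `CheckpointPipelineSketch` class stands in for StreamTask, the snapshot strategy and AsyncCheckpointRunnable at once; "state" is one `List<Long>` per operator, the persisted result is a string, and the report to the JobMaster is just a map. The `DISCARDED` value and the copy taken before the write are assumptions modelled on the same pattern, and `runIfNotDoneAndGet` re-implements the behaviour described in step 5 rather than copying any Flink helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified model of the checkpoint path; none of these classes exist in Flink.
class CheckpointPipelineSketch {
    enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED }

    private final boolean asynchronousSnapshots;
    private final AtomicReference<AsyncCheckpointState> asyncCheckpointState =
            new AtomicReference<>(AsyncCheckpointState.RUNNING);
    // Stand-in for the report to the JobMaster: operator name -> persisted result.
    final Map<String, String> reportedStates = new LinkedHashMap<>();

    CheckpointPipelineSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    // Snapshot-strategy step: copy the state on the task thread, wrap the write in a
    // FutureTask, and run it right away only for synchronous snapshots.
    RunnableFuture<String> snapshot(String operator, List<Long> state, long checkpointId) {
        List<Long> copy = new ArrayList<>(state);
        FutureTask<String> task = new FutureTask<>(() -> operator + "@chk-" + checkpointId + copy);
        if (!asynchronousSnapshots) {
            task.run();
        }
        return task;
    }

    // Finalizer step: run the future on the current thread unless it already ran.
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
        if (future == null) {
            return null; // no backend, so no future was created
        }
        if (!future.isDone()) {
            future.run();
        }
        return future.get();
    }

    void executeCheckpointing(Map<String, List<Long>> operatorStates, long checkpointId) {
        // Synchronous part (task thread): one snapshot future per operator.
        Map<String, RunnableFuture<String>> operatorSnapshotsInProgress = new LinkedHashMap<>();
        operatorStates.forEach((op, state) ->
                operatorSnapshotsInProgress.put(op, snapshot(op, state, checkpointId)));

        // Asynchronous part: finish every future, then report only if still RUNNING.
        Runnable asyncCheckpointRunnable = () -> {
            Map<String, String> results = new LinkedHashMap<>();
            try {
                for (Map.Entry<String, RunnableFuture<String>> e : operatorSnapshotsInProgress.entrySet()) {
                    results.put(e.getKey(), runIfNotDoneAndGet(e.getValue()));
                }
            } catch (ExecutionException | InterruptedException ex) {
                asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.DISCARDED);
                return;
            }
            if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
                reportedStates.putAll(results);
            }
        };

        ExecutorService asyncOperationsThreadPool = Executors.newSingleThreadExecutor();
        try {
            // Blocking here only keeps the demo deterministic; the real task thread does not wait.
            asyncOperationsThreadPool.submit(asyncCheckpointRunnable).get();
        } catch (ExecutionException | InterruptedException ex) {
            throw new IllegalStateException(ex);
        } finally {
            asyncOperationsThreadPool.shutdown();
        }
    }

    AsyncCheckpointState state() {
        return asyncCheckpointState.get();
    }

    public static void main(String[] args) {
        Map<String, List<Long>> states = new LinkedHashMap<>();
        states.put("source", Arrays.asList(1L, 2L));
        CheckpointPipelineSketch pipeline = new CheckpointPipelineSketch(true);
        pipeline.executeCheckpointing(states, 7L);
        System.out.println(pipeline.reportedStates); // {source=source@chk-7[1, 2]}
        System.out.println(pipeline.state());        // COMPLETED
    }
}
```

With `asynchronousSnapshots = false` every future is already done when the async runnable sees it, so that thread only collects results. With `true` only the cheap copy happens on the task thread and the potentially slow write moves to the pool thread, which is the point of splitting the checkpoint into a synchronous and an asynchronous part.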

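The elided middle of step 6 writes each operator list state into the same checkpoint stream and records where its elements start; as far as I understand, those offsets are what `OperatorStateHandle.StateMetaInfo` keeps so that list elements can be redistributed across subtasks on restore. The sketch below models only that idea with `DataOutputStream`. `OperatorStateWriterSketch` is a hypothetical class, every element is a `long` written with `writeLong`, and the metadata header written by `OperatorBackendSerializationProxy` is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of writing operator list states into one checkpoint stream (not Flink code).
class OperatorStateWriterSketch {
    // State name -> start offset of every element in the stream (stand-in for StateMetaInfo).
    final Map<String, long[]> writtenStatesMetaData = new LinkedHashMap<>();
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(bytes);

    void writeListState(String name, List<Long> elements) throws IOException {
        long[] partitionOffsets = new long[elements.size()];
        for (int i = 0; i < elements.size(); i++) {
            partitionOffsets[i] = out.size(); // bytes written so far = where this element starts
            out.writeLong(elements.get(i));
        }
        writtenStatesMetaData.put(name, partitionOffsets);
    }

    // Stand-in for closeAndGetHandle(): flush, close and return the written bytes.
    byte[] closeAndGetBytes() throws IOException {
        out.close();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        OperatorStateWriterSketch writer = new OperatorStateWriterSketch();
        writer.writeListState("offsets", Arrays.asList(10L, 20L));
        writer.writeListState("buffer", Arrays.asList(30L));
        System.out.println(Arrays.toString(writer.writtenStatesMetaData.get("buffer"))); // [16]
        System.out.println(writer.closeAndGetBytes().length);                            // 24
    }
}
```

Because all states share one stream, the offsets are absolute positions: the single element of "buffer" starts at byte 16, right after the two 8-byte elements of "offsets".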