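Flink Source Code Analysis: State
Flink's fault tolerance and its consistency semantics are both built on State.
State only becomes useful in combination with Checkpoints and Snapshots.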
State can be classified along two dimensions:
By type:
- Keyed State
- Operator State
By data organization format:
- Managed (List, Value, Map)
- Raw (a user-defined format)
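To make the Keyed State versus Operator State distinction concrete, here is a toy model in plain Java. The classes `ToyKeyedValueState` and `ToyOperatorListState` are hypothetical and are not Flink APIs; the sketch only assumes that keyed state is addressed through the key of the record being processed, while operator state belongs to one parallel operator instance regardless of key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: these classes are hypothetical and are not Flink APIs. */
final class StateScopeSketch {

    /** Keyed state: one value per key, addressed through the "current key". */
    static final class ToyKeyedValueState<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { this.currentKey = key; }
        V value() { return valuesByKey.get(currentKey); }
        void update(V value) { valuesByKey.put(currentKey, value); }
    }

    /** Operator state: one list per operator instance, independent of any key. */
    static final class ToyOperatorListState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T element) { elements.add(element); }
        List<T> get() { return new ArrayList<>(elements); }
    }

    /** Counts records per key (keyed state) and remembers every record seen (operator state). */
    static Map<String, Integer> countPerKey(List<String> keys, ToyOperatorListState<String> seen) {
        ToyKeyedValueState<String, Integer> count = new ToyKeyedValueState<>();
        Map<String, Integer> latestCounts = new HashMap<>();
        for (String key : keys) {
            count.setCurrentKey(key);          // the runtime would set this before each record
            Integer previous = count.value();  // null the first time a key is seen
            int next = previous == null ? 1 : previous + 1;
            count.update(next);
            seen.add(key);                     // operator state ignores the current key
            latestCounts.put(key, next);
        }
        return latestCounts;
    }
}
```

Feeding the keys `a, b, a` through `countPerKey` leaves the keyed counter at 2 for `a` and 1 for `b`, while the operator-level list holds all three records.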
A State Backend is where State is stored. There are currently three implementations:
- MemoryStateBackend (for testing only)
- FsStateBackend
- RocksDBStateBackend
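Getting a state:
When a state is requested from the StateBackend, the backend first looks it up by state name. If a state with that name already exists it is returned; otherwise a new one is created.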
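The lookup-by-name behaviour can be sketched as a small registry. `ToyStateRegistry` and its `getOrCreate` method are hypothetical names made up for this illustration; the real backends keep more metadata per state (serializers, for example), which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical registry illustrating get-or-create by state name; not Flink's backend code. */
final class ToyStateRegistry {
    private final Map<String, Object> statesByName = new HashMap<>();

    /** Returns the state registered under the name, creating it on first access. */
    @SuppressWarnings("unchecked")
    <S> S getOrCreate(String name, Supplier<S> factory) {
        // computeIfAbsent calls the factory only when nothing is registered under this name.
        return (S) statesByName.computeIfAbsent(name, ignored -> factory.get());
    }

    int size() {
        return statesByName.size();
    }
}
```

Asking twice for the same name returns the same object, so whatever was written through the first reference is visible through the second.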
When a Checkpoint triggers an operator Snapshot, the operator's State is saved to the StateBackend.
CheckpointingOperation.executeCheckpointing
This method is called when a task starts taking a snapshot.
public void executeCheckpointing() throws Exception {
    ...
    try {
        // Snapshot every operator associated with the current task
        for (StreamOperator<?> op : allOperators) {
            // Creates a snapshot future for each kind of state, e.g. keyedStateManagedFuture and operatorStateManagedFuture.
            // These RunnableFutures, which write to the StateBackend, are all stored in operatorSnapshotsInProgress
            checkpointStreamOperator(op);
        }
        // This runnable is what actually writes the state to the StateBackend
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress, // the key argument: the futures collected above
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);
        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        // Run the write asynchronously
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
        ...
    } catch (Exception ex) {
        ...
    }
}
AbstractStreamOperator.snapshotState
// The factory creates the OutputStream that actually persists the State
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    // Holds the checkpointId and timestamp
    StateSnapshotContextSynchronousImpl snapshotContext = ...;
    try {
        // Calls AbstractUdfStreamOperator.snapshotState,
        // which in turn calls the userFunction's snapshotState().
        // If the user function implements ListCheckpointed, a PartitionableListState is requested from the
        // OperatorStateBackend and stored in OperatorStateBackend.registeredOperatorStates
        snapshotState(snapshotContext);
        // Create the Raw State RunnableFutures. Such a future only holds the keyedStateCheckpointOutputStream,
        // which the userFunction writes to via out.write
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        if (null != operatorStateBackend) {
            // operatorStateBackend.snapshot (DefaultOperatorStateBackendSnapshotStrategy) creates a runnable
            // that writes the managed state to the StateBackend.
            // For a synchronous snapshot this call becomes synchronous and runs the AsyncSnapshotCallable directly
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        ...
    }
    return snapshotInProgress;
}
StreamTask.asyncOperationsThreadPool executes AsyncCheckpointRunnable.run, which asynchronously persists the state to the StateBackend and then sends an Ack to the JobMaster to signal that the checkpoint is complete.
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
    // The snapshot results, which are sent to the JobMaster
    TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = ...;
    TaskStateSnapshot localTaskOperatorSubtaskStates = ...;
    for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
        OperatorID operatorID = entry.getKey();
        // The set of snapshot futures for this operator
        OperatorSnapshotFutures snapshotInProgress = entry.getValue();
        // Checks whether each future has already run (synchronously, on the same thread as the userFunction).
        // If not, it is run on the current thread, which is what makes the snapshot asynchronous.
        // OperatorSnapshotFinalizer.jobManagerOwnedState holds the results for every kind of state
        OperatorSnapshotFinalizer finalizedSnapshots =
            new OperatorSnapshotFinalizer(snapshotInProgress);
        jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
            operatorID,
            finalizedSnapshots.getJobManagerOwnedState());
        localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
            operatorID,
            finalizedSnapshots.getTaskLocalState());
    }
    ...
    // The snapshot is complete
    if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
            CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
        // Report the snapshot results to the JobMaster
        reportCompletedSnapshotStates(
            jobManagerTaskOperatorSubtaskStates,
            localTaskOperatorSubtaskStates,
            asyncDurationMillis);
    } else {
        ...
    }
    ...
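}
DefaultOperatorStateBackendSnapshotStrategy.snapshot is called by AbstractStreamOperator.snapshotState and does the following:
- Collects the operator states and broadcast states registered by the user
- Creates an AsyncSnapshotCallable instance
- If asynchronousSnapshots == false, runs the AsyncSnapshotCallable immediately
- Returns a FutureTask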
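The last two steps can be sketched with a plain `java.util.concurrent.FutureTask`. This is a minimal illustration rather than Flink's code: `SnapshotTaskSketch` and its `snapshot` method are hypothetical, and the flag is modelled on the comment in snapshotState above, which says that a synchronous snapshot runs the callable directly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/** Hypothetical sketch of "wrap in a FutureTask, run now only if synchronous"; not Flink's code. */
final class SnapshotTaskSketch {

    static <T> FutureTask<T> snapshot(Callable<T> snapshotCallable, boolean asynchronousSnapshots) {
        FutureTask<T> task = new FutureTask<>(snapshotCallable);
        if (!asynchronousSnapshots) {
            // Synchronous mode: do the work on the calling thread before returning.
            task.run();
        }
        // Asynchronous mode: the task is returned unstarted and someone else runs it later.
        return task;
    }
}
```

Either way the caller receives a future. In the synchronous case `isDone()` is already true on return; in the asynchronous case the work happens only when the task's `run()` is invoked, for instance by a thread pool.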
When AsyncCheckpointRunnable.run initializes the OperatorSnapshotFinalizer, the finalizer invokes each RunnableFuture in OperatorSnapshotFutures in turn:
- keyedStateManagedFuture is backed by HeapSnapshotStrategy.AsyncSnapshotCallable
- operatorStateManagedFuture is backed by DefaultOperatorStateBackendSnapshotStrategy.AsyncSnapshotCallable
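A rough sketch of this finalization step, combined with the RUNNING to COMPLETED transition from AsyncCheckpointRunnable.run, might look like the following. All names (`FinalizeSnapshotsSketch`, `AsyncState`, `finalizeAll`, `runIfNotDoneAndGet`) are made up for the illustration, and returning a map stands in for reporting the results to the JobMaster.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the finalization step; names are illustrative, not Flink's classes. */
final class FinalizeSnapshotsSketch {

    enum AsyncState { RUNNING, COMPLETED, DISCARDED }

    /** Runs the future on the current thread unless it already ran, then returns its result. */
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future)
            throws ExecutionException, InterruptedException {
        if (!future.isDone()) {
            future.run();
        }
        return future.get();
    }

    /** Finalizes every pending future; the results count only if the state is still RUNNING. */
    static Map<String, String> finalizeAll(
            Map<String, RunnableFuture<String>> snapshotsInProgress,
            AtomicReference<AsyncState> state) throws ExecutionException, InterruptedException {
        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, RunnableFuture<String>> entry : snapshotsInProgress.entrySet()) {
            results.put(entry.getKey(), runIfNotDoneAndGet(entry.getValue()));
        }
        // Only one party can move RUNNING -> COMPLETED; a concurrent discard makes this fail.
        if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
            return results; // stands in for reporting the results to the JobMaster
        }
        return new LinkedHashMap<>(); // discarded in the meantime: nothing is reported
    }
}
```

Futures that were already executed synchronously are simply read, and the rest run on the thread that calls `finalizeAll`. The compare-and-set guard means a checkpoint that was cancelled while the futures were running is never reported as complete.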
How operator state is persisted
DefaultOperatorStateBackendSnapshotStrategy.AsyncSnapshotCallable
// Create the OutputStream, which writes to a file or to memory
CheckpointStreamFactory.CheckpointStateOutputStream localOut = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);
// get the registered operator state infos ...
List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots = ...;
// ... get the registered broadcast operator state infos ...
List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots = ...;
// ... write them all in the checkpoint stream ...
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
    new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
// The actual write
backendSerializationProxy.write(dov);
// we put BOTH normal and broadcast state metadata here
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
    new HashMap<>(initialMapCapacity);
// ... and, finally, create the state handle.
OperatorStateHandle retValue = null;
if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
    // Close the stream, which also flushes it
    StreamStateHandle stateHandle = localOut.closeAndGetHandle();
    if (stateHandle != null) {
        retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
    }
    // This result is sent to the JobMaster
    return SnapshotResult.of(retValue);
} else {
    throw new IOException("Stream was already unregistered.");
}
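The overall shape of this routine (write metadata, write the state, close the stream, hand back a handle) can be tried out with JDK streams alone. The sketch below is an in-memory stand-in: `WriteStateSketch` and `ToyStateHandle` are hypothetical, and recording a start offset per state name is this sketch's own assumption about what such per-state metadata might contain, since the excerpt above does not show how writtenStatesMetaData is filled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for writing a snapshot to a stream; not Flink's classes. */
final class WriteStateSketch {

    /** Result of a snapshot: the serialized bytes plus the start offset of each named state. */
    static final class ToyStateHandle {
        final byte[] data;
        final Map<String, Long> offsetsByStateName;

        ToyStateHandle(byte[] data, Map<String, Long> offsetsByStateName) {
            this.data = data;
            this.offsetsByStateName = offsetsByStateName;
        }
    }

    static ToyStateHandle snapshot(Map<String, List<String>> listStates) throws IOException {
        ByteArrayOutputStream localOut = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(localOut);
        Map<String, Long> offsets = new LinkedHashMap<>();

        // Metadata first: how many states there are and what they are called.
        out.writeInt(listStates.size());
        for (String name : listStates.keySet()) {
            out.writeUTF(name);
        }
        // Then the elements of each state, remembering where each state starts (an assumption).
        for (Map.Entry<String, List<String>> state : listStates.entrySet()) {
            offsets.put(state.getKey(), (long) out.size());
            out.writeInt(state.getValue().size());
            for (String element : state.getValue()) {
                out.writeUTF(element);
            }
        }
        // Flush and close before handing out the handle, in the spirit of closeAndGetHandle().
        out.close();
        return new ToyStateHandle(localOut.toByteArray(), offsets);
    }
}
```

A reader can then wrap `data` in a `DataInputStream`, read the metadata header back, and use the recorded offset to find where a given state begins.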