Flink以DAG的方式来执行程序,它会根据用户的代码生成三个Graph,但我认为实际上还有一个Graph,就是用户的程序直接映射出来的。
- Plan ProgramGraph
- StreamGraph
- JobGraph
- ExecutionGraph
JobGraph
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory(); StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1 && outOperator != null && headOperator != null && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && (edge.getPartitioner() instanceof ForwardPartitioner) && edge.getShuffleMode() != ShuffleMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled(); }
|
判断两个算子是否能串联到一起
- 图允许链接
- 下游节点只有一个上游节点
- 有同样的槽共享组
- 上下游并行度一致
- 边的shuffle模式不能为batch
- 边的分区为Forward分区器
- 下游的链接策略为always
- 上游的链接策略为 HEAD 或者 always