Flink Source Code Analysis - Cluster Deployment

Analyzing how a Flink program starts up helps in understanding how Flink launches a program, how cluster execution differs from local execution, how the program recovers and restarts after a failure, and how resources are allocated.

Demo

Let's start with a simple streaming demo:

public static void main(String[] args) throws Exception {
    // Get the execution environment; when called in local mode this returns a LocalStreamEnvironment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Build the DAG; it is composed of multiple Operators (Transformation(UserFunction))
    DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5);

    source.map((MapFunction<Integer, Integer>) value -> value).print();
    // Execute the job
    env.execute();
}

Execution Environments

A Flink program can run in two kinds of execution environments:

  1. Cluster
    The program is submitted through a Client to a remote cluster for execution.

  2. Local
    The program is executed directly via its main method; this mode is suited to local development and debugging.

Note: the main method is invoked both when a Flink program runs on a cluster and when it runs locally, but the way it is invoked differs between the two. The sketch below shows a quick way to see which environment the program actually received.
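
As a small, hedged illustration (the concrete class printed below is an implementation detail that varies across Flink versions), printing the runtime class of the environment shows which mode the program was launched in:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WhichEnvironment {
    public static void main(String[] args) {
        // getExecutionEnvironment() chooses an implementation based on the calling context
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Typically a LocalStreamEnvironment when run from the IDE; a context environment
        // installed by the Flink client when the jar is submitted to a cluster
        System.out.println(env.getClass().getName());
    }
}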

Key Concepts

Components

Flink Cluster
Consists of one Flink Master and multiple TaskManagers. These components may run as separate processes or all within a single process. A Flink cluster can run in two modes:

  1. session: the cluster can execute multiple Flink Jobs concurrently, and its lifecycle is independent of any single job's lifecycle. A cluster in this mode is called a Flink Session Cluster.
  2. job: the cluster executes exactly one Flink Job; when the job reaches a terminal state, the cluster shuts down with it. A cluster in this mode is called a Flink Job Cluster.

Flink Master
The master node of a Flink cluster, composed of three parts: the Dispatcher, the ResourceManager, and the JobManager. On the master there is only one Dispatcher instance and one ResourceManager instance, while there may be multiple JobManager instances.

Key Classes at Cluster Runtime

ExecutionEnvironment

The execution environment is the runtime context of a Flink program. For batch processing the environment is ExecutionEnvironment; for stream processing it is StreamExecutionEnvironment.
A Flink program can either run inside the local JVM, using a Local...ExecutionEnvironment, or be sent to a remote cluster for execution, using a Remote...ExecutionEnvironment (see the sketch below).
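
A minimal sketch of choosing the environment explicitly for the streaming API (the host, port, and jar path below are placeholders, not values from this post):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvSelection {
    public static void main(String[] args) throws Exception {
        // Run the pipeline inside the local JVM (embedded mini cluster)
        StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment();

        // Ship the pipeline to a remote cluster; host, port, and jar path are placeholders
        StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/user-job.jar");

        local.fromElements(1, 2, 3).print();
        local.execute("local-demo");
    }
}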

Dispatcher

  1. Accepts job submissions and persists the JobGraph so that it can be reused when the cluster restarts
  2. Starts the execution of submitted jobs (see the sketch below)
  3. Provides access to the detailed status of the cluster and of all jobs
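
As a rough, hedged sketch of how a client-side component talks to the Dispatcher over RPC (the DispatcherGateway methods below are as they appear around Flink 1.9, the version this analysis seems to be based on; signatures differ in later releases, and the gateway itself would come from an RpcGatewayRetriever as in the MiniCluster code later in this post):

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;

public class DispatcherSketch {

    // Submit a JobGraph through the Dispatcher's RPC gateway, then list the running jobs
    static void submitAndList(DispatcherGateway dispatcher, JobGraph jobGraph) {
        CompletableFuture<Acknowledge> ack = dispatcher.submitJob(jobGraph, Time.seconds(10));
        CompletableFuture<Collection<JobID>> jobs =
                ack.thenCompose(ignored -> dispatcher.listJobs(Time.seconds(10)));
        jobs.thenAccept(ids -> ids.forEach(id -> System.out.println("running job: " + id)));
    }
}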

ResourceManager

  1. Holds information about all running JobManagers and TaskManagers and monitors their status via heartbeats
  2. Each TaskManager reports its Slot information to the ResourceManager when it starts
  3. Allocates Slots to JobManagers; when a TaskManager finishes using a slot, it reports the Slot as available again
  4. Runs in the same process as the Dispatcher

SlotManager

  1. All Slot-related operations of the ResourceManager are delegated to the SlotManager
  2. Maintains all registered Slots as a single view, from which it handles slot allocation and pending requests
  3. When resources are insufficient, it requests new resources from the ResourceManager.

JobMaster

  1. The per-job master runs through this class; there is one JobMaster instance per job
  2. On startup it converts the JobGraph into the corresponding ExecutionGraph
  3. SchedulerNG is responsible for scheduling, deploying, and executing tasks
  4. The CheckpointCoordinator (reachable via SchedulerNG -> ExecutionGraph -> CheckpointCoordinator) coordinates distributed snapshots of operators and state; it triggers a snapshot by sending messages to the relevant tasks
  5. Monitors the status of the TaskManagers

ClusterEntrypoint

The entry point for starting a cluster; each deployment mode provides its own ClusterEntrypoint subclass (see the sketch below).
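
As a hedged sketch of how a standalone session cluster boots (modeled on the shape of StandaloneSessionClusterEntrypoint around Flink 1.9; command-line parsing and configuration loading are simplified here):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;

public class SessionClusterBoot {
    public static void main(String[] args) {
        // Load flink-conf.yaml (the real entrypoint parses command-line options first)
        Configuration configuration = GlobalConfiguration.loadConfiguration();

        // Each deployment mode has its own ClusterEntrypoint subclass; the standalone
        // session cluster uses StandaloneSessionClusterEntrypoint
        StandaloneSessionClusterEntrypoint entrypoint =
                new StandaloneSessionClusterEntrypoint(configuration);

        // runClusterEntrypoint starts the cluster services (Dispatcher, ResourceManager,
        // REST endpoint, ...) and wires up shutdown handling; the JVM keeps running
        // until the cluster terminates
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
}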

HighAvailabilityServices

  1. A collection of high-availability services whose functionality covers highly available storage, registries, leader election, and distributed counters

  2. Each highly available component has a corresponding election service (LeaderElectionService) and retrieval service (LeaderRetrievalService)

  3. Leader election for the ResourceManager/JobManager is implemented through LeaderElectionService: when a ResourceManager/JobManager is created, a matching LeaderElectionService is created with it, and once the manager has started it calls the election service's start(LeaderContender contender) method to register itself. Leader changes are observed through LeaderRetrievalService: for example, when a TaskManager receives a new requestSlot() request, it uses a LeaderRetrievalService to look up the JobMaster's address (see the sketch below).
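
A minimal sketch of the retrieval side (the LeaderRetrievalService / LeaderRetrievalListener interfaces as they look around Flink 1.9; the listener class itself is hypothetical):

import java.util.UUID;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

// Hypothetical listener that just logs the current leader address; a real TaskExecutor
// uses a similar listener to learn the ResourceManager/JobMaster address
public class LoggingLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        System.out.println("new leader: " + leaderAddress + " (session " + leaderSessionID + ")");
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }

    // Start watching a leader; the service usually comes from HighAvailabilityServices,
    // e.g. haServices.getResourceManagerLeaderRetriever()
    public static void watch(LeaderRetrievalService retrievalService) throws Exception {
        retrievalService.start(new LoggingLeaderListener());
    }
}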

MiniCluster

In local mode the whole cluster runs inside a single JVM as a MiniCluster; its start() method brings up all of the components described above:

public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            // Metric registry
            // Both the TaskManager and the JobManager create a metric registry when they start
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);

            final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
            } else {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);

                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();

                dispatcherResourceManagreComponentRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, jobManagerBindAddress);
                taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, taskManagerBindAddress);
            }

            RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                configuration,
                commonRpcService.getAddress());
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            ioExecutor = Executors.newFixedThreadPool(
                Hardware.getNumberCPUCores(),
                new ExecutorThreadFactory("mini-cluster-io"));
            haServices = createHighAvailabilityServices(configuration, ioExecutor);

            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );

            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
                configuration,
                dispatcherResourceManagreComponentRpcServiceFactory,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                metricQueryServiceRetriever,
                new ShutDownFatalErrorHandler()
            ));

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            webMonitorLeaderRetrievalService = haServices.getWebMonitorLeaderRetriever();

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            webMonitorLeaderRetriever = new LeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}

The createHighAvailabilityServices call above wires up the HighAvailabilityServices, which (per its Javadoc) cover:

  • ResourceManager leader election and leader retrieval
  • JobManager leader election and leader retrieval
  • Persistence for checkpoint metadata
  • Registering the latest completed checkpoint(s)
  • Persistence for the BLOB store
  • Registry that marks a job's status
  • Naming of RPC endpoints
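
To round off, here is a minimal, hedged sketch of driving a MiniCluster directly (the MiniClusterConfiguration.Builder and executeJobBlocking APIs are used as they exist around Flink 1.9; LocalStreamEnvironment does essentially the same thing under the hood when env.execute() runs locally):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterDemo {

    // Start an embedded cluster, run one job on it, and shut it down again.
    // The JobGraph would normally be produced from the StreamGraph built by the
    // StreamExecutionEnvironment; it is passed in here to keep the sketch short.
    static void runOnMiniCluster(JobGraph jobGraph) throws Exception {
        MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
            .setConfiguration(new Configuration())
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(2)
            .build();

        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();                      // the method analyzed above
            miniCluster.executeJobBlocking(jobGraph); // submit the job and wait for its result
        }
    }
}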
