Flink源码分析-集群部署
分析Flink程序的启动过程,有助于理解和把握Flink是如何启动程序,集群和单机运行的区别,当出错时如何恢复重启,如何分配资源。
Demo
先看一个简单的stream demo
1 | public static void main(String[] args) throws Exception { |
执行环境
Flink程序有两种执行环境:
集群
通过Client将程序提交到其它集群执行本地
通过Flink程序的main
方法执行,这种方式适用于本地开发调试。
注意:Flink程序通过集群或者本地方式运行时,都会调用
main
方法。但是集群中调用和本地调用的方式不一样。
关键概念
组件
Flink Cluster
由一个Flink Master
和多个Task Manager
组成,这几个组件既可以是单独的进程,也可以都在一个进程。Flink集群有两种模式:
session
可以同时执行多个Flink Job
,集群的生命周期不依赖Job的生命周期,这种状态的集群被称为Flink Session Cluster
。job
集群只执行一个Flink Job
,当Job状态结束之后,集群也会结束,这种状态的集群被称为Flink Session Application
。
Flink Job Master
Flink集群的Master节点,由三部分组成Dispatcher
、 ResourceManager
、 JobManager
组成。Master节点中Dispatcher
、 ResourceManager
的实例只会有一个,JobManager
可能会有多个实例。
集群运行时的几个关键类
ExecutionEnvironment
执行环境是Flink程序的运行上下文。批处理的执行环境是:ExecutionEnvironment
; 流处理的执行环境是:StreamExecutionEnvironment
。
Flink程序既能在本地的JVM中运行使用Local...ExecutionEnvironment
;发送到远程运行则使用Remote...ExecutionEnvironment
;
Dispatcher
- 接收任务的提交并且会将jobgraph持久化存储,之后在集群重启的时候使用
- 执行提交的任务
- 集群和所有Job的详细状态都可以通过这个组件获取
ResourceManager
- 保存有所有正在运行中的
JobManager
、TaskManager
的信息,通过心跳来监听它们的状态 TaskManager
在创建时会上报Slot
的信息- 负责给
Job Manager
分配Slot
,TaskManager
在槽使用完成之后回报Slot
可用 - 与DisPatcher运行在同一进程
SlotManager
ResourceManager
关于Slot
的操作都会交给SlotManager
管理- 将所有注册的
Slot
维护成一张视图来处理Slot
的分配和待处理的请求 - 当资源不够时,会向
ResourceManager
请求新的资源。
JobMaster
- master的运行是通过此类运行的,一个job会有一个对应的实例
- 在启动时会将
JobGrapth
转换成对应的ExecutionGraph
SchedulerNG
负责任务的调度、部署、执行SchedulerNG.ExecutionGrapth.CheckpointCoordinator
协调算子和状态的分布式快照。通过发送消息给相关的task来触发快照。- 监测
TaskManager
的状态
ClusterEntrypoint
集群启动的入口
HighAvailabilityServices
高可用服务的集合,这些服务的功能包括:高可用存储、注册表、leader选举、分布式计数器
每个高可用服务都会有一个响应的选举服务
LeaderElectionService
和提取服务LeaderRetrievalService
ResourceManager/JobManager
leader选举服务通过LeaderElectionService
实现,在new ResourceManager/JobManager()
时,也会创建一个相应的LeaderElectionService
, 当Manager启动后会调用选举服务的start(LeaderContender contender)
方法进行通知;leader变更服务通过LeaderRetrievalService
实现,在taskmanager 收到一个新的请求requestSlot()
时,会通过LeaderRetrievalService
来获取jobmaster的地址;
MiniCluster
1 | public void start() throws Exception { |
-
*
- ResourceManager leader election and leader retrieval *
- JobManager leader election and leader retrieval *
- Persistence for checkpoint metadata *
- Registering the latest completed checkpoint(s) *
- Persistence for the BLOB store *
- Registry that marks a job's status *
- Naming of RPC endpoints *