本文将从ExecutionGraph开始向后讲起, ExecutionGraph定义了Job的并发逻辑结构, 作为任务执行的以后一层逻辑结构, 也是最核心数据结构。为了让大家有全局的了解, 先盗一张广为引用的Graph转换图:
具体来说, 本文讲述在JobManager端subTask申请slot以及部署到TaskManager上的过程。
Task分配slot及部署
代码将从ExecutionGraph.scheduleExecutionGraph()开始讲解, 进入:
1 | public void scheduleForExecution() throws JobException { |
其中, scheduleMode分EAGER和LAZY_FROM_SOURCES, EAGER表示立刻去调度部署所有的Task。实际scheduleMode是从JobGraph.getScheduleMode()取值的, 为eager。
我们再进入scheduleEager看是如何调度task的。
1 | private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) { |
scheduleEager主要做了两件事:
- 通过allocateResourcesForAll确定每个subTask将要部署的slot。若没有合适的TaskManager, 那么通过yarn去申请TaskManager。
- 当所有的subTask都确定好部署TaskManager的slot后, 通过execution.deploy()将subTask部署上去。
接下来, 将分别围绕这两件事讲解。确定subTask分配的slot
通过getVerticesTopologically()获取所有的ExecutionJobVertex, 然后依次轮询给每个ExecutionJobVertex都分配一个slot, 其中轮询的ExecutionJobVertex是有先后顺序的, 从source开始分配slot, 直到sink。后面可以看到, 上游分配到哪个tm上, 会影响下游的slot分配。 我们进入allocateResourcesForAll看下是如何给一个ExecutionJobVertex所有的subTask分配slot的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
SlotProvider resourceProvider, // SlotPool$ProviderAndOwner
boolean queued, //true
LocationPreferenceConstraint locationPreferenceConstraint, //ALL
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds,
Time allocationTimeout) {
final ExecutionVertex[] vertices = this.taskVertices;
final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
for (int i = 0; i < vertices.length; i++) {
final Execution exec = vertices[i].getCurrentExecutionAttempt();
final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
resourceProvider,
queued,
locationPreferenceConstraint,
allPreviousExecutionGraphAllocationIds,
allocationTimeout);
slots[i] = allocationFuture;
}
return Arrays.asList(slots);
}
每一个ExecutionJobVertex都对应着一批ExecutionVertex(也就是subTask), 可以看到, 这里轮询每个ExecutionVertex进行申请一个slot。
1 | public CompletableFuture<Execution> allocateAndAssignSlotForExecution( |
该函数主要做了两件事情:
- 在calculatePreferredLocations中确定从该subTask对应ExecutionJobVertex的所有上游中找到最合适的上游”偏向位置集合”。
- 通过SlotPool$ProviderAndOwner.allocateSlot继续确定从”偏向位置集合”找到一个共享slot。
我们知道, 多个subTask允许共享slot, 细节后面会详细描述。 那么当前subTask与哪些已经分配的subTask共享slot呢? 下游subTask与哪个上游subTask共享slot呢?
flink会根据subTask上游slot的分配来确定当前slot的分配:
1 | public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() { |
处理过程如下:
- 若该ExecutionVertex没有上游(例如source), 那么返回为空, 没有”偏好位置集合”, 之后将申请新的slot。
- 若当前ExecutionVertex有属于不同JobVertex多个ExecutionJobVertex的上游, 那么当前sub分配到哪些共享slot的可选路径只能是: 属于同一个JobVertex的上游节点个数最少。上图的话, 就会选择source2的所有subTask作为”偏好位置集合”。我们接下来看第二步, 最终会进入到allocateSharedSlot决定subTask分配到哪些”偏好位置集合”里slot上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39private CompletableFuture<LogicalSlot> allocateSharedSlot(
SlotRequestId slotRequestId,
ScheduledUnit task,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
Time allocationTimeout) {
// allocate slot with slot sharing
final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
//默认都是一个, default对应的那个
task.getSlotSharingGroupId(),
id -> new SlotSharingManager(
id,
this,
providerAndOwner));
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
try {
if (task.getCoLocationConstraint() != null) {
multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
task.getCoLocationConstraint(),
multiTaskSlotManager,
slotProfile,
allowQueuedScheduling,
allocationTimeout);
} else {
// 跑到这里
multiTaskSlotLocality = allocateMultiTaskSlot(
task.getJobVertexId(),
multiTaskSlotManager,
slotProfile,
allowQueuedScheduling,
allocationTimeout);
}
}
final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
slotRequestId,
task.getJobVertexId(),
multiTaskSlotLocality.getLocality());
return leaf.getLogicalSlotFuture();
}
该函数主要做了两件事:
- 通过allocateMultiTaskSlot产生MultiTaskSlotLocality, 里面包含从”偏向位置集合”中选取的部署当前subTask共享的slot。
- 产生SingleTaskSlot, 当前SingleTaskSlot作为MultiTaskSlot的一个子叶子节点。
再继续跟进代码前, 我们需要了解两个变量resolvedRootSlots、unresolvedRootSlots。共享slot都会从这两个变量中获取, 这两个变量为共享组所拥有, 默认共享组为default。
unresolvedRootSlots: 当当前subTask正在确认部署到那个slot中时, 会将该slot保存在unresolvedRootSlots; 当确定好部署到哪个slot时, 会将该信息从unresolvedRootSlots中移除, 并放入resolvedRootSlots中 当我们查找是否有可利用的slot时, 会从这些变量中查找。
我们再进入正题, 看allocateMultiTaskSlot看是如何给subTask分配slot的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
//groupId指的同一个JobVertex的id
AbstractID groupId,
SlotSharingManager slotSharingManager,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
Time allocationTimeout) throws NoResourceAvailableException {
//过滤"偏好位置集合"的位置中不属于相同groupId的位置, 这里主要是为了避免同一个ExecutionJobVertex中不同的SubTask分配到同一个slot中。
// check first whether we have a resolved root slot which we can use
SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(
groupId,
// LocationPreferenceSchedulingStrategy
schedulingStrategy,
slotProfile);
//从"偏好位置集合"中到合适的slot后, 就直接返回了。
if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
return multiTaskSlotLocality;
}
......
if (allowQueuedScheduling) {
//在unresolvedRootSlots中查找不属于同一个JobVertex的slot
SlotSharingManager.MultiTaskSlot multiTaskSlotFuture = slotSharingManager.getUnresolvedRootSlot(groupId); // 为null
if (multiTaskSlotFuture == null) {
//没有找到合适的可利用的的slot, 那么将去向ResurceNameger申请新的TaskManger, 这是最后一步
final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot(
allocatedSlotRequestId,
slotProfile.getResourceProfile(),
allocationTimeout); //300s
//将新产生的futureSlot, 放入resolvedRootSlots中, 这样之后申请slot时, 该slot可以被共享。
multiTaskSlotFuture = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
futureSlot,
allocatedSlotRequestId);
futureSlot.whenComplete(
(AllocatedSlot allocatedSlot, Throwable throwable) -> {
final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
if (taskSlot != null) {
// still valid
if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
taskSlot.release(throwable);
} else {
if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
}
});
}
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlotFuture, Locality.UNKNOWN);
}
}
该函数主要逻辑如下:
- 从resolvedRootSlots、unresolvedRootSlots中查找是否有可共享的slot。
- 若没有, 向ResourceManager申请TaskManager以获取slot。
- 将申请的slot信息也存放入unresolvedRootSlots中, 等成功申请后再存放入resolvedRootSlots。
我们再接着看是如何向ResourceManager申请TaskManager的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
Time allocationTimeout) {
final PendingRequest pendingRequest = new PendingRequest(
slotRequestId,
resourceProfile);
// register request timeout
FutureUtils //30s超时
.orTimeout(pendingRequest.getAllocatedSlotFuture(), allocationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.whenCompleteAsync( //当结束完成时需要做的事情
(AllocatedSlot ignored, Throwable throwable) -> {
if (throwable instanceof TimeoutException) {
timeoutPendingSlotRequest(slotRequestId);
}
},
getMainThreadExecutor());
if (resourceManagerGateway == null) { // 为null
stashRequestWaitingForResourceManager(pendingRequest); // 会跑到这里
} else {
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
return pendingRequest.getAllocatedSlotFuture();
}
可以看到:
- 首先查看resourceManagerGateway是否连接上, 若没有连接上, 将请求暂时缓存起来, 待连接上之后再申请。
- 若已经初始化之后, 会去向ResourceManager申请TaskManager。
缓存申请Slot的请求
大致思路是先缓存申请slot的请求, 直到flink ResourceManager注册完成后, 再去申请, 我们看下整体细节。首先去查看哪里开始对resourceManagerGateway进行初始化的。 首先回到最开始准备执行ExecutionGraph的时候:1
2
3
4
5
6
7
8
9
10
11
12private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
//这里会开始尝试连接rm, 会去和resourceManager建立联系
startJobMasterServices();
resetAndScheduleExecutionGraph();
}
private void startJobMasterServices() throws Exception {
slotPool.start(getFencingToken(), getAddress());
// 这里比较重要,会进去启动申请tm的请求
reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
// StandaloneLeaderRetrievalService
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
在reconnectToResourceManager中, 会去尝试初始化, 调用connectToResourceManager:
1 | private void connectToResourceManager() { |
在ResourceManagerConnection中定义了onRegistrationSuccess, 会去调用establishResourceManagerConnection()函数, 我们进入resourceManagerConnection.start()看下如何建立注册的。
1 | public void start() { |
在createNewRegistration中, 新建注册:
1 | private RetryingRegistration<F, G, S> createNewRegistration() { |
当注册完成并且没有抛出异常时, 说明注册完成了, 则调用之前的ResourceManagerConnection.onRegistrationSuccess()进行连接, 最终会进去SlotPool.connectToResourceManager()
1 | public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) { |
当完成flink ResourceManager注册、连接后, 我们会逐个申请之前被挂起的请求。然后开始走之后描述的正常申请slot流程。
向ResourceManager申请slot
从requestSlotFromResourceManager()中最终会进入registerSlotRequest
1 | public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { |
internalRequestSlot做了如下逻辑:
- 通过findMatchingSlot检查是否有现成可用的slot, 其中freeSlots包含着availiable slot. 比如在每当有新的TaskManager向JobManager注册时, 就会调用SlotManager.registerSlotRequest(), 在freeSlots中注册该TM可用的slot。若有可用slot时候, 就会调用allocateSlot进行分配。
- 若没有可用空闲slot, 通过allocateResource申请TM, 最终会调用YarnResourceManager.requestYarnContainer进行申请。
我们再分别以这两种情况继续介绍。JobManager端有某个TM注册的可用slot
若JM端有某个TM注册的可用slot, 那么就会进入allocateSlot来将这个slot分配给这个SubTask:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
final InstanceID instanceID = taskManagerSlot.getInstanceId();
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
// 既然这个TM上报的slot, 那么这个TM一定已经有注册信息了
if (taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " +
instanceID + '.');
}
taskManagerRegistration.markUsed();
去向TM通信, 告诉TM这个slot已经被请求了
// RPC call to the task manager
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout);
requestFuture.whenComplete(
(Acknowledge acknowledge, Throwable throwable) -> {
if (acknowledge != null) {
completableFuture.complete(acknowledge);
}
});
completableFuture.whenCompleteAsync(
(Acknowledge acknowledge, Throwable throwable) -> {
try { //去更新本地slot状态, 从可用空闲slot中删掉
if (acknowledge != null) {
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
}
} catch (Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
},
mainThreadExecutor);
}
JM请求某个slot逻辑也比较简单:
- JM直接告诉slot对应TM, 这个slot将被申请
- JM修改这个slot的状态, 并且从本地可用slot中删掉。然后等待subTask被部署到这个TM的slot上
我们看下第一步JM是怎么告诉TM这个slot被申请的, gateway.requestSlot直接通过RPC(通信逻辑参考)直接向TM的TaskExecutor.requestSlot去了, 我们看下TM是如何做处理的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29/**
* Add the given job to be monitored. This means that the service tries to detect leaders for
* this job and then tries to establish a connection to it.
*/
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, resourceManagerId);
try {
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
}
}
if (jobManagerTable.contains(jobId)) {
offerSlotsToJobManager(jobId);
} else {
try {
jobLeaderService.addJob(jobId, targetAddress); // 跑进去
}
}
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
主要逻辑如下:
- TM接收到JM的请求后, TM首先检查这个slot是否是空闲的, 若空闲的话, 就开始调用taskSlotTable.allocateSlot(), 将这个slot置为已分配。
- TM调用jobLeaderService.addJob将这个Job监控起来(每当有新的Job请求slot, 就会去检测job的leader, 并去和这个job leader建立链接),最终调用JobManagerLeaderListener.notifyLeaderAddress()->JobManagerRegisteredRpcConnection.start():
1
2
3
4
5
6
7
8
9
10
11public void start() {
checkState(!closed, "The RPC connection is already closed");
checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
newRegistration.startRegistration();
} else {
// concurrent start operation
newRegistration.cancel();
}
}
这里是不是有点熟悉, 可以参考下这里, startRegistration主要是向JM发送申请成功通知, TM成功后回调JobManagerRegisteredRpcConnection.onRegistrationSuccess, 最终调用TaskExecutor.establishJobManagerConnection:
1 | private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) { |
主要做了如下事情:
- 检查这个job是否已经在TM端注册了, 若注册了, 那么就直接返回
- 否则建立job->JobManagerConnection, 将映射关系放入TaskExecutor的jobManagerTable中, 然后监控这个job master。
- 调用offerSlotsToJobManager, 告诉JM, 分配给Task这个slot。
JobManager端没有TM注册的可用slot
若没有可用slot的话, 那么就只能去申请TM, 申请的TM会上报可用slot, 然后再向这个TM申请部署SubTask, 此时就回到了有可用slot的情况了。我们看下是如何申请TM的。
1 | // //resource 是当前申请的container情况,比如<memory:6552, vCores:4> |
申请container成功
当向yarn成功申请到container之后, 会回调YarnResourceManager.onContainersAllocated通知jobManager。
1 | public void onContainersAllocated(List<Container> containers) { |
回调函数主要做了如下逻辑:
- 确定启动taskManager的命令。
- 通过yarn启动taskManager。
我们来放一张整体JobManager端分配Slot的流程图:
部署subTask到对应的slot
当确定好subTask部署到一个TaskManager的slot上之后, 在scheduleEager中就开始调用Execution.deploy()进行部署。
1 | public void deploy() throws JobException { |
可以看到:
- 首先生成TaskDeploymentDescriptor, 包含部署subTask的所有信息。
- 调用taskManagerGateway.submitTask(deployment, rpcTimeout)进行部署subTask, JM接收RPC可参考:link原理-Akka通信原理 。
TaskManager端通过TaskExecutor.subTask()接受到JobManager发出的部署SubTask的申请, 这样就完成SubTask部署了。
SlotSharingGroup及共享slot
Flink 允许相同SlotSharingGroup的subTask共享同一个slot, 好处主要有俩:
- A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.
- It is easier to get better resource utilization. Without slot sharing, the non-intensive source/map() subtasks would block as many resources as the resource intensive window subtasks. With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
默认情况下, SubTask使用相同的slot共享组: Default, task共享slot过程可以参考:如何共享slot
这里将阐述SlotSharingGroup是如何生成并起作用的:
在JobGraph产生的时候调用setSlotSharingAndCoLocation()函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14private void setSlotSharingAndCoLocation() {
final HashMap<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
final StreamNode node = streamGraph.getStreamNode(entry.getKey());
final JobVertex vertex = entry.getValue();// slotSharingGroupKey为默认值default
final String slotSharingGroupKey = node.getSlotSharingGroup();
final SlotSharingGroup sharingGroup;
if (slotSharingGroupKey != null) {
// 可以看到, 所有的task的group都为default, 都将放入到同一个SlotSharingGroup中
sharingGroup = slotSharingGroups.computeIfAbsent(slotSharingGroupKey, (k) -> new SlotSharingGroup());
vertex.setSlotSharingGroup(sharingGroup);
} else {
sharingGroup = null;
}
这样, 所有的JobVertex都引用了同一个SlotSharingGroup。 而
1 | public void setSlotSharingGroup(SlotSharingGroup grp) { |
每个共享组的id都是相同的。
MultiTaskSlot及SingleTaskSlot
MultiTaskSlot及SingleTaskSlot都继承TaskSlot, 每当subTask申请到一个未被共享的slot时, 就会产生一个MultiTaskSlot, 它代表着一个TM上的slot, 管理着该slot被共享的情况。 实际分配给每个subTask时, 会单独产生一个SingleTaskSlot, 然后每次被MultiTaskSlot管理着。之后若共享slot时, 分配到的都是同一个MultiTaskSlot, 不同的是每次每次分配都产生新的SingleTaskSlot。
参考文档
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html
https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/job_scheduling.html
http://wuchong.me/blog/2016/05/03/flink-internals-overview/
http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/