Flink对于JobManager和TaskManager之间通信采用Akka remote实现, 本文将以tm和jm之间的一次远程通信为示例进行讲解。为了读者更好地理解, 本文首先介绍AKKa相关基本知识。
Akka基础学习
本文首先以hello world为例进行讲解:
1 | public class hello { |
输出为:
1 | tell1 |
- 在使用actorSelection函数的时候, 我们需要了解一个关键字Identify, 它被定义为每个角色都知道其含义的消息, 当接收到Identify时, 角色自动回复ActorIdentity, 其中包含着对地址的角色。actorSelection可以获取该角色的引用, 这样就可以首先和该角色通信了。
- ask和tell的区别是, ask希望对方角色返回结果, 而tell不需要返回结果。
Akka RPC通信过程
AkkaRpcService和RpcServer
在正式介绍之前, 先介绍AkkaRpcService, 作为Flink Akka通信核心类, 它包含了akka通信系统ActorSystem, 任何角色产生, 都会调用RpcService.startServer
1 | public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { |
可以看到:
- 角色接收到信息后, 处理的类为FencedAkkaRpcActor/AkkaRpcActor。
- 调用函数时的接口handler也分为FencedAkkaInvocationHandler/AkkaInvocationHandler。每次代理GateWay请求时, 首先会调用AkkaInvocationHandler.invoke()类。
我们再来介绍下: RpcServer, RpcServer作为任何一个请求终端, 每次都将从相同的AkkaRpcService中参数, 实际封装了ActorRef来进行内部数据传输。
TM向JM注册
我们以TaskManager启动后, 会去主动连接JobMaster的ResourceManager, 以它们之间的通信为例进行讲解。YarnTaskExecutorRunner在运行主函数时, 会去调用TaskExecutor.connectToResourceManager()主动连接JobManager:
1 | private void connectToResourceManager() { |
建立好TaskExecutorToResourceManagerConnection之后,看下是如何操作的:
1 | public void start() { |
该函数主要做了如下事情:
- 通过createNewRegistration构建ResourceManagerRegistration对象, 并没有做其他的事。同时在父类RetryingRegistration中定义了一个CompletableFuture, 当complete时候(成功向jm注册后), 会去调用TaskExecutorToResourceManagerConnection.onRegistrationSuccess(), 之后会详细介绍。
- 通过AkkaRpcService.startRegistration真正开始注册TaskManager。接下来看下如何向JobManager的ResourceManager注册该TaskManager。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public void startRegistration() {
try {
// trigger resolution of the resource manager address to a callable gateway
final CompletableFuture<G> resourceManagerFuture;
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
//返回的就是clazz对象ResourceManagerGateway的代理
resourceManagerFuture = (CompletableFuture<G>) rpcService.connect(
targetAddress,
fencingToken,
targetType.asSubclass(FencedRpcGateway.class));
} else {
resourceManagerFuture = rpcService.connect(targetAddress, targetType);
}
// upon success, start the registration attempts
CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(
(G result) -> {
log.info("Resolved {} address, beginning registration", targetName);
// 真正想远程JobManager进行注册
register(result, 1, initialRegistrationTimeout);
},
rpcService.getExecutor());
}
}
主要逻辑如下:
- 调用rpcService.connect, 获取代理, 代理中包含指向jobManager的ResourceManager的地址(akka.tcp://flink@jobmanager:port/user/resourcemanager)的角色, 以进行rpc通信。
- 当产生代理后, 再调用register进行真正RPC远程向JobManager的ResourceManager注册。
构建远程RPC调用的角色及代理
我们首先来看下是如何获取代理的:
1 | private <C extends RpcGateway> CompletableFuture<C> connectInternal( |
连接请求也比较简单, 每次RPC调用时, 都使用ask:
- 向ResourceManager发送Identify, 远程响应并发返回对应路径的角色
- 向ResourceManager发送RemoteHandshakeMessage, 再次和远程确认。
- 以上两个RPC调用完成后, 构建针对ResourceManagerGateway的代理, 其中handler为FencedAkkaInvocationHandler(rpcEndpoint=ActorRef)。
TM向JM远程RPC注册
当获取到通信的ActorRef后, 调用register进行注册:
1 | private void register(final G gateway, final int attempt, final long timeoutMillis) { |
可以看到:
- register中真正向ResourceManager通信的是invokeRegistration()。
- 将completionFuture置为完成, 那么将触发之前定义的TaskExecutorToResourceManagerConnection.onRegistrationSuccess()。
我们先看invokeRegistration的实现逻辑:1
2
3
4
5
6
7
8
9
10
11protected CompletableFuture<RegistrationResponse> invokeRegistration(
ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
Time timeout = Time.milliseconds(timeoutMillis);
//resourceManager=FencedAkkaInvocationHandler,返回TaskExecutorRegistrationSuccess, 这里会跳到JobManager
return resourceManager.registerTaskExecutor(
taskExecutorAddress,
resourceID,
dataPort,
hardwareDescription,
timeout);
}
resourceManager.registerTaskExecutor将首先跑到FencedAkkaInvocationHandler.invoke, 最终真正发送rpc请求的是invokeRpc:
1 | private Object invokeRpc(Method method, Object[] args) throws Exception { |
最终通过ask将请求发送出去, 其中包括函数名, 参数等信息。
远程ResourceManager接收到请求
我们可以看到在构建YarnResourceManager时, resourceManagerEndpointId为resourcemanager
, 最终其ActorRef对应的直接地址为: akka.tcp://flink@jobmanager:port/user/resourcemanager, 印证了之前TM向JM注册时, JM通信的终端就是该类。在ActorRef构建过程中, 知道JM接受处理类为AkkaRpcActor.onReceive(FencedAkkaRpcActor父类), 继续调用的是handleRpcMessage()
1 | protected void handleRpcMessage(Object message) { |
flink针对不同类型的消息, 使用调用的函数, 很显然, 这里调用handleRpcInvocation:
1 | private void handleRpcInvocation(RpcInvocation rpcInvocation) { |
最终调用的是result = rpcMethod.invoke()来处理TM发送的请求。实际调用ResourceManager.registerTaskExecutor()->registerTaskExecutorInternal()
1 | private RegistrationResponse registerTaskExecutorInternal( |
ResourceManager注册主要做了如下事情:
- 从taskExecutors中删除旧的通信管道。
- 跑到YarnResourceManager.workerStarted()里面, 从JM端根据获取当初yarn分配的Container。
- 向taskExecutors添加新产生的管道WorkerRegistration。管道里包含TaskExecutorGateway的代理, 其中handler为AkkaInvocationHandler, 且包含连接JobManager的ActorRef。
- JM对连接的TM添加监控。然后响应TM。
TM接收到JM响应
当TM接收到JM响应, 就会回调之前定义的TaskExecutorToResourceManagerConnection.onRegistrationSuccess()。
1 | private void establishResourceManagerConnection( |
TM当收到Response, 回调函数做了如下事情:
- 通过resourceManagerGateway.sendSlotReport向YarnResourceManager汇报当前TM可用slot, 可用slot都将保存在 JobManager ResourceManager.freeSlots里面。
- 开始向ResourceManager上报心跳。
总结
可以看到, Akka通信最底层依靠的是Patterns.ask来完成, 整个通信流程也是比较清晰的。