ES集群中,master负责实施维护集群元数据的更新,然后再分发给data节点,分发的过程就是publish,也就是本文的重点。本文将以索引创建的流程来讲解这个过程。
master首先创建新的集群元数据
当master接收到创建索引的请求后,首先进入如下MetadataCreateIndexService.onlyCreateIndex()函数:
1 | private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, |
master进行任何更新State的操作时,都会调用submitStateUpdateTask() -> ClusterService.submitStateUpdateTasks() -> MasterService.submitStateUpdateTasks() -> TaskBatcher.submitTasks(),在submitTasks()中会对tasks分类合并,有些task可以合并执行以加快速度:
1 | public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException { |
可以看到,该函数会去检查task的batchingKey是否一致,若一致的话,放在相同的batchingKey下,最常见的startShard/failShard元数据更新就是可以合并执行,batchingKey相同的前提是调用ClusterService.submitStateUpdateTask时,使用了相同的ClusterStateTaskExecutor,而startShard使用了全局唯一的ShardStartedClusterStateTaskExecutor作为key。在create中,我们明显可以看到每个index创建都会产生新的AckedClusterStateUpdateTask作为batchingKey,索引创建流程只能逐个全局同步。这里会从线程池中产生我们熟悉的[node][masterService#updateTask][T#1]线程进行构造新的ClusterStae。
1 | private void runTasks(TaskInputs taskInputs) { |
这个函数主要做了如下事情:
1.调用calculateTaskOutputs以产生新的ClusterState。产生过程可以参考前面自定义类的AckedClusterStateUpdateTask.execute调用applyCreateIndexRequest产生新的集群状态,这里将不是本文重点。
2.调用publish()进行全局广播,全局广播包括主master本身。
master全局广播
预处理
进行广播前,master还会做如下预处理:
1 | public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) { |
在CoordinatorPublication初始化时,会针对整个广播设置cancel超时时间(cluster.publish.timeout publish,默认30s)+info超时(cluster.publish.info_timeout, 默认10s),info超时日志如下:
1 | [INFO ] after [10s] publication of cluster state version [407258] is still waiting for {node2}[SENT_APPLY_COMMIT], {node1}[SENT_APPLY_COMMIT] |
info超时会打印master还没接收到的commit响应的所有节点。cancel超时后,会将整个publish过程置为 cancelled+isCompleted。对于还未完成第二次响应的节点,直接置为失败,master会直接进入整个publish收尾阶段。
1 | public void cancel(String reason) { |
onPossibleCompletion()会在后面介绍。
向每个节点广播请求
开始对每个数据节点发送元数据请求:
1 | void sendPublishRequest() { |
master在对每个目标节点(包含master节点本身)发送publish前,会分别对每个目标节点构建PublicationTarget对象,来跟踪publish state,表明当前节点的publish进行到了哪个阶段:
1 | enum PublicationTargetState { |
当对目标节点置位SENT_PUBLISH_REQUEST后,进入PublicationContext.sendPublishRequest()进行clusterState的发送。同时定义了PublishResponseHandler作为master响应目前节点第一次response的处理类。在sendPublishRequest时,ES会判断是否向目标节点发送全量ClusterState还是仅仅发送diff的ClusterState。最常见的全量发布ClusterState的情况就是有新的节点加入到集群。
目标节点接收到maser发送的publish请求
目标节点的PublicationTransportHandler.handleIncomingPublishRequest首先接收到master发送的请求,做了以下三件事情:
1.然后第一步就是解析出最新的ClusterState。
2.其次进入acceptState()->CoordinationState.handlePublishRequest()构建响应master的response。
3.再次调用becomeFollower()变身Follow(非master本身)。
我们看下第二步构建Response时做了哪些事情:
1 | public PublishResponse handlePublishRequest(PublishRequest publishRequest) { |
该函数主要做了如下事情:将接受到的集群元数据进行落盘。针对不同角色的节点,persistedState也不同:
1.目标节点为masters属性的角色时,persistedState=LucenePersistedState。
2.目标节点为仅仅为data属性的角色时,persistedState=AsyncLucenePersistedState。
这里还有lastSeenClusterState元数据,它的作用仅仅值是为了第二次接受到commit请求时做版本等校验用的。并不会作为接收到的临时元数据使用。
这里字面上可以知道:对master节点,对于ClusterState落盘时同步操作,若IO压力大的话,对落盘相当耗时,会拖累整个集群publish耗时;对数据节点,采用异步落盘的方式,避免阻塞整个落盘导致的响应超时。我们看下如何落盘,以及落盘落了哪些信息:
1 | void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, |
需要知道的是:
1.在global Metadate中,并没有存储所有indexMetadata,而是作为单独一项存储的。
2.存储的时候回遍历每个data.path分别都存储一份。所以在线上环境,我们需要严格将data和master节点区分开,以免data角色将磁盘IO占用过多,而影响元数据的同步落盘操作。
注意:数据节点仅仅是将集群元数据保存在了本地,并更新了,但是还没有真正合并到data节点当前使用的ClusterState中。真正将新的ClusterState当成本地元数据,是在接收到master发送的commit请求后。
master接收到目标节点发送的publish响应
master接收到data响应的响应是在Publication$PublicationTarget$PublishResponseHandler.onResponse(),首先将对该节点publish请求状态置为WAITING_FOR_QUORUM,然后进入PublicationTarget.handlePublishResponse()
1 | void handlePublishResponse(PublishResponse publishResponse) { |
该函数主要做了如下事情:
1.首先检查是否已经对某些节点发送了applyCommitRequest请求。master可以对数据节点发送applyCommitRequest是有条件的:必须有一半的master属性的节点已经响应了。(raft协议的特性)。
2.若master还没有发送过applyCommitRequest请求,那么会检查是否有资格可以对data节点发送第二次commit请求了。若有资格发送了,那么对所有状态为WAITING_FOR_QUORUM的节发送commit请求。
master向目标节点发送二次commit请求
master收到过半master属性的第一次response请求后,开始对WAITING_FOR_QUORUM状态的节点发送commit请求:
1 | void sendApplyCommit() { |
目标节点接收到master发送的二次commit请求
目前目标节点收到master发送的commit请求后,首先进入了Coordinator.handleApplyCommit()
1 | private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) { |
针对不同角色,目标节点做了不同的反应:
1.若本节点就是主master节点,那么调用PublicationTransportHandler.transportCommitCallback(),作用仅仅是响应回去。 master只有在整个publish()完成后,才会将新元数据作为本地的全局元数据(后面会讲)。
2.若本目标节点是非主master节点,则调用ClusterApplierService.onNewClusterState将新ClusterState节点作为本节点维持的最新全局ClusterState。
我们看下本目标节点在替换元数据的时候哪些事情,实际进入的是ClusterApplierService.runTask()
1 | private void runTask(UpdateTask task) { |
非主master节点主要做了如下事情:
1.获取的是最新ClusterState。
2.检查新旧ClusterState是否一致,若一致,则不做任何操作。
3.若旧ClusterState有变化,则调用applyChanges(),根据最新ClusterState适配本地。
我们再看下applyChanges()如何适配本地的
1 | private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState, StopWatch stopWatch) { |
非主master节点主要做了如下事情:
1.检查是否有节点掉线&新增,对于新增节点主动进行connect,同时打印如下日志:
1 | [2020-08-10T12:12:22,781][INFO ][o.e.c.s.ClusterApplierService] [node1] added {{node2}}, term: 28, version: 483578, reason: ApplyCommitRequest{term=28, version=483578, sourceNode={master}} |
2.本地调用callClusterStateAppliers()根据最新的ClusterState做一些操作,比如创建IndexService,删除索引数据,分配分片等操作。我们看下其中重要实现:
1 | private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { |
clusterStateAppliers={highPriorityStateAppliers, normalPriorityStateAppliers, lowPriorityStateAppliers},我们需要着重强调下highPriorityStateAppliers中的IndicesClusterStateService.applyClusterState():
1 | public synchronized void applyClusterState(final ClusterChangedEvent event) { |
非master节点的本地维护元数据将在可以根据全局ClusterState进行及时调整。
3.关闭掉线节点的连接。
4.设置本地维护的最新全局ClusteState,存放在ClusterApplierService.state对象中。
5.调用callClusterStateListeners()来进行回调响应,比如数据节点需要请求master时,发现本地找不到master,那么就会创建一个listener,等待本地维护的ClusteState发生变化时,再去retry;又比如本节点是master,更新元数据后不是master,会做一些收尾处理等。
master接收到目标节点发送二次commit响应
master收到目标节点的二次响应后,最先进入ApplyCommitResponseHandler.onResponse()函数:
1 | public void onResponse(TransportResponse.Empty ignored) { |
master主要做了如下操作:
1.确认针对目标节点的二次响应完成:修改目标节点的publish状态为APPLIED_COMMIT;进入CoordinatorPublication构造函数的lister中更新master维护的每个数据节点最新ClusterState version。
1 | public void onNodeAck(DiscoveryNode node, Exception e) { |
对每个目标节点二次commit响应做了如下操作:
1.1 若目标节点就是本主master节点,那么仅标记localNodeAckEvent状态为done(后面会用)
1.2 若目标节点是非主master节点,则更新本主master维护的其他节点的ClusterState version(若落后严重,会主动被master剔除集群,后面会介绍)。
2.调用onPossibleCompletion()检查整个publish是否完成了。
1 | private void onPossibleCompletion() { |
主要做了如下检查:
2.1 检查cancelled是否置为失败,若未失败,且还有至少一个目标节点未完成二次commit,那么就退出等待。
2.2 若applyCommitRequest为空,说明是超时导致的失败,代表整个publish已经失败的完成了,会进入onCompletion()。
2.3 此时所有节点已经完成二次commit响应,进入onCompletion()
再继续看下主master调用onCompletion()做了哪些事情:
1 | //master节点上,所有任务已经完成(isCompleted=true),可能任务全部失败了(超时30会设置),也可能任务全部成功了 |
主要做了如下事情:若主master收到本节点的二次commti响应(设置localNodeAckEvent为done),那么
1.调用ClusterApplierService.onNewClusterState将新的ClusterState融合到本地节点中(参考data融合新的全局元数据)
2.开始针对本地维护的数据节点ClusterState version,若再超时时间外仍然低于当前同步的version,则将数据节点从集群中剔除,超时时间90s(由cluster.follower_lag.timeout参数决定)
1 | void checkForLag(final long version) { |
会打印如下日志:
1 | [2020-08-10T14:12:24,781][WARN ][o.e.c.c.LagDetector] [master] node [node1] is lagging at cluster state version [483037], although publication of cluster state version [483038] completed [1.5m] ago |
然后直接调用Coordinator.removeNode()再次广播全局元数据。
3.打印publish超时未完成日志。
1 | [2020-08-10T12:12:24,781][WARN ][o.e.c.c.C.CoordinatorPublication] [master1] after [30.1s] publication of cluster state version [483038] is still waiting for {node1}[SENT_APPLY_COMMIT], {node2} [SENT_APPLY_COMMIT] |
总结
master广播全过程分为第一次广播+第二次commit请求,只有过半master节点响应才能继续第二次广播。在30s超时时间后,主动设置publish状态为true, 在规定时间内元数据更新较慢的节点,master会主动将其剔除集群。