现象
现象1
最近在下线ES5.6.8集群节点时候, 发现了一个很奇怪的现象, 我先把cluster参数给贴出来看下:
1 | PUT _cluster/settings |
然后我通过如下命令下线4个节点:
1 | PUT _cluster/settings |
发现此时集群上这4个节点分别有一个分片开始进行move, what for? 按我们的理解, move操作的并发应该被我们通过参数cluster_concurrent_rebalance
控制为1, 可是为啥为4。好像这个下线节点时候shard的并发与下线节点个数一致, 而不是受我们控制。
现象2
首先向集群发送如下命令:
1 | PUT _cluster/settings |
然后我通过如下命令下线4个节点:
1 | PUT _cluster/settings |
集群突然有144个分片处于move中,按道理node_concurrent_recoveries=0后, 集群将不会再有分片move。实际原因是参数cluster.routing.allocation.node_concurrent_incoming_recoveries
和cluster.routing.allocation.node_concurrent_outgoing_recoveries
的初始化可以由cluster.routing.allocation.node_concurrent_recoveries
赋值, 但是之后对cluster.routing.allocation.node_concurrent_recoveries
的修改,将不会影响incoming和outgoing的取值, 此时这两个参数任然是50。所以就产生了node_concurrent_recoveries=0
、集群还有较大的move.
源码查看
带着上面的疑问, 试图从代码中找到原因, 我们知道, 任何集群元数据变动都会跳到BalancedShardsAllocator.allocate():
1 | public void allocate(RoutingAllocation allocation) { |
这里操作都是依据分片分配到各个节点的打分为依据来操作, 主要分为三步: 分配unassigned分片, move分片, 均衡分片。 在开始的现象, 明显不是第一步, 只可能是move分片、或者均衡分片。
move分片
我们首先进入move分片看下能否解释现象:
1 | public void moveShards() { |
这里我们需要注意一个事实: 这里轮循所有分片是有顺序的, 依次从每个节点上选择一个分片判断, 首先第一轮: 选择第一个节点上的第一个分片, 然后第二个节点上的第一个分片…, 最后一个节点的第一个分片, 再开始第二轮: 第一个节点上第二个分片……。
然后再进入decideMove看下具体的move的逻辑:
1 | /** |
该函数主要做了如下逻辑:
- 若该分片不处于Started, 那么将不进行move。
- 首先通过canRemain检查该分片是否还可以继续存留在该节点上, 主要由以下几个因素决定:
- awareness(机房)
- 磁盘空间
- ip排除(就是本次修改的参数)
- 每个节点上最多分配个数(total_shards_per_node)。
很显然, 本次过程中ip排除了,那么该分片不能再待在本节点上了。
- 那么在所有节点上找一个节点, 该分片可以迁移上去。找的依据是canAllocate, canAllocate决定因素并没有包含cluster_concurrent_rebalance, 但是增加了node_concurrent_outgoing_recoveries、node_concurrent_incoming_recoveries。这两个参数的含义是: 决定分片是否可以分配到某个节点上的依据是该节点上正在迁入/迁出的分片是否达到阈值, 若没有达到阈值, 就可以分配。
重点:
这就解释了为啥通过调整_ip时, 为啥不能通过cluster_concurrent_rebalance控制rebalance的并发了, ip调整时并发是通过迁入/迁出并发控制的, 也就是我们最开始观察到的现象, 正因为我们设置了node_concurrent_recoveries, 导致每个分片上迁出并发只能是1, 那么设置exclude.ip为4个时, 我们在前端看到有4个分片正处于rebalance。这里完全都没想过使用cluster_concurrent_rebalance来控制迁移并发的。这里不用cluster_concurrent_rebalance, 官方也给出了原因:1
2#14259 added a check to honor rebalancing policies (i.e., rebalance only on green state) when moving shards due to changes in allocation filtering rules. The rebalancing policy is there to make sure that we don't try to even out the number of shards per node when we are still missing shards. However, it should not interfere with explicit user commands (allocation filtering) or things like the disk threshold wanting to move shards because of a node hitting the high water mark.
#14259 was done to address #14057 where people reported that using allocation filtering caused many shards to be moved at once. This is however a none issue - with 1.7 (where the issue was reported) and 2.x, we protect recovery source nodes by limitting the number of concurrent data streams they can open (i.e., we can have many recoveries, but they will be throttled). In 5.0 we came up with a simpler and more understandable approach where we have a hard limit on the number of outgoing recoveries per node (on top of the incoming recoveries we already had).
大致就是说, rebalance策略主要是为了解决的是: 当我们未对全局分片有足够了解的时候(当全局分片并未处于完全均衡的时候), 我们并不会去干扰每个节点的分片个数。当然, rebalance更不应该去干扰显示的用户命令比如分片排除, 或者达到磁盘阈值这样的情况。这里并发是通过Incoming/OutComing这样的并发去控制的。原因清楚了吧。
- 找到一个可以迁移到分片后, 然后通过routingNodes.relocateShard修改relocatingShards值, 这个值就是我们在前端看到的正在rebalance的个数。
均衡分片
既然现象是由move分片来解释了, 那么我们也来了解均衡分片大致做了那些事情呢?
- 通过allocation.deciders().canRebalance(allocation).type() 首先检查是否可以进行rebalance。通过ClusterRebalanceType来控制:
- 若设置为ALWAYS, 那么是可以进行rebalance的。
- 若设置为INDICES_PRIMARIES_ACTIVE, 那么当只有所有主分片处于active的时候才可以进行rebalance的。
- 若设置为INDICES_ALL_ACTIVE, 那么是禁止rebalance的。
- 若可以进行rebalance, 然后进入balanceByWeights()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103private void balanceByWeights() {
final AllocationDeciders deciders = allocation.deciders();
final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights;
//轮循所有索引
for (String index : buildWeightOrderedIndices()) {
IndexMetaData indexMetaData = metaData.index(index);
// find nodes that have a shard of this index or where shards of this index are allowed to be allocated to,
// move these nodes to the front of modelNodes so that we can only balance based on these nodes
// 首先选择与该索引有关的节点, 这些节点, 那么该索引有分配在上面分配, 要么该索引可以分配到该节点上。默认情况下, 除了exclude外, 所有节点都是相关的。
int relevantNodes = 0;
for (int i = 0; i < modelNodes.length; i++) {
ModelNode modelNode = modelNodes[i];
if (modelNode.getIndex(index) != null
|| deciders.canAllocate(indexMetaData, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
// swap nodes at position i and relevantNodes
modelNodes[i] = modelNodes[relevantNodes];
modelNodes[relevantNodes] = modelNode;
relevantNodes++;
}
}
// 若相关节点少于2个,就没有rebalance的必要了
if (relevantNodes < 2) {
continue;
}
// 对相关节点与该索引之间进行打分, 分值从最小到最大进行排序(分值越大说明分配越不合理)
sorter.reset(index, 0, relevantNodes);
int lowIdx = 0;
int highIdx = relevantNodes - 1;
while (true) {
final ModelNode minNode = modelNodes[lowIdx];
final ModelNode maxNode = modelNodes[highIdx];
advance_range:
// 假如最大分值的节点, 该索引有分片存在
if (maxNode.numShards(index) > 0) {
//计算最大和最小分值差大于阈值
final float delta = absDelta(weights[lowIdx], weights[highIdx]);
//若差值小于阈值1
if (lessThan(delta, threshold)) {
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
&& (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all
) { //low和high差距大于阈值,那么还是可以继续找的
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
* due to some allocation decider restrictions like zone awareness. if one zone has for instance
* less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we
* can't move to the "lighter" shards since otherwise the zone would go over capacity.
*
* This break jumps straight to the condition below were we start moving from the high index towards
* the low index to shrink the window we are considering for balance from the other direction.
* (check shrinking the window from MAX to MIN)
* See #3580
*/
break advance_range;
}
if (logger.isTraceEnabled()) {
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
break; //low和high差距小于阈值,那么完全不用找了。直接退出当前索引的rebalance,进行下一个索引的rebalance
}
if (logger.isTraceEnabled()) {
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
* a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */
if (tryRelocateShard(minNode, maxNode, index, delta)) {
/*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
* we could just find the place to insert linearly but the win might be minor
* compared to the added complexity
*/
weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
weights[highIdx] = sorter.weight(modelNodes[highIdx]);
sorter.sort(0, relevantNodes);
lowIdx = 0;
highIdx = relevantNodes - 1;
// 再继续查找当前索引在别的节点上是都有更合适的分配。
continue;
}
}
// 若没有分配, 则开始low+1 进行第二轮重新开始。
//第一轮:首先第一轮从high不变, low每次增加1,向higt靠近,直到low和high一样, 第二轮:然后high--, 继续low向higt靠近;再第三轮,这样实际循环次数是(high-low)(high-low)/2, 很像一个倒着的乘法表
if (lowIdx < highIdx - 1) {
/* Shrinking the window from MIN to MAX
* we can't move from any shard from the min node lets move on to the next node
* and see if the threshold still holds. We either don't have any shard of this
* index on this node of allocation deciders prevent any relocation.*/
lowIdx++;
} else if (lowIdx > 0) {
/* Shrinking the window from MAX to MIN
* now we go max to min since obviously we can't move anything to the max node
* lets pick the next highest */
lowIdx = 0;
highIdx--;
} else {
/* we are done here, we either can't relocate anymore or we are balanced */
break;
}
}
}
}
rebalance时, 以index来循环, 大概逻辑就是:
- 针对每个索引在每个节点上分配进行一个打分。打分依据是(该索引是否在所有节点是否分配均衡&&所有shard是否在所有节点分配均衡), 分支越低, 说明分配越合理。
- 从分值高低差的阈值来判断是否需要rebalance。阈值为1。
总结
若仅仅增加节点, 那么就是rebalance操作, 此时cluster_concurrent_rebalance是生效的。 若做了exclude操作, 那么就变成了move操作, 此时node_concurrent_recoveries就该生效了。 move, rebalance, allocation等操作都是由多个决策器一起决定如何分配的, 只要合理使用各种决策器, 那么分片分配就能被我们合理的掌握了。