Reset relocation/allocation failure counter on node join/shutdown #119968

Changes from all commits

Changelog entry (new file):

@@ -0,0 +1,5 @@
pr: 119968
summary: Reset relocation/allocation failure counter on node join/shutdown
area: Allocation
type: enhancement
issues: []

RoutingNodes.java:

@@ -16,14 +16,19 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

import java.util.ArrayDeque;
import java.util.ArrayList;

@@ -44,6 +49,8 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;

/**
 * {@link RoutingNodes} represents a copy of the routing information contained in the {@link ClusterState cluster state}.
 * It can be initialized as either mutable or immutable, allowing or disallowing changes to its elements.

@@ -60,6 +67,13 @@
 */
public class RoutingNodes implements Iterable<RoutingNode> {

+   private static final Logger logger = LogManager.getLogger(RoutingNodes.class);
+   public static final String RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG =
+       "Resetting failure counter for %d shard(s) that have reached their max allocation retries (%s)";
+   public static final String RESET_FAILED_RELOCATION_COUNTER_LOG_MSG =
+       "Resetting failure counter for %d shard(s) that have reached their max relocation retries (%s)";
+   private static final int MAX_SHARDS_IN_LOG_MSG = 20;

    private final Map<String, RoutingNode> nodesToShards;

    private final UnassignedShards unassignedShards;

@@ -1298,14 +1312,47 @@ public boolean hasAllocationFailures() {
        }));
    }

-   public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
+   public boolean hasRelocationFailures() {
+       for (var shardRoutings : assignedShards.values()) {
+           for (var routing : shardRoutings) {
+               if (routing.relocationFailureInfo() != null && routing.relocationFailureInfo().failedRelocations() > 0) {
+                   return true;
+               }
+           }
+       }
+       return false;
+   }

+   public void resetFailedCounter(RoutingAllocation allocation) {

Review comment: I've chosen to add the logging here, to be able to produce some reliable information, and it also integrates with the existing code that goes through the failures.

+       final var observer = allocation.changes();

Review comment: Nit: can we keep the old name of routingChangesObserver?
Reply: I very much prefer the […]

+       int shardsWithMaxFailedAllocations = 0;
+       int shardsWithMaxFailedRelocations = 0;
+       List<ShardId> topShardIdsWithFailedAllocations = new ArrayList<>();
+       List<ShardId> topShardIdsWithFailedRelocations = new ArrayList<>();

        final var unassignedIterator = unassigned().iterator();
        while (unassignedIterator.hasNext()) {
            ShardRouting shardRouting = unassignedIterator.next();
            UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
+           int failedAllocations = unassignedInfo.failedAllocations();
+           if (failedAllocations > 0) {
+               try {
+                   final var maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(
+                       allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
+                   );
+                   if (failedAllocations >= maxRetry) {
+                       shardsWithMaxFailedAllocations++;
+                       if (topShardIdsWithFailedAllocations.size() <= MAX_SHARDS_IN_LOG_MSG) {

Review comment: Nit: I think we can check the size along with […]
Reply: I don't think so, […]

+                           topShardIdsWithFailedAllocations.add(shardRouting.shardId());
+                       }
+                   }
+               } catch (IndexNotFoundException e) {
+                   // ignore
+               }
+           }
            unassignedIterator.updateUnassigned(
                new UnassignedInfo(
-                   unassignedInfo.failedAllocations() > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.reason(),
+                   failedAllocations > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.reason(),
                    unassignedInfo.message(),
                    unassignedInfo.failure(),
                    0,

@@ -1317,7 +1364,7 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
                    unassignedInfo.lastAllocatedNodeId()
                ),
                shardRouting.recoverySource(),
-               routingChangesObserver
+               observer
            );
        }

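For reference, SETTING_ALLOCATION_MAX_RETRY resolves the per-index setting index.allocation.max_retries (default 5). Since the limit can differ per index, the counting above looks it up through each shard's index metadata, and the IndexNotFoundException catch presumably covers an index that is deleted while the reset runs.
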
@@ -1326,6 +1373,20 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
            for (ShardRouting shardRouting : routingNode) {
                if (shardRouting.relocationFailureInfo() != null && shardRouting.relocationFailureInfo().failedRelocations() > 0) {
                    shardsWithRelocationFailures.add(shardRouting);
+                   try {
+                       int failedRelocations = shardRouting.relocationFailureInfo().failedRelocations();
+                       final var maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(
+                           allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
+                       );
+                       if (failedRelocations >= maxRetry) {
+                           shardsWithMaxFailedRelocations++;
+                           if (topShardIdsWithFailedRelocations.size() <= MAX_SHARDS_IN_LOG_MSG) {
+                               topShardIdsWithFailedRelocations.add(shardRouting.shardId());
+                           }

Review comment (on lines +1383 to +1385): Similar nit: checking size before resolving the setting value would be my preference.

+                       }
+                   } catch (IndexNotFoundException e) {
+                       // ignore
+                   }
                }
            }

@@ -1336,6 +1397,17 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
                assignedShardsAdd(updated);
            }
        }

+       if (shardsWithMaxFailedAllocations > 0) {
+           logger.info(
+               Strings.format(RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG, shardsWithMaxFailedAllocations, topShardIdsWithFailedAllocations)
+           );
+       }
+       if (shardsWithMaxFailedRelocations > 0) {
+           logger.info(
+               Strings.format(RESET_FAILED_RELOCATION_COUNTER_LOG_MSG, shardsWithMaxFailedRelocations, topShardIdsWithFailedRelocations)
+           );
+       }
    }

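For illustration, a minimal sketch of the message the first branch produces, with hypothetical counts and shard ids (this assumes Strings.format delegates to String.format, that ShardId prints as [index][id], and that the %s placeholder receives the list's toString):

    // hypothetical: two shards of an index named "my-index" hit the retry limit
    List<ShardId> topShardIds = List.of(new ShardId(new Index("my-index", "_na_"), 0), new ShardId(new Index("my-index", "_na_"), 1));
    String msg = Strings.format(RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG, 2, topShardIds);
    // msg: Resetting failure counter for 2 shard(s) that have reached their max allocation retries ([[my-index][0], [my-index][1]])
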
AllocationService.java:

@@ -12,13 +12,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;

@@ -573,15 +575,71 @@ public void addAllocFailuresResetListenerTo(ClusterService clusterService) {
        });

        clusterService.addListener((changeEvent) -> {
-           if (changeEvent.nodesAdded() && changeEvent.state().getRoutingNodes().hasAllocationFailures()) {
+           if (shouldResetAllocationFailures(changeEvent)) {
                taskQueue.submitTask("reset-allocation-failures", (e) -> { assert MasterService.isPublishFailureException(e); }, null);
            }
        });
    }

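Note that the listener itself only decides that a reset is warranted and enqueues a task; the counters are actually cleared, and a reroute performed, on the master service via rerouteWithResetFailedCounter below.
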
+   /**
+    * We should reset the allocation/relocation failure count to allow further retries when:
+    *
+    * 1. A new node joins the cluster.
+    * 2. Node shutdown metadata is added that could lead to a node being removed from or replaced in the cluster.
+    *
+    * Note that removing a non-RESTART shutdown metadata entry from a node that is still in the cluster is treated similarly and
+    * will also reset the allocation/relocation failure counters.
+    */
+   private boolean shouldResetAllocationFailures(ClusterChangedEvent changeEvent) {

Review comment: In the PR, allocation and relocation are treated similarly, and sometimes I've used "allocation" to mean both.

+       final var clusterState = changeEvent.state();

+       if (clusterState.getRoutingNodes().hasAllocationFailures() == false
+           && clusterState.getRoutingNodes().hasRelocationFailures() == false) {
+           return false;
+       }
+       if (changeEvent.nodesAdded()) {
+           return true;
+       }

+       final var currentNodeShutdowns = clusterState.metadata().nodeShutdowns();
+       final var previousNodeShutdowns = changeEvent.previousState().metadata().nodeShutdowns();

+       if (currentNodeShutdowns.equals(previousNodeShutdowns)) {
+           return false;
+       }

+       for (var currentShutdown : currentNodeShutdowns.getAll().entrySet()) {
+           var previousNodeShutdown = previousNodeShutdowns.get(currentShutdown.getKey());
+           if (currentShutdown.equals(previousNodeShutdown)) {
+               continue;
+           }
+           // A RESTART doesn't necessarily move shards around, so there is no need to consider it for a reset.
+           // Furthermore, once the node rejoins after restarting, there will be a reset if necessary.
+           if (currentShutdown.getValue().getType() == SingleNodeShutdownMetadata.Type.RESTART) {
+               continue;
+           }
+           // A node with no shutdown marker or a RESTART marker receives a non-RESTART shutdown marker.
+           if (previousNodeShutdown == null || previousNodeShutdown.getType() == Type.RESTART) {
+               return true;
+           }
+       }

+       for (var previousShutdown : previousNodeShutdowns.getAll().entrySet()) {
+           var nodeId = previousShutdown.getKey();
+           // A non-RESTART marker is removed but the node is still in the cluster. We could re-attempt failed relocations/allocations.
+           if (currentNodeShutdowns.get(nodeId) == null
+               && previousShutdown.getValue().getType() != SingleNodeShutdownMetadata.Type.RESTART
+               && clusterState.nodes().get(nodeId) != null) {
+               return true;
+           }
+       }

+       return false;
+   }

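In practice, condition 2 above corresponds to a non-RESTART marker registered through the node shutdown API; for example (node id hypothetical), PUT _nodes/node-1/shutdown with a body of {"type": "remove", "reason": "decommission"} should make this method return true on the resulting cluster state update, as long as some shard still carries allocation or relocation failures.
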
    private ClusterState rerouteWithResetFailedCounter(ClusterState clusterState) {
        RoutingAllocation allocation = createRoutingAllocation(clusterState, currentNanoTime());
-       allocation.routingNodes().resetFailedCounter(allocation.changes());
+       allocation.routingNodes().resetFailedCounter(allocation);
        reroute(allocation, routingAllocation -> shardsAllocator.allocate(routingAllocation, ActionListener.noop()));
        return buildResultAndLogHealthChange(clusterState, allocation, "reroute with reset failed counter");
    }