Skip to content

Reset relocation/allocation failure counter on node join/shutdown #119968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119968.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119968
summary: Reset relocation/allocation failure counter on node join/shutdown
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,29 @@

package org.elasticsearch.cluster.routing.allocation;

import org.apache.logging.log4j.Level;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.MockLog;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class AllocationFailuresResetIT extends ESIntegTestCase {
Expand Down Expand Up @@ -49,7 +61,7 @@ private void removeAllocationFailuresInjection(String node) {
private void awaitShardAllocMaxRetries() throws Exception {
var maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(internalCluster().getDefaultSettings());
assertBusy(() -> {
var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
var index = state.getRoutingTable().index(INDEX);
assertNotNull(index);
var shard = index.shard(SHARD).primaryShard();
Expand All @@ -62,7 +74,7 @@ private void awaitShardAllocMaxRetries() throws Exception {

private void awaitShardAllocSucceed() throws Exception {
assertBusy(() -> {
var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
var index = state.getRoutingTable().index(INDEX);
assertNotNull(index);
var shard = index.shard(SHARD).primaryShard();
Expand All @@ -72,14 +84,77 @@ private void awaitShardAllocSucceed() throws Exception {
});
}

public void testResetFailuresOnNodeJoin() throws Exception {
/**
 * Verifies that a shard which has exhausted its allocation retries is allocated again once a new
 * node joins the cluster, and that the reset of the failure counter is logged by {@link RoutingNodes}.
 */
public void testResetAllocationFailuresOnNodeJoin() throws Exception {
    var node1 = internalCluster().startNode();
    injectAllocationFailures(node1);
    prepareCreate(INDEX, indexSettings(1, 0)).execute();
    // Wait until the primary has used up all of its allocation retries and stays unassigned.
    awaitShardAllocMaxRetries();
    removeAllocationFailuresInjection(node1);
    try (var mockLog = MockLog.capture(RoutingNodes.class)) {
        var shardId = internalCluster().clusterService().state().routingTable().index(INDEX).shard(SHARD).shardId();
        mockLog.addExpectation(
            new MockLog.SeenEventExpectation(
                "log resetting failed allocations",
                RoutingNodes.class.getName(),
                Level.INFO,
                Strings.format(RoutingNodes.RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG, 1, List.of(shardId))
            )
        );
        // A node join should reset the failure counter, after which allocation succeeds.
        internalCluster().startNode();
        awaitShardAllocSucceed();
        mockLog.assertAllExpectationsMatched();
    }
}

/**
 * Verifies that a shard which has exhausted its relocation retries gets another chance to relocate
 * once a new node joins the cluster, and that the counter reset is logged by {@link RoutingNodes}.
 */
public void testResetRelocationFailuresOnNodeJoin() throws Exception {
    String node1 = internalCluster().startNode();
    createIndex(INDEX, 1, 0);
    ensureGreen(INDEX);
    final var failRelocation = new AtomicBoolean(true);
    String node2 = internalCluster().startNode();
    // Make every relocation attempt onto node2 fail while the flag is set.
    internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(new IndexEventListener() {
        @Override
        public void beforeIndexCreated(Index index, Settings indexSettings) {
            if (failRelocation.get()) {
                throw new RuntimeException("FAIL");
            }
        }
    });
    // Excluding node1 forces the shard to attempt relocating to node2, which keeps failing.
    updateIndexSettings(Settings.builder().put(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", node1), INDEX);
    ensureGreen(INDEX);
    // await all relocation attempts are exhausted
    var maxAttempts = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
    assertBusy(() -> {
        var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
        var shard = state.routingTable().index(INDEX).shard(SHARD).primaryShard();
        assertThat(shard, notNullValue());
        assertThat(shard.relocationFailureInfo().failedRelocations(), equalTo(maxAttempts));
    });
    // ensure the shard remains started on node1 (relocation failures do not unassign it)
    var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
    var shard = state.routingTable().index(INDEX).shard(SHARD).primaryShard();
    assertThat(shard, notNullValue());
    assertThat(shard.state(), equalTo(ShardRoutingState.STARTED));
    assertThat(state.nodes().get(shard.currentNodeId()).getName(), equalTo(node1));
    failRelocation.set(false);
    // A new node joining should reset the counter and allow more relocation retries
    try (var mockLog = MockLog.capture(RoutingNodes.class)) {
        mockLog.addExpectation(
            new MockLog.SeenEventExpectation(
                "log resetting failed relocations",
                RoutingNodes.class.getName(),
                Level.INFO,
                Strings.format(RoutingNodes.RESET_FAILED_RELOCATION_COUNTER_LOG_MSG, 1, List.of(shard.shardId()))
            )
        );
        internalCluster().startNode();
        // The shard should end up on some node other than the excluded node1.
        assertBusy(() -> {
            var stateAfterNodeJoin = internalCluster().clusterService().state();
            var relocatedShard = stateAfterNodeJoin.routingTable().index(INDEX).shard(SHARD).primaryShard();
            assertThat(relocatedShard, notNullValue());
            assertThat(stateAfterNodeJoin.nodes().get(relocatedShard.currentNodeId()).getName(), not(equalTo(node1)));
        });
        mockLog.assertAllExpectationsMatched();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -44,6 +49,8 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;

/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable allowing or disallowing changes to its elements.
Expand All @@ -60,6 +67,13 @@
*/
public class RoutingNodes implements Iterable<RoutingNode> {

private static final Logger logger = LogManager.getLogger(RoutingNodes.class);
// Log-message templates for resetting shard failure counters; tests assert on the formatted output,
// so they must be formatted via these constants. Fixed typo: "retires" -> "retries".
public static final String RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG =
    "Resetting failure counter for %d shard(s) that have reached their max allocation retries (%s)";
public static final String RESET_FAILED_RELOCATION_COUNTER_LOG_MSG =
    "Resetting failure counter for %d shard(s) that have reached their max relocation retries (%s)";
Comment on lines +72 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
"Resetting failure counter for %d shard(s) that have reached their max allocation retires (%s)";
public static final String RESET_FAILED_RELOCATION_COUNTER_LOG_MSG =
"Resetting failure counter for %d shard(s) that have reached their max relocation retries (%s)";
"Resetting failure counter for [%d] shard(s) that have reached their max allocation retires (%s)";
public static final String RESET_FAILED_RELOCATION_COUNTER_LOG_MSG =
"Resetting failure counter for [%d] shard(s) that have reached their max relocation retries (%s)";

private static final int MAX_SHARDS_IN_LOG_MSG = 20;

private final Map<String, RoutingNode> nodesToShards;

private final UnassignedShards unassignedShards;
Expand Down Expand Up @@ -1298,14 +1312,47 @@ public boolean hasAllocationFailures() {
}));
}

public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
/**
 * Returns {@code true} if any assigned shard has recorded at least one failed relocation attempt.
 */
public boolean hasRelocationFailures() {
    for (var shardRoutingList : assignedShards.values()) {
        for (var shardRouting : shardRoutingList) {
            final var relocationFailureInfo = shardRouting.relocationFailureInfo();
            if (relocationFailureInfo == null) {
                continue;
            }
            if (relocationFailureInfo.failedRelocations() > 0) {
                return true;
            }
        }
    }
    return false;
}

public void resetFailedCounter(RoutingAllocation allocation) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've chosen to add the logging here, to be able to produce some reliable information and also it integrates with the existing code of going through the failures. SETTING_ALLOCATION_MAX_RETRY is an index scoped setting so I'm not referring to any value, just picking those reaching max. Anything under max is not mentioned, as was the case before.

final var observer = allocation.changes();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we keep the old name of routingChangesObserver?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I very much prefer the observer since it returns a RoutingChangesObserver.

int shardsWithMaxFailedAllocations = 0;
int shardsWithMaxFailedRelocations = 0;
List<ShardId> topShardIdsWithFailedAllocations = new ArrayList<>();
List<ShardId> topShardIdsWithFailedRelocations = new ArrayList<>();

final var unassignedIterator = unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shardRouting = unassignedIterator.next();
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
int failedAllocations = unassignedInfo.failedAllocations();
if (failedAllocations > 0) {
try {
final var maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(
allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
);
if (failedAllocations >= maxRetry) {
shardsWithMaxFailedAllocations++;
if (topShardIdsWithFailedAllocations.size() <= MAX_SHARDS_IN_LOG_MSG) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think we can check the size along with failedAllocations > 0 to short-circuit a bit earlier and avoid resolving the setting value.

Copy link
Member Author

@pxsalehi pxsalehi Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so,shardsWithMaxFailedAllocations counts all of the ones that have reached max, while topShardIdsWithFailedAllocations stores only a portion of the shard IDs.

topShardIdsWithFailedAllocations.add(shardRouting.shardId());
}
}
} catch (IndexNotFoundException e) {
// ignore
}
}
unassignedIterator.updateUnassigned(
new UnassignedInfo(
unassignedInfo.failedAllocations() > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.reason(),
failedAllocations > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.reason(),
unassignedInfo.message(),
unassignedInfo.failure(),
0,
Expand All @@ -1317,7 +1364,7 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
unassignedInfo.lastAllocatedNodeId()
),
shardRouting.recoverySource(),
routingChangesObserver
observer
);
}

Expand All @@ -1326,6 +1373,20 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
for (ShardRouting shardRouting : routingNode) {
if (shardRouting.relocationFailureInfo() != null && shardRouting.relocationFailureInfo().failedRelocations() > 0) {
shardsWithRelocationFailures.add(shardRouting);
try {
int failedRelocations = shardRouting.relocationFailureInfo().failedRelocations();
final var maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(
allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
);
if (failedRelocations >= maxRetry) {
shardsWithMaxFailedRelocations++;
if (topShardIdsWithFailedRelocations.size() <= MAX_SHARDS_IN_LOG_MSG) {
topShardIdsWithFailedRelocations.add(shardRouting.shardId());
}
Comment on lines +1383 to +1385
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar nit: checking size before resolving setting value would be my preference.

}
} catch (IndexNotFoundException e) {
// ignore
}
}
}

Expand All @@ -1336,6 +1397,17 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
assignedShardsAdd(updated);
}
}

if (shardsWithMaxFailedAllocations > 0) {
logger.info(
Strings.format(RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG, shardsWithMaxFailedAllocations, topShardIdsWithFailedAllocations)
);
}
if (shardsWithMaxFailedRelocations > 0) {
logger.info(
Strings.format(RESET_FAILED_RELOCATION_COUNTER_LOG_MSG, shardsWithMaxFailedRelocations, topShardIdsWithFailedRelocations)
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -573,15 +575,71 @@ public void addAllocFailuresResetListenerTo(ClusterService clusterService) {
});

clusterService.addListener((changeEvent) -> {
if (changeEvent.nodesAdded() && changeEvent.state().getRoutingNodes().hasAllocationFailures()) {
if (shouldResetAllocationFailures(changeEvent)) {
taskQueue.submitTask("reset-allocation-failures", (e) -> { assert MasterService.isPublishFailureException(e); }, null);
}
});
}

/**
* We should reset allocation/relocation failure count to allow further retries when:
*
* 1. A new node joins the cluster.
* 2. A node shutdown metadata is added that could lead to a node being removed or replaced in the cluster.
*
* Note that removing a non-RESTART shutdown metadata from a node that is still in the cluster is treated similarly and
* will cause resetting the allocation/relocation failures.
*/
private boolean shouldResetAllocationFailures(ClusterChangedEvent changeEvent) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PR allocation and relocation are kind of teated similarly and sometimes I've used "allocation" to mean both.

final var clusterState = changeEvent.state();

if (clusterState.getRoutingNodes().hasAllocationFailures() == false
&& clusterState.getRoutingNodes().hasRelocationFailures() == false) {
return false;
}
if (changeEvent.nodesAdded()) {
return true;
}

final var currentNodeShutdowns = clusterState.metadata().nodeShutdowns();
final var previousNodeShutdowns = changeEvent.previousState().metadata().nodeShutdowns();

if (currentNodeShutdowns.equals(previousNodeShutdowns)) {
return false;
}

for (var currentShutdown : currentNodeShutdowns.getAll().entrySet()) {
var previousNodeShutdown = previousNodeShutdowns.get(currentShutdown.getKey());
if (currentShutdown.equals(previousNodeShutdown)) {
continue;
}
// A RESTART doesn't necessarily move around shards, so no need to consider it for a reset.
// Furthermore, once the node rejoins after restarting, there will be a reset if necessary.
if (currentShutdown.getValue().getType() == SingleNodeShutdownMetadata.Type.RESTART) {
continue;
}
// A node with no shutdown marker or a RESTART marker receives a non-RESTART shutdown marker
if (previousNodeShutdown == null || previousNodeShutdown.getType() == Type.RESTART) {
return true;
}
}

for (var previousShutdown : previousNodeShutdowns.getAll().entrySet()) {
var nodeId = previousShutdown.getKey();
// A non-RESTART marker is removed but the node is still in the cluster. We could re-attempt failed relocations/allocations.
if (currentNodeShutdowns.get(nodeId) == null
&& previousShutdown.getValue().getType() != SingleNodeShutdownMetadata.Type.RESTART
&& clusterState.nodes().get(nodeId) != null) {
return true;
}
}

return false;
}

/**
 * Computes a reroute after clearing the per-shard allocation/relocation failure counters,
 * giving previously failed shards another round of retries.
 */
private ClusterState rerouteWithResetFailedCounter(ClusterState clusterState) {
    final var allocation = createRoutingAllocation(clusterState, currentNanoTime());
    // Clear the failure counters before asking the allocator for a new assignment.
    allocation.routingNodes().resetFailedCounter(allocation);
    reroute(allocation, a -> shardsAllocator.allocate(a, ActionListener.noop()));
    return buildResultAndLogHealthChange(clusterState, allocation, "reroute with reset failed counter");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ default RoutingExplanations execute(RoutingAllocation allocation, AllocationComm

try {
if (retryFailed) {
allocation.routingNodes().resetFailedCounter(allocation.changes());
allocation.routingNodes().resetFailedCounter(allocation);
}
return commands.execute(allocation, explain);
} finally {
Expand Down
Loading