Skip to content

Commit 7dde9e5

Browse files
[8.x] Add sanity check to ReindexDatastreamIndexAction (#120231) (#120295)
* Add sanity check to ReindexDatastreamIndexAction (#120231) Add an assert that checks that source and destination index have the same doc count. This requires a refresh of the dest index and a search request against both the source and dest index, so will only be run if asserts are enabled. * Compilation error due to lucene totalHits change
1 parent c340ba5 commit 7dde9e5

File tree

2 files changed

+58
-15
lines changed

2 files changed

+58
-15
lines changed

docs/changelog/120231.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120231
2+
summary: Add sanity check to `ReindexDatastreamIndexAction`
3+
area: Data streams
4+
type: enhancement
5+
issues: []

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11+
import org.apache.lucene.search.TotalHits;
1112
import org.elasticsearch.ElasticsearchException;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1415
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
1516
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
1617
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
18+
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
19+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1720
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
21+
import org.elasticsearch.action.search.SearchRequest;
1822
import org.elasticsearch.action.support.ActionFilters;
1923
import org.elasticsearch.action.support.HandledTransportAction;
2024
import org.elasticsearch.action.support.IndicesOptions;
@@ -26,12 +30,14 @@
2630
import org.elasticsearch.cluster.service.ClusterService;
2731
import org.elasticsearch.common.settings.Setting;
2832
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.core.Assertions;
2934
import org.elasticsearch.core.TimeValue;
3035
import org.elasticsearch.index.IndexSettings;
3136
import org.elasticsearch.index.reindex.BulkByScrollResponse;
3237
import org.elasticsearch.index.reindex.ReindexAction;
3338
import org.elasticsearch.index.reindex.ReindexRequest;
3439
import org.elasticsearch.injection.guice.Inject;
40+
import org.elasticsearch.search.builder.SearchSourceBuilder;
3541
import org.elasticsearch.tasks.Task;
3642
import org.elasticsearch.tasks.TaskId;
3743
import org.elasticsearch.threadpool.ThreadPool;
@@ -138,6 +144,7 @@ protected void doExecute(
138144
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
139145
.<BulkByScrollResponse>andThen(l -> reindex(sourceIndexName, destIndexName, l, taskId))
140146
.<AcknowledgedResponse>andThen(l -> copyOldSourceSettingsToDest(settingsBefore, destIndexName, l, taskId))
147+
.<AcknowledgedResponse>andThen(l -> sanityCheck(sourceIndexName, destIndexName, l, taskId))
141148
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
142149
.addListener(listener);
143150
}
@@ -220,21 +227,6 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkBy
220227
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
221228
}
222229

223-
private void addBlockIfFromSource(
224-
IndexMetadata.APIBlock block,
225-
Settings settingsBefore,
226-
String destIndexName,
227-
ActionListener<AddIndexBlockResponse> listener,
228-
TaskId parentTaskId
229-
) {
230-
if (settingsBefore.getAsBoolean(block.settingName(), false)) {
231-
var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName);
232-
addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage), parentTaskId);
233-
} else {
234-
listener.onResponse(null);
235-
}
236-
}
237-
238230
private void updateSettings(
239231
String index,
240232
Settings.Builder settings,
@@ -302,4 +294,50 @@ private void addBlockToIndex(
302294
addIndexBlockRequest.setParentTask(parentTaskId);
303295
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
304296
}
297+
298+
private void getIndexDocCount(String index, TaskId parentTaskId, ActionListener<Long> listener) {
299+
SearchRequest countRequest = new SearchRequest(index);
300+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
301+
countRequest.allowPartialSearchResults(false);
302+
countRequest.source(searchSourceBuilder);
303+
countRequest.setParentTask(parentTaskId);
304+
client.search(countRequest, listener.delegateFailure((delegate, response) -> {
305+
var totalHits = response.getHits().getTotalHits();
306+
assert totalHits.relation == TotalHits.Relation.EQUAL_TO;
307+
delegate.onResponse(totalHits.value);
308+
}));
309+
}
310+
311+
private void sanityCheck(
312+
String sourceIndexName,
313+
String destIndexName,
314+
ActionListener<AcknowledgedResponse> listener,
315+
TaskId parentTaskId
316+
) {
317+
if (Assertions.ENABLED) {
318+
logger.debug("Comparing source [{}] and dest [{}] doc counts", sourceIndexName, destIndexName);
319+
client.execute(
320+
RefreshAction.INSTANCE,
321+
new RefreshRequest(destIndexName),
322+
listener.delegateFailureAndWrap((delegate, ignored) -> {
323+
getIndexDocCount(sourceIndexName, parentTaskId, delegate.delegateFailureAndWrap((delegate1, sourceCount) -> {
324+
getIndexDocCount(destIndexName, parentTaskId, delegate1.delegateFailureAndWrap((delegate2, destCount) -> {
325+
assert sourceCount == destCount
326+
: String.format(
327+
Locale.ROOT,
328+
"source index [%s] has %d docs and dest [%s] has %d docs",
329+
sourceIndexName,
330+
sourceCount,
331+
destIndexName,
332+
destCount
333+
);
334+
delegate2.onResponse(null);
335+
}));
336+
}));
337+
})
338+
);
339+
} else {
340+
listener.onResponse(null);
341+
}
342+
}
305343
}

0 commit comments

Comments
 (0)