Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Wait while index is blocked #119542

Merged
merged 6 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119542.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119542
summary: Wait while index is blocked
area: Transform
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig;
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
Expand Down Expand Up @@ -152,7 +153,7 @@ public void testBasicTransformStats() throws Exception {
public void testContinuousTransformCrud() throws Exception {
var transformId = "transform-continuous-crud";
var indexName = "continuous-crud-reviews";
createContinuousTransform(indexName, transformId);
createContinuousTransform(indexName, transformId, "reviews-by-user-business-day");
var transformStats = getBasicTransformStats(transformId);
assertThat(transformStats.get("state"), equalTo("started"));

Expand Down Expand Up @@ -181,7 +182,7 @@ public void testContinuousTransformCrud() throws Exception {
deleteTransform(transformId);
}

private void createContinuousTransform(String indexName, String transformId) throws Exception {
private void createContinuousTransform(String indexName, String transformId, String destinationIndex) throws Exception {
createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);

var groups = Map.of(
Expand All @@ -197,8 +198,9 @@ private void createContinuousTransform(String indexName, String transformId) thr
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));

var config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryConfig.matchAll(), indexName)
.setPivotConfig(createPivotConfig(groups, aggs))
var config = createTransformConfigBuilder(transformId, destinationIndex, QueryConfig.matchAll(), indexName).setPivotConfig(
createPivotConfig(groups, aggs)
)
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).build())
.build();
Expand All @@ -216,7 +218,7 @@ private void createContinuousTransform(String indexName, String transformId) thr
@SuppressWarnings("unchecked")
public void testBasicContinuousTransformStats() throws Exception {
var transformId = "transform-continuous-basic-stats";
createContinuousTransform("continuous-basic-stats-reviews", transformId);
createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
var transformStats = getBasicTransformStats(transformId);

assertEquals("started", XContentMapValues.extractValue("state", transformStats));
Expand All @@ -230,6 +232,40 @@ public void testBasicContinuousTransformStats() throws Exception {
deleteTransform(transformId);
}

public void testDestinationIndexBlocked() throws Exception {
var transformId = "transform-continuous-blocked-destination";
var sourceIndexName = "source-reviews";
var destIndexName = "destination-reviews";

// create transform & indices, wait until 1st checkpoint is finished
createContinuousTransform(sourceIndexName, transformId, destIndexName);

// block destination index
Request request = new Request("PUT", destIndexName + "/_block/write");
assertAcknowledged(adminClient().performRequest(request));

// index more docs so the checkpoint tries to run, wait until transform stops
assertBusy(() -> {
indexDoc(42, sourceIndexName);
assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId));
}, 30, TimeUnit.SECONDS);

// unblock index
request = new Request("PUT", destIndexName + "/_settings");
request.setJsonEntity("""
{ "blocks.write": false }
""");
assertAcknowledged(adminClient().performRequest(request));

assertBusy(() -> {
indexDoc(42, sourceIndexName);
assertEquals(TransformStats.State.STARTED.value(), getTransformState(transformId));
}, 30, TimeUnit.SECONDS);

stopTransform(transformId);
deleteTransform(transformId);
}

public void testTransformLifecycleInALoop() throws Exception {
String transformId = "lifecycle-in-a-loop";
String indexName = transformId + "-src";
Expand Down Expand Up @@ -652,4 +688,17 @@ private void indexMoreDocs(long timestamp, long userId, String index) throws Exc
bulkBuilder.append("\r\n");
doBulk(bulkBuilder.toString(), true);
}

private void indexDoc(long userId, String index) throws Exception {
StringBuilder bulkBuilder = new StringBuilder();
bulkBuilder.append(format("""
{"create":{"_index":"%s"}}
""", index));
String source = format("""
{"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s}
""", userId, 1, 2, 5, Instant.now().toEpochMilli());
bulkBuilder.append(source);
bulkBuilder.append("\r\n");
doBulk(bulkBuilder.toString(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ static TransformStats deriveStats(TransformTask transformTask, @Nullable Transfo
&& derivedState.equals(TransformStats.State.FAILED) == false) {
derivedState = TransformStats.State.STOPPING;
reason = Strings.isNullOrEmpty(reason) ? "transform is set to stop at the next checkpoint" : reason;
} else if (derivedState.equals(TransformStats.State.STARTED) && transformTask.getContext().isWaitingForIndexToUnblock()) {
derivedState = TransformStats.State.WAITING;
reason = Strings.isNullOrEmpty(reason) ? "transform is paused while destination index is blocked" : reason;
}
return new TransformStats(
transformTask.getTransformId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ protected void taskOperation(
TransformTask transformTask,
ActionListener<Response> listener
) {
if (transformTask.getContext().isWaitingForIndexToUnblock()) {
logger.debug("[{}] Destination index is blocked. User requested a retry.", transformTask.getTransformId());
transformTask.getContext().setIsWaitingForIndexToUnblock(false);
}
transformScheduler.scheduleNow(request.getId());
listener.onResponse(Response.TRUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -443,9 +445,36 @@ public boolean maybeTriggerAsyncJob(long now) {
logger.debug("[{}] schedule was triggered but the Transform is upgrading. Ignoring trigger.", getJobId());
return false;
}
if (context.isWaitingForIndexToUnblock()) {
if (destinationIndexHasWriteBlock()) {
logger.debug("[{}] schedule was triggered but the destination index has a write block. Ignoring trigger.", getJobId());
return false;
}
logger.debug("[{}] destination index is no longer blocked.", getJobId());
context.setIsWaitingForIndexToUnblock(false);
}

return super.maybeTriggerAsyncJob(now);
}

private boolean destinationIndexHasWriteBlock() {
var clusterState = clusterService.state();
if (clusterState == null) {
// if we can't determine if the index is blocked, we assume it isn't, even though the bulk request may fail again
return false;
}

var destinationIndexName = transformConfig.getDestination().getIndex();
var destinationIndex = indexNameExpressionResolver.concreteWriteIndex(
clusterState,
IndicesOptions.lenientExpandOpen(),
destinationIndexName,
true,
false
);
return destinationIndex != null && clusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, destinationIndex.getName());
}

@Override
protected void onStop() {
closePointInTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public interface Listener {
private volatile AuthorizationState authState;
private volatile int pageSize = 0;

/**
* If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it.
* {@link TransformFailureHandler} will silence the error so the Transform automatically retries.
* Every time the Transform runs, it will check if the index is unblocked and reset this to false.
* Users can override this via the `_schedule_now` API.
*/
private volatile boolean isWaitingForIndexToUnblock = false;

// the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
private final AtomicLong currentCheckpoint;
Expand Down Expand Up @@ -183,6 +191,14 @@ public void setShouldRecreateDestinationIndex(boolean shouldRecreateDestinationI
this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex;
}

public boolean isWaitingForIndexToUnblock() {
return isWaitingForIndexToUnblock;
}

public void setIsWaitingForIndexToUnblock(boolean isWaitingForIndexToUnblock) {
this.isWaitingForIndexToUnblock = isWaitingForIndexToUnblock;
}

public AuthorizationState getAuthState() {
return authState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ private void handleScriptException(ScriptException scriptException, boolean unat
*/
private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) {
if (bulkIndexingException.getCause() instanceof ClusterBlockException) {
context.setIsWaitingForIndexToUnblock(true);
retryWithoutIncrementingFailureCount(
bulkIndexingException,
bulkIndexingException.getDetailedMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,46 @@ public void testDeriveStats() {
);
}

public void testDeriveStatsWithIndexBlock() {
String transformId = "transform-with-stats";
String reason = "transform is paused while destination index is blocked";
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
TransformState runningState = new TransformState(
TransformTaskState.STARTED,
IndexerState.STARTED,
null,
0,
null,
null,
null,
false,
null
);

var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock());
context.setIsWaitingForIndexToUnblock(true);
var task = mock(TransformTask.class);
when(task.getContext()).thenReturn(context);
when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(runningState);
when(task.getStats()).thenReturn(stats);

assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(
new TransformStats(
transformId,
TransformStats.State.WAITING,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
}

private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) {
when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchHit;
Expand All @@ -44,6 +49,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformMetadata;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand All @@ -53,6 +59,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.TransformExtension;
import org.elasticsearch.xpack.transform.TransformNode;
Expand All @@ -77,6 +84,10 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -467,6 +478,53 @@ public void testHandlePitIndexNotFound() throws InterruptedException {
}
}

public void testIndexBlocked() {
var service = serviceWithBlockCheck(true);
var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock());

var indexer = createTestIndexer(mock(), service, resolver(), context);
context.setIsWaitingForIndexToUnblock(true);

assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli()));
assertTrue(context.isWaitingForIndexToUnblock());
}

public void testIndexUnblocked() {
var service = serviceWithBlockCheck(false);
// set state to failed so that TransformIndexer returns false
var context = new TransformContext(TransformTaskState.FAILED, "", 0, mock());

var indexer = createTestIndexer(mock(), service, resolver(), context);
context.setIsWaitingForIndexToUnblock(true);

assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli()));
// ClientTransformIndexer's maybeTriggerAsyncJob should reset isWaitingForIndexToUnblock to false
assertFalse(context.isWaitingForIndexToUnblock());
}

private ClusterService serviceWithBlockCheck(boolean checkResponse) {
var clusterBlocks = mock(ClusterBlocks.class);
when(clusterBlocks.indexBlocked(eq(ClusterBlockLevel.WRITE), anyString())).thenReturn(checkResponse);
var metadata = mock(Metadata.class);
when(metadata.custom(eq(TransformMetadata.TYPE))).thenReturn(TransformMetadata.EMPTY_METADATA);
var clusterState = mock(ClusterState.class);
when(clusterState.blocks()).thenReturn(clusterBlocks);
when(clusterState.getMetadata()).thenReturn(metadata);
var clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(clusterState);
return clusterService;
}

private IndexNameExpressionResolver resolver() {
var resolver = mock(IndexNameExpressionResolver.class);
when(resolver.concreteWriteIndex(any(), any(), any(), anyBoolean(), anyBoolean())).thenAnswer(ans -> {
Index destIndex = mock();
when(destIndex.getName()).thenReturn(ans.getArgument(2));
return destIndex;
});
return resolver;
}

private static class MockClientTransformIndexer extends ClientTransformIndexer {

MockClientTransformIndexer(
Expand Down Expand Up @@ -627,13 +685,22 @@ private ClientTransformIndexer createTestIndexer() {
}

private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient client) {
return createTestIndexer(client, mock(), mock(), mock(TransformContext.class));
}

private ClientTransformIndexer createTestIndexer(
ParentTaskAssigningClient client,
ClusterService service,
IndexNameExpressionResolver resolver,
TransformContext context
) {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));

return new ClientTransformIndexer(
mock(ThreadPool.class),
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
service,
resolver,
mock(TransformExtension.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
Expand All @@ -652,7 +719,7 @@ private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient clien
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
context,
false
);
}
Expand Down
Loading