Skip to content

Commit

Permalink
Implement framework for migrating system indices (elastic#78951)
Browse files Browse the repository at this point in the history
This PR adds a framework for migrating system indices as necessary prior
to Elasticsearch upgrades. This framework uses REST APIs added in
another commit:
- GET _migration/system_features

This API, which gets the status of "features" (plugins which own system
indices) with regards to whether they need to be upgraded or not. As of
this PR, this API also reports errors encountered while migrating system
indices alongside the index that was being processed when this occurred.

As an example of this error reporting:

```json
{
    "feature_name": "logstash_management",
    "minimum_index_version": "8.0.0",
    "upgrade_status": "ERROR",
    "indices": [
        {
            "index": ".logstash",
            "version": "8.0.0",
            "failure_cause": {
                "error": {
                    "root_cause": [
                        {
                            "type": "runtime_exception",
                            "reason": "whoopsie",
                            "stack_trace": "<omitted for brevity>"
                        }
                    ],
                    "type": "runtime_exception",
                    "reason": "whoopsie",
                    "stack_trace": "<omitted for brevity>"
                }
            }
        }
    ]
}
```

- POST _migration/system_features

This API starts the migration process. The API for this has no changes,
but when called, any system indices which need to be migrated will be
migrated, with status information stored in the cluster state for later
use by the GET _migration/system_features API.
  • Loading branch information
gwbrown authored Oct 20, 2021
1 parent 008c55b commit 1a90689
Show file tree
Hide file tree
Showing 28 changed files with 3,079 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class MigrationIT extends ESRestHighLevelClientTestCase {

Expand Down Expand Up @@ -60,11 +60,8 @@ public void testGetFeatureUpgradeStatus() throws IOException {
public void testPostFeatureUpgradeStatus() throws IOException {
PostFeatureUpgradeRequest request = new PostFeatureUpgradeRequest();
PostFeatureUpgradeResponse response = highLevelClient().migration().postFeatureUpgrade(request, RequestOptions.DEFAULT);
assertThat(response.isAccepted(), equalTo(true));
assertThat(response.getFeatures().size(), equalTo(1));
PostFeatureUpgradeResponse.Feature feature = response.getFeatures().get(0);
assertThat(feature.getFeatureName(), equalTo("security"));
assertThat(response.getReason(), nullValue());
assertThat(response.getElasticsearchException(), nullValue());
assertThat(response.isAccepted(), equalTo(false));
assertThat(response.getFeatures(), hasSize(0));
assertThat(response.getReason(), equalTo("No system indices require migration"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ protected org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStat
randomAlphaOfLengthBetween(3, 20),
randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
randomFrom(org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.UpgradeStatus.values()),
randomList(4,
() -> new org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.IndexVersion(
randomList(
4,
() -> new org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.IndexInfo(
randomAlphaOfLengthBetween(3, 20),
randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion())))
randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
null
)
)
)),
randomFrom(org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.UpgradeStatus.values())
);
Expand Down Expand Up @@ -78,12 +82,12 @@ protected void assertInstances(
assertThat(clientStatus.getIndexVersions(), hasSize(serverTestStatus.getIndexVersions().size()));

for (int j = 0; i < clientStatus.getIndexVersions().size(); i++) {
org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.IndexVersion serverIndexVersion
org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.IndexInfo serverIndexInfo
= serverTestStatus.getIndexVersions().get(j);
GetFeatureUpgradeStatusResponse.IndexVersion clientIndexVersion = clientStatus.getIndexVersions().get(j);

assertThat(clientIndexVersion.getIndexName(), equalTo(serverIndexVersion.getIndexName()));
assertThat(clientIndexVersion.getVersion(), equalTo(serverIndexVersion.getVersion().toString()));
assertThat(clientIndexVersion.getIndexName(), equalTo(serverIndexInfo.getIndexName()));
assertThat(clientIndexVersion.getVersion(), equalTo(serverIndexInfo.getVersion().toString()));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion docs/reference/migration/apis/feature_upgrade.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ and to trigger an automated system upgrade that might potentially involve downti
==== {api-prereq-title}

* If the {es} {security-features} are enabled, you must have the `manage`
<<privileges-list-cluster,cluster privilege>> to use this API. (TODO: true?)
<<privileges-list-cluster,cluster privilege>> to use this API.

[[feature-upgrade-api-example]]
==== {api-examples-title}
Expand Down Expand Up @@ -144,6 +144,7 @@ Example response:
]
}
--------------------------------------------------
// TESTRESPONSE[skip: can't actually upgrade system indices in these tests]

This tells us that the security index is being upgraded. To check the
overall status of the upgrade, call the endpoint with GET.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.migration;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusAction;
import org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusRequest;
import org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse;
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeAction;
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.upgrades.FeatureMigrationResults;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class MultiFeatureMigrationIT extends FeatureMigrationIT {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).build();
}

@Override
protected boolean forbidPrivateIndexSettings() {
// We need to be able to set the index creation version manually.
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(FeatureMigrationIT.TestPlugin.class);
plugins.add(SecondPlugin.class);
plugins.add(ReindexPlugin.class);
return plugins;
}

// Sorts alphabetically after the feature from MultiFeatureMigrationIT
private static final String SECOND_FEATURE_NAME = "B-test-feature";
private static final String ORIGIN = MultiFeatureMigrationIT.class.getSimpleName();
private static final String VERSION_META_KEY = "version";
static final int SECOND_FEATURE_IDX_FLAG_VALUE = 0;

public void testMultipleFeatureMigration() throws Exception {
// All the indices from FeatureMigrationIT
createSystemIndexForDescriptor(INTERNAL_MANAGED);
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
createSystemIndexForDescriptor(EXTERNAL_MANAGED);
createSystemIndexForDescriptor(EXTERNAL_UNMANAGED);
// And our new one
createSystemIndexForDescriptor(SECOND_FEATURE_IDX_DESCIPTOR);

ensureGreen();

SetOnce<Boolean> preMigrationHookCalled = new SetOnce<>();
SetOnce<Boolean> postMigrationHookCalled = new SetOnce<>();
SetOnce<Boolean> secondPluginPreMigrationHookCalled = new SetOnce<>();
SetOnce<Boolean> secondPluginPostMigrationHookCalled = new SetOnce<>();

TestPlugin.preMigrationHook.set(clusterState -> {
// None of the other hooks should have been called yet.
assertThat(postMigrationHookCalled.get(), nullValue());
assertThat(secondPluginPreMigrationHookCalled.get(), nullValue());
assertThat(secondPluginPostMigrationHookCalled.get(), nullValue());
Map<String, Object> metadata = new HashMap<>();
metadata.put("stringKey", "first plugin value");

// We shouldn't have any results in the cluster state given no features have finished yet.
FeatureMigrationResults currentResults = clusterState.metadata().custom(FeatureMigrationResults.TYPE);
assertThat(currentResults, nullValue());

preMigrationHookCalled.set(true);
return metadata;
});

TestPlugin.postMigrationHook.set((clusterState, metadata) -> {
// Check that the hooks have been called or not as expected.
assertThat(preMigrationHookCalled.get(), is(true));
assertThat(secondPluginPreMigrationHookCalled.get(), nullValue());
assertThat(secondPluginPostMigrationHookCalled.get(), nullValue());

assertThat(
metadata,
hasEntry("stringKey", "first plugin value")
);

// We shouldn't have any results in the cluster state given no features have finished yet.
FeatureMigrationResults currentResults = clusterState.metadata().custom(FeatureMigrationResults.TYPE);
assertThat(currentResults, nullValue());

postMigrationHookCalled.set(true);
});

SecondPlugin.preMigrationHook.set(clusterState -> {
// Check that the hooks have been called or not as expected.
assertThat(preMigrationHookCalled.get(), is(true));
assertThat(postMigrationHookCalled.get(), is(true));
assertThat(secondPluginPostMigrationHookCalled.get(), nullValue());

Map<String, Object> metadata = new HashMap<>();
metadata.put("stringKey", "second plugin value");

// But now, we should have results, as we're in a new feature!
FeatureMigrationResults currentResults = clusterState.metadata().custom(FeatureMigrationResults.TYPE);
assertThat(currentResults, notNullValue());
assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(FEATURE_NAME)));
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true));
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedIndexName(), nullValue());
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getException(), nullValue());

secondPluginPreMigrationHookCalled.set(true);
return metadata;
});

SecondPlugin.postMigrationHook.set((clusterState, metadata) -> {
// Check that the hooks have been called or not as expected.
assertThat(preMigrationHookCalled.get(), is(true));
assertThat(postMigrationHookCalled.get(), is(true));
assertThat(secondPluginPreMigrationHookCalled.get(), is(true));

assertThat(
metadata,
hasEntry("stringKey", "second plugin value")
);

// And here, the results should be the same, as we haven't updated the state with this feature's status yet.
FeatureMigrationResults currentResults = clusterState.metadata().custom(FeatureMigrationResults.TYPE);
assertThat(currentResults, notNullValue());
assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(1), hasKey(FEATURE_NAME)));
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true));
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedIndexName(), nullValue());
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getException(), nullValue());

secondPluginPostMigrationHookCalled.set(true);
});

PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest();
PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get();
assertThat(migrationResponse.getReason(), nullValue());
assertThat(migrationResponse.getElasticsearchException(), nullValue());
final Set<String> migratingFeatures = migrationResponse.getFeatures()
.stream()
.map(PostFeatureUpgradeResponse.Feature::getFeatureName)
.collect(Collectors.toSet());
assertThat(migratingFeatures, hasItems(FEATURE_NAME, SECOND_FEATURE_NAME));

GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest();
assertBusy(() -> {
GetFeatureUpgradeStatusResponse statusResponse = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest)
.get();
logger.info(Strings.toString(statusResponse));
assertThat(statusResponse.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
});

assertTrue("the first plugin's pre-migration hook wasn't actually called", preMigrationHookCalled.get());
assertTrue("the first plugin's post-migration hook wasn't actually called", postMigrationHookCalled.get());

assertTrue("the second plugin's pre-migration hook wasn't actually called", secondPluginPreMigrationHookCalled.get());
assertTrue("the second plugin's post-migration hook wasn't actually called", secondPluginPostMigrationHookCalled.get());

Metadata finalMetadata = client().admin().cluster().prepareState().get().getState().metadata();
// Check that the results metadata is what we expect
FeatureMigrationResults currentResults = finalMetadata.custom(FeatureMigrationResults.TYPE);
assertThat(currentResults, notNullValue());
assertThat(currentResults.getFeatureStatuses(), allOf(aMapWithSize(2), hasKey(FEATURE_NAME), hasKey(SECOND_FEATURE_NAME)));
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).succeeded(), is(true));
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getFailedIndexName(), nullValue());
assertThat(currentResults.getFeatureStatuses().get(FEATURE_NAME).getException(), nullValue());
assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).succeeded(), is(true));
assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).getFailedIndexName(), nullValue());
assertThat(currentResults.getFeatureStatuses().get(SECOND_FEATURE_NAME).getException(), nullValue());

// Finally, verify that all the indices exist and have the properties we expect.
assertIndexHasCorrectProperties(
finalMetadata,
".int-man-old-reindexed-for-8",
INTERNAL_MANAGED_FLAG_VALUE,
true,
true,
Arrays.asList(".int-man-old", ".internal-managed-alias")
);
assertIndexHasCorrectProperties(
finalMetadata,
".int-unman-old-reindexed-for-8",
INTERNAL_UNMANAGED_FLAG_VALUE,
false,
true,
Collections.singletonList(".int-unman-old")
);
assertIndexHasCorrectProperties(
finalMetadata,
".ext-man-old-reindexed-for-8",
EXTERNAL_MANAGED_FLAG_VALUE,
true,
false,
Arrays.asList(".ext-man-old", ".external-managed-alias")
);
assertIndexHasCorrectProperties(
finalMetadata,
".ext-unman-old-reindexed-for-8",
EXTERNAL_UNMANAGED_FLAG_VALUE,
false,
false,
Collections.singletonList(".ext-unman-old")
);

assertIndexHasCorrectProperties(
finalMetadata,
".second-int-man-old-reindexed-for-8",
SECOND_FEATURE_IDX_FLAG_VALUE,
true,
true,
Arrays.asList(".second-int-man-old", ".second-internal-managed-alias")
);
}

private static final SystemIndexDescriptor SECOND_FEATURE_IDX_DESCIPTOR = SystemIndexDescriptor.builder()
.setIndexPattern(".second-int-man-*")
.setAliasName(".second-internal-managed-alias")
.setPrimaryIndex(".second-int-man-old")
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
.setSettings(createSimpleSettings(Version.V_7_0_0, 0))
.setMappings(createSimpleMapping(true, true))
.setOrigin(ORIGIN)
.setVersionMetaKey(VERSION_META_KEY)
.setAllowedElasticProductOrigins(Collections.emptyList())
.setMinimumNodeVersion(Version.V_7_0_0)
.setPriorSystemIndexDescriptors(Collections.emptyList())
.build();

public static class SecondPlugin extends Plugin implements SystemIndexPlugin {

private static final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
private static final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();

public SecondPlugin() {

}

@Override public String getFeatureName() {
return SECOND_FEATURE_NAME;
}

@Override public String getFeatureDescription() {
return "a plugin for test system index migration with multiple features";
}

@Override public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(SECOND_FEATURE_IDX_DESCIPTOR);
}

@Override public void prepareForIndicesMigration(
ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
listener.onResponse(preMigrationHook.get().apply(clusterService.state()));
}

@Override public void indicesMigrationComplete(
Map<String, Object> preUpgradeMetadata, ClusterService clusterService, Client client, ActionListener<Boolean> listener) {
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
listener.onResponse(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testGetFeatureUpgradeStatus() throws Exception {

assertThat(feature.size(), equalTo(4));
assertThat(feature.get("minimum_index_version"), equalTo(UPGRADE_FROM_VERSION.toString()));
if (UPGRADE_FROM_VERSION.before(Version.CURRENT.minimumIndexCompatibilityVersion())) {
if (UPGRADE_FROM_VERSION.before(Version.V_8_0_0)) {
assertThat(feature.get("migration_status"), equalTo("MIGRATION_NEEDED"));
} else {
assertThat(feature.get("migration_status"), equalTo("NO_MIGRATION_NEEDED"));
Expand Down
Loading

0 comments on commit 1a90689

Please sign in to comment.