From 775663e72b077c0263f1f4f101f7598bde4ba8e7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 5 Jan 2025 22:24:06 -0800 Subject: [PATCH] Fix realtime get nested fields with synthetic source --- muted-tests.yml | 3 - .../org/elasticsearch/get/GetActionIT.java | 98 ++++++++++++++++ .../index/engine/InternalEngine.java | 2 +- .../index/engine/TranslogDirectoryReader.java | 109 +++++++++++++++--- .../engine/TranslogOperationAsserter.java | 6 +- 5 files changed, 195 insertions(+), 23 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index fbeb35674701e..1d29a17797a11 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -265,9 +265,6 @@ tests: - class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT method: testMountSearchableSnapshot {p0=[9.0.0, 9.0.0, 8.18.0]} issue: https://github.com/elastic/elasticsearch/issues/119551 -- class: org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshotTests - method: testSkipNonRootOfNestedDocuments - issue: https://github.com/elastic/elasticsearch/issues/119553 - class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT method: testSearchableSnapshotUpgrade {p0=[9.0.0, 9.0.0, 8.18.0]} issue: https://github.com/elastic/elasticsearch/issues/119560 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java index 6c7754932af68..f9595bfc5aff2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import 
org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -35,6 +36,7 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; @@ -932,6 +934,102 @@ public void testGetRemoteIndex() { ); } + public void testRealTimeGetNestedFields() { + String index = "test"; + SourceFieldMapper.Mode sourceMode = randomFrom(SourceFieldMapper.Mode.values()); + assertAcked( + prepareCreate(index).setMapping("title", "type=keyword", "author", "type=nested") + .setSettings( + indexSettings(1, 0).put("index.refresh_interval", -1) + .put(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), sourceMode) + ) + ); + ensureGreen(); + String source0 = """ + { + "title": "t0", + "author": [ + { + "name": "a0" + } + ] + } + """; + prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("0").setSource(source0, XContentType.JSON).get(); + // start tracking translog locations + assertTrue(client().prepareGet(index, "0").setRealtime(true).get().isExists()); + String source1 = """ + { + "title": ["t1"], + "author": [ + { + "name": "a1" + } + ] + } + """; + prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("1").setSource(source1, XContentType.JSON).get(); + String source2 = """ + { + "title": ["t1", "t2"], + "author": [ + { + "name": "a1" + }, + { + "name": "a2" + } + ] + } + """; + prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("2").setSource(source2, XContentType.JSON).get(); + String source3 = """ + { + "title": ["t1", "t3", "t2"] + } + """; + prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("3").setSource(source3, XContentType.JSON).get(); + GetResponse translog1 = 
client().prepareGet(index, "1").setRealtime(true).get(); + GetResponse translog2 = client().prepareGet(index, "2").setRealtime(true).get(); + GetResponse translog3 = client().prepareGet(index, "3").setRealtime(true).get(); + assertTrue(translog1.isExists()); + assertTrue(translog2.isExists()); + assertTrue(translog3.isExists()); + switch (sourceMode) { + case STORED -> { + assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo(source1)); + assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo(source2)); + assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo(source3)); + } + case SYNTHETIC -> { + assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo(""" + {"author":{"name":"a1"},"title":"t1"}""")); + assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo(""" + {"author":[{"name":"a1"},{"name":"a2"}],"title":["t1","t2"]}""")); + assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo(""" + {"title":["t1","t2","t3"]}""")); + } + case DISABLED -> { + assertNull(translog1.getSourceAsBytesRef()); + assertNull(translog2.getSourceAsBytesRef()); + assertNull(translog3.getSourceAsBytesRef()); + } + } + assertFalse(client().prepareGet(index, "1").setRealtime(false).get().isExists()); + assertFalse(client().prepareGet(index, "2").setRealtime(false).get().isExists()); + assertFalse(client().prepareGet(index, "3").setRealtime(false).get().isExists()); + refresh(index); + GetResponse lucene1 = client().prepareGet(index, "1").setRealtime(randomBoolean()).get(); + GetResponse lucene2 = client().prepareGet(index, "2").setRealtime(randomBoolean()).get(); + GetResponse lucene3 = client().prepareGet(index, "3").setRealtime(randomBoolean()).get(); + assertTrue(lucene1.isExists()); + assertTrue(lucene2.isExists()); + assertTrue(lucene3.isExists()); + assertThat(translog1.getSourceAsBytesRef(), equalTo(lucene1.getSourceAsBytesRef())); + assertThat(translog2.getSourceAsBytesRef(), 
equalTo(lucene2.getSourceAsBytesRef())); + assertThat(translog3.getSourceAsBytesRef(), equalTo(lucene3.getSourceAsBytesRef())); + } + private void assertGetFieldsAlwaysWorks(String index, String docId, String[] fields) { assertGetFieldsAlwaysWorks(index, docId, fields, null); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8d3d1bde316ea..cca3df168b7f8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -819,7 +819,7 @@ private GetResult getFromTranslog( ) throws IOException { assert get.isReadFromTranslog(); translogGetCount.incrementAndGet(); - final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader( + final DirectoryReader inMemoryReader = TranslogDirectoryReader.create( shardId, index, mappingLookup, diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java index 0928b4500e6da..72ef5c6497bb6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.index.BaseTermsEnum; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.ByteVectorValues; @@ -45,9 +46,12 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import 
org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.fieldvisitor.FieldNamesProvidingStoredFieldsVisitor; import org.elasticsearch.index.mapper.DocumentParser; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -76,9 +80,9 @@ * into an in-memory Lucene segment that is created on-demand. */ final class TranslogDirectoryReader extends DirectoryReader { - private final TranslogLeafReader leafReader; + private final LeafReader leafReader; - TranslogDirectoryReader( + static DirectoryReader create( ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, @@ -86,11 +90,38 @@ final class TranslogDirectoryReader extends DirectoryReader { EngineConfig engineConfig, Runnable onSegmentCreated ) throws IOException { - this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated)); + final Directory directory = new ByteBuffersDirectory(); + boolean success = false; + try { + final LeafReader leafReader; + // When using synthetic source, the translog operation must always be reindexed into an in-memory Lucene segment to ensure consistent + // output for realtime-get operations. However, this can degrade the performance of realtime-get and update operations. + // If slight inconsistencies in realtime-get operations are acceptable, the translog operation can be reindexed lazily. 
+ if (mappingLookup.isSourceSynthetic()) { + leafReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, false, operation); + } else { + leafReader = new TranslogLeafReader( + shardId, + operation, + mappingLookup, + documentParser, + engineConfig, + directory, + onSegmentCreated + ); + } + var directoryReader = ElasticsearchDirectoryReader.wrap(new TranslogDirectoryReader(directory, leafReader), shardId); + success = true; + return directoryReader; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(directory); + } + } } - private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException { - super(leafReader.directory, new LeafReader[] { leafReader }, null); + private TranslogDirectoryReader(Directory directory, LeafReader leafReader) throws IOException { + super(directory, new LeafReader[] { leafReader }, null); this.leafReader = leafReader; } @@ -139,12 +170,13 @@ public CacheHelper getReaderCacheHelper() { return leafReader.getReaderCacheHelper(); } - static DirectoryReader createInMemoryReader( + private static LeafReader createInMemoryReader( ShardId shardId, EngineConfig engineConfig, Directory directory, DocumentParser documentParser, MappingLookup mappingLookup, + boolean rootDocOnly, Translog.Index operation ) { final ParsedDocument parsedDocs = documentParser.parseDocument( @@ -159,12 +191,21 @@ static DirectoryReader createInMemoryReader( IndexWriterConfig.OpenMode.CREATE ).setCodec(engineConfig.getCodec()); try (IndexWriter writer = new IndexWriter(directory, writeConfig)) { - writer.addDocument(parsedDocs.rootDoc()); + final int numDocs; + if (rootDocOnly) { + numDocs = 1; + writer.addDocument(parsedDocs.rootDoc()); + } else { + numDocs = parsedDocs.docs().size(); + writer.addDocuments(parsedDocs.docs()); + } final DirectoryReader reader = open(writer); - if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) { + if (reader.leaves().size() != 1 
|| reader.leaves().get(0).reader().numDocs() != numDocs) { reader.close(); throw new IllegalStateException( - "Expected a single document segment; " + "Expected a single segment with " + + numDocs + + " documents, " + "but [" + reader.leaves().size() + " segments with " @@ -172,7 +213,33 @@ static DirectoryReader createInMemoryReader( + " documents" ); } - return reader; + LeafReader leafReader = reader.leaves().get(0).reader(); + return new SequentialStoredFieldsLeafReader(leafReader) { + @Override + protected void doClose() throws IOException { + IOUtils.close(super::doClose, directory); + } + + @Override + public CacheHelper getCoreCacheHelper() { + return leafReader.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return leafReader.getReaderCacheHelper(); + } + + @Override + public StoredFieldsReader getSequentialStoredFieldsReader() { + return Lucene.segmentReader(leafReader).getFieldsReader().getMergeInstance(); + } + + @Override + protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) { + return reader; + } + }; } catch (IOException e) { throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e); } @@ -259,6 +326,7 @@ private static class TranslogLeafReader extends LeafReader { MappingLookup mappingLookup, DocumentParser documentParser, EngineConfig engineConfig, + Directory directory, Runnable onSegmentCreated ) { this.shardId = shardId; @@ -267,7 +335,7 @@ private static class TranslogLeafReader extends LeafReader { this.documentParser = documentParser; this.engineConfig = engineConfig; this.onSegmentCreated = onSegmentCreated; - this.directory = new ByteBuffersDirectory(); + this.directory = directory; this.uid = Uid.encodeId(operation.id()); } @@ -279,7 +347,15 @@ private LeafReader getDelegate() { ensureOpen(); reader = delegate.get(); if (reader == null) { - var indexReader = createInMemoryReader(shardId, engineConfig, 
directory, documentParser, mappingLookup, operation); + var indexReader = createInMemoryReader( + shardId, + engineConfig, + directory, + documentParser, + mappingLookup, + true, + operation + ); reader = indexReader.leaves().get(0).reader(); final LeafReader existing = delegate.getAndSet(reader); assert existing == null; @@ -464,7 +540,12 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOExcep @Override protected synchronized void doClose() throws IOException { - IOUtils.close(delegate.get(), directory); + final LeafReader leaf = delegate.get(); + if (leaf != null) { + leaf.close(); + } else { + directory.close(); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java index 00de13b8e8d8e..1aa79c3b3e176 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogOperationAsserter.java @@ -10,7 +10,6 @@ package org.elasticsearch.index.engine; import org.apache.lucene.search.similarities.BM25Similarity; -import org.apache.lucene.store.ByteBuffersDirectory; import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy; import org.elasticsearch.index.mapper.DocumentParser; import org.elasticsearch.index.mapper.MappingLookup; @@ -52,10 +51,7 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index final ShardId shardId = engineConfig.getShardId(); final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup(); final DocumentParser documentParser = engineConfig.getMapperService().documentParser(); - try ( - var directory = new ByteBuffersDirectory(); - var reader = TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op) - ) { + try (var reader = TranslogDirectoryReader.create(shardId, op, 
mappingLookup, documentParser, engineConfig, () -> {})) { final Engine.Searcher searcher = new Engine.Searcher( "assert_translog", reader,