Fix realtime get nested fields with synthetic source
dnhatn committed Jan 6, 2025
1 parent c7b61bd commit 775663e
Showing 5 changed files with 195 additions and 23 deletions.
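
Summary of the change: a document with nested fields is parsed into a block of Lucene documents (child documents first, root document last), but the translog-backed reader used for realtime get indexed only the root document into its on-demand in-memory segment. With synthetic source, which reconstructs _source from the indexed fields, nested fields were therefore missing from realtime get responses. The fix eagerly indexes the whole document block when the mapping uses synthetic source, and un-mutes the test tracking this (issue #119553) in muted-tests.yml below. A minimal plain-Lucene sketch of the root-only vs. whole-block distinction (illustrative only, not Elasticsearch code):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;

import java.io.IOException;
import java.util.List;

public class NestedBlockSketch {
    public static void main(String[] args) throws IOException {
        Document child = new Document(); // nested doc: { "author": { "name": "a1" } }
        child.add(new StringField("author.name", "a1", Field.Store.YES));
        Document root = new Document(); // root doc: { "title": "t0" }
        root.add(new StringField("title", "t0", Field.Store.YES));

        try (var dir = new ByteBuffersDirectory(); var writer = new IndexWriter(dir, new IndexWriterConfig())) {
            // The fix in a nutshell: index the whole block (children + root),
            // not just writer.addDocument(root), so the child fields survive.
            writer.addDocuments(List.of(child, root));
            try (DirectoryReader reader = DirectoryReader.open(writer)) {
                System.out.println(reader.numDocs()); // 2: the child doc carries the nested field
            }
        }
    }
}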
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -265,9 +265,6 @@ tests:
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testMountSearchableSnapshot {p0=[9.0.0, 9.0.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119551
- class: org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshotTests
method: testSkipNonRootOfNestedDocuments
issue: https://github.com/elastic/elasticsearch/issues/119553
- class: org.elasticsearch.lucene.RollingUpgradeSearchableSnapshotIndexCompatibilityIT
method: testSearchableSnapshotUpgrade {p0=[9.0.0, 9.0.0, 8.18.0]}
issue: https://github.com/elastic/elasticsearch/issues/119560
@@ -24,6 +24,7 @@
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
@@ -35,6 +36,7 @@
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
@@ -932,6 +934,102 @@ public void testGetRemoteIndex() {
);
}

public void testRealTimeGetNestedFields() {
String index = "test";
SourceFieldMapper.Mode sourceMode = randomFrom(SourceFieldMapper.Mode.values());
assertAcked(
prepareCreate(index).setMapping("title", "type=keyword", "author", "type=nested")
.setSettings(
indexSettings(1, 0).put("index.refresh_interval", -1)
.put(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), sourceMode)
)
);
ensureGreen();
String source0 = """
{
"title": "t0",
"author": [
{
"name": "a0"
}
]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("0").setSource(source0, XContentType.JSON).get();
// start tracking translog locations
assertTrue(client().prepareGet(index, "0").setRealtime(true).get().isExists());
String source1 = """
{
"title": ["t1"],
"author": [
{
"name": "a1"
}
]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("1").setSource(source1, XContentType.JSON).get();
String source2 = """
{
"title": ["t1", "t2"],
"author": [
{
"name": "a1"
},
{
"name": "a2"
}
]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("2").setSource(source2, XContentType.JSON).get();
String source3 = """
{
"title": ["t1", "t3", "t2"]
}
""";
prepareIndex(index).setRefreshPolicy(WriteRequest.RefreshPolicy.NONE).setId("3").setSource(source3, XContentType.JSON).get();
GetResponse translog1 = client().prepareGet(index, "1").setRealtime(true).get();
GetResponse translog2 = client().prepareGet(index, "2").setRealtime(true).get();
GetResponse translog3 = client().prepareGet(index, "3").setRealtime(true).get();
assertTrue(translog1.isExists());
assertTrue(translog2.isExists());
assertTrue(translog3.isExists());
switch (sourceMode) {
case STORED -> {
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo(source1));
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo(source2));
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo(source3));
}
case SYNTHETIC -> {
assertThat(translog1.getSourceAsBytesRef().utf8ToString(), equalTo("""
{"author":{"name":"a1"},"title":"t1"}"""));
assertThat(translog2.getSourceAsBytesRef().utf8ToString(), equalTo("""
{"author":[{"name":"a1"},{"name":"a2"}],"title":["t1","t2"]}"""));
assertThat(translog3.getSourceAsBytesRef().utf8ToString(), equalTo("""
{"title":["t1","t2","t3"]}"""));
}
case DISABLED -> {
assertNull(translog1.getSourceAsBytesRef());
assertNull(translog2.getSourceAsBytesRef());
assertNull(translog3.getSourceAsBytesRef());
}
}
assertFalse(client().prepareGet(index, "1").setRealtime(false).get().isExists());
assertFalse(client().prepareGet(index, "2").setRealtime(false).get().isExists());
assertFalse(client().prepareGet(index, "3").setRealtime(false).get().isExists());
refresh(index);
GetResponse lucene1 = client().prepareGet(index, "1").setRealtime(randomBoolean()).get();
GetResponse lucene2 = client().prepareGet(index, "2").setRealtime(randomBoolean()).get();
GetResponse lucene3 = client().prepareGet(index, "3").setRealtime(randomBoolean()).get();
assertTrue(lucene1.isExists());
assertTrue(lucene2.isExists());
assertTrue(lucene3.isExists());
assertThat(translog1.getSourceAsBytesRef(), equalTo(lucene1.getSourceAsBytesRef()));
assertThat(translog2.getSourceAsBytesRef(), equalTo(lucene2.getSourceAsBytesRef()));
assertThat(translog3.getSourceAsBytesRef(), equalTo(lucene3.getSourceAsBytesRef()));
}

private void assertGetFieldsAlwaysWorks(String index, String docId, String[] fields) {
assertGetFieldsAlwaysWorks(index, docId, fields, null);
}
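A note on the SYNTHETIC expectations in the test above: synthetic source is rebuilt from doc values at read time, so it is a normalized form of the original document, with object keys emitted in a canonical order, multi-valued keyword fields sorted and de-duplicated, and single-element arrays unwrapped. That is why ["t1", "t3", "t2"] comes back as ["t1","t2","t3"]. A rough plain-Java illustration of the keyword case, assuming sorted-set doc-values semantics:

import java.util.List;
import java.util.TreeSet;

public class SyntheticKeywordSketch {
    public static void main(String[] args) {
        // SORTED_SET doc values keep each document's terms sorted and unique,
        // which is all that survives for synthetic source to rebuild from.
        var terms = new TreeSet<>(List.of("t1", "t3", "t2"));
        System.out.println(terms); // [t1, t2, t3]
    }
}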
@@ -819,7 +819,7 @@ private GetResult getFromTranslog(
) throws IOException {
assert get.isReadFromTranslog();
translogGetCount.incrementAndGet();
final TranslogDirectoryReader inMemoryReader = new TranslogDirectoryReader(
final DirectoryReader inMemoryReader = TranslogDirectoryReader.create(
shardId,
index,
mappingLookup,
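The one-line change above replaces a constructor call with a static factory. The motivation (see TranslogDirectoryReader below) is that a factory can return a different reader implementation per mapping, eager in-memory for synthetic source and lazy otherwise, and wrap the result in ElasticsearchDirectoryReader, which a constructor cannot. A hypothetical, self-contained sketch of that idiom:

public class FactorySketch {
    interface DocReader {
        String describe();
    }

    // A static factory can branch on its input and return different
    // implementations; a constructor is fixed to one class.
    static DocReader create(boolean syntheticSource) {
        return syntheticSource
            ? () -> "eager in-memory reader (nested docs indexed up front)"
            : () -> "lazy translog reader (segment built only on demand)";
    }

    public static void main(String[] args) {
        System.out.println(create(true).describe());
        System.out.println(create(false).describe());
    }
}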
@@ -9,6 +9,7 @@

package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.BaseTermsEnum;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.ByteVectorValues;
@@ -45,9 +46,12 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.fieldvisitor.FieldNamesProvidingStoredFieldsVisitor;
import org.elasticsearch.index.mapper.DocumentParser;
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -76,21 +80,48 @@
* into an in-memory Lucene segment that is created on-demand.
*/
final class TranslogDirectoryReader extends DirectoryReader {
private final TranslogLeafReader leafReader;
private final LeafReader leafReader;

TranslogDirectoryReader(
static DirectoryReader create(
ShardId shardId,
Translog.Index operation,
MappingLookup mappingLookup,
DocumentParser documentParser,
EngineConfig engineConfig,
Runnable onSegmentCreated
) throws IOException {
this(new TranslogLeafReader(shardId, operation, mappingLookup, documentParser, engineConfig, onSegmentCreated));
final Directory directory = new ByteBuffersDirectory();
boolean success = false;
try {
final LeafReader leafReader;
            // When using synthetic source, the translog operation must always be reindexed into an in-memory Lucene segment to
            // ensure consistent output for realtime-get operations. However, this can degrade the performance of realtime-get and
            // update operations. If slight inconsistencies in realtime-get operations are acceptable, the translog operation can be reindexed lazily.
if (mappingLookup.isSourceSynthetic()) {
leafReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, false, operation);
} else {
leafReader = new TranslogLeafReader(
shardId,
operation,
mappingLookup,
documentParser,
engineConfig,
directory,
onSegmentCreated
);
}
var directoryReader = ElasticsearchDirectoryReader.wrap(new TranslogDirectoryReader(directory, leafReader), shardId);
success = true;
return directoryReader;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(directory);
}
}
}

private TranslogDirectoryReader(TranslogLeafReader leafReader) throws IOException {
super(leafReader.directory, new LeafReader[] { leafReader }, null);
private TranslogDirectoryReader(Directory directory, LeafReader leafReader) throws IOException {
super(directory, new LeafReader[] { leafReader }, null);
this.leafReader = leafReader;
}

@@ -139,12 +170,13 @@ public CacheHelper getReaderCacheHelper() {
return leafReader.getReaderCacheHelper();
}

static DirectoryReader createInMemoryReader(
private static LeafReader createInMemoryReader(
ShardId shardId,
EngineConfig engineConfig,
Directory directory,
DocumentParser documentParser,
MappingLookup mappingLookup,
boolean rootDocOnly,
Translog.Index operation
) {
final ParsedDocument parsedDocs = documentParser.parseDocument(
@@ -159,20 +191,55 @@ static DirectoryReader createInMemoryReader(
IndexWriterConfig.OpenMode.CREATE
).setCodec(engineConfig.getCodec());
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
writer.addDocument(parsedDocs.rootDoc());
final int numDocs;
if (rootDocOnly) {
numDocs = 1;
writer.addDocument(parsedDocs.rootDoc());
} else {
numDocs = parsedDocs.docs().size();
writer.addDocuments(parsedDocs.docs());
}
final DirectoryReader reader = open(writer);
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != numDocs) {
reader.close();
throw new IllegalStateException(
"Expected a single document segment; "
"Expected a single segment with "
+ numDocs
+ " documents, "
+ "but ["
+ reader.leaves().size()
+ " segments with "
+ reader.leaves().get(0).reader().numDocs()
+ " documents"
);
}
return reader;
LeafReader leafReader = reader.leaves().get(0).reader();
return new SequentialStoredFieldsLeafReader(leafReader) {
@Override
protected void doClose() throws IOException {
IOUtils.close(super::doClose, directory);
}

@Override
public CacheHelper getCoreCacheHelper() {
return leafReader.getCoreCacheHelper();
}

@Override
public CacheHelper getReaderCacheHelper() {
return leafReader.getReaderCacheHelper();
}

@Override
public StoredFieldsReader getSequentialStoredFieldsReader() {
return Lucene.segmentReader(leafReader).getFieldsReader().getMergeInstance();
}

@Override
protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
return reader;
}
};
} catch (IOException e) {
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
}
@@ -259,6 +326,7 @@ private static class TranslogLeafReader extends LeafReader {
MappingLookup mappingLookup,
DocumentParser documentParser,
EngineConfig engineConfig,
Directory directory,
Runnable onSegmentCreated
) {
this.shardId = shardId;
@@ -267,7 +335,7 @@ private static class TranslogLeafReader extends LeafReader {
this.documentParser = documentParser;
this.engineConfig = engineConfig;
this.onSegmentCreated = onSegmentCreated;
this.directory = new ByteBuffersDirectory();
this.directory = directory;
this.uid = Uid.encodeId(operation.id());
}

@@ -279,7 +347,15 @@ private LeafReader getDelegate() {
ensureOpen();
reader = delegate.get();
if (reader == null) {
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
var indexReader = createInMemoryReader(
shardId,
engineConfig,
directory,
documentParser,
mappingLookup,
true,
operation
);
reader = indexReader.leaves().get(0).reader();
final LeafReader existing = delegate.getAndSet(reader);
assert existing == null;
@@ -464,7 +540,12 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOExcep

@Override
protected synchronized void doClose() throws IOException {
IOUtils.close(delegate.get(), directory);
final LeafReader leaf = delegate.get();
if (leaf != null) {
leaf.close();
} else {
directory.close();
}
}
}

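Resource ownership is the subtle part of the file above: create() builds the ByteBuffersDirectory before knowing whether reader construction will succeed, so it closes the directory itself if anything throws, and otherwise hands ownership to the returned reader, whose doClose also closes the directory. The same reasoning explains the new doClose in TranslogLeafReader (close the delegate if a segment was materialized, else close the bare directory) and the simplified caller in the next file. A minimal standalone sketch of the guard, using plain Closeable stand-ins:

import java.io.Closeable;
import java.io.IOException;

public class CleanupOnFailureSketch {
    // Wrapping may fail after the resource is open; once built, the wrapper
    // owns the resource and closes it as part of its own close().
    static Closeable wrap(Closeable resource) throws IOException {
        return resource::close;
    }

    static Closeable openGuarded() throws IOException {
        Closeable directory = () -> System.out.println("directory closed");
        boolean success = false;
        try {
            Closeable reader = wrap(directory); // may throw
            success = true;
            return reader; // ownership transferred to the reader
        } finally {
            if (success == false) {
                directory.close(); // construction failed before the handoff
            }
        }
    }

    public static void main(String[] args) throws IOException {
        openGuarded().close(); // prints "directory closed" exactly once
    }
}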
@@ -10,7 +10,6 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
import org.elasticsearch.index.mapper.DocumentParser;
import org.elasticsearch.index.mapper.MappingLookup;
@@ -52,10 +51,7 @@ static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index
final ShardId shardId = engineConfig.getShardId();
final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
try (
var directory = new ByteBuffersDirectory();
var reader = TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op)
) {
try (var reader = TranslogDirectoryReader.create(shardId, op, mappingLookup, documentParser, engineConfig, () -> {})) {
final Engine.Searcher searcher = new Engine.Searcher(
"assert_translog",
reader,
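With create() owning the in-memory directory, the caller above now manages a single resource in try-with-resources. A tiny sketch of the simplified shape, with a hypothetical Reader stand-in:

public class SingleOwnerSketch {
    // Hypothetical stand-in for the reader returned by create(); closing it
    // also releases the in-memory directory it owns.
    record Reader(Runnable releaseDirectory) implements AutoCloseable {
        @Override
        public void close() {
            releaseDirectory.run();
        }
    }

    public static void main(String[] args) {
        try (var reader = new Reader(() -> System.out.println("reader and directory closed"))) {
            // use the reader; no separate directory handle to close
        }
    }
}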
