Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not serialize EsIndex in plan #119580

Merged
merged 29 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2ff1d76
EsQueryExec
idegtiarenko Jan 6, 2025
e3fe769
EsStatsQueryExec
idegtiarenko Jan 6, 2025
ce0fbde
cleanup serialization
idegtiarenko Jan 6, 2025
b6c17b8
EsSourceExec
idegtiarenko Jan 6, 2025
cb41a7a
EsRelation
idegtiarenko Jan 6, 2025
c283fd3
format
idegtiarenko Jan 6, 2025
d45ed5a
Update docs/changelog/119580.yaml
idegtiarenko Jan 6, 2025
cfc22d9
fix sizing tests
idegtiarenko Jan 6, 2025
611ccae
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 7, 2025
d1effd7
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 9, 2025
93c93da
fix typo
idegtiarenko Jan 9, 2025
bdfa4f3
remove frozen flag
idegtiarenko Jan 9, 2025
a09ddc0
fix test
idegtiarenko Jan 10, 2025
13437ca
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 13, 2025
3685545
remove unused maps
idegtiarenko Jan 13, 2025
60548d3
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 13, 2025
be29c6a
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 13, 2025
4bdfb6c
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 14, 2025
945b1a3
remove ESQL_REMOVE_ES_RELATION_FROZEN
idegtiarenko Jan 14, 2025
ab26077
Revert "remove unused maps"
idegtiarenko Jan 15, 2025
179c0a4
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 15, 2025
b1e959b
rename field
idegtiarenko Jan 15, 2025
fa92d65
add a test case when pointing multiple indices
idegtiarenko Jan 15, 2025
b408a30
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 15, 2025
6d16184
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 17, 2025
7a7f309
fix pattern
idegtiarenko Jan 17, 2025
abe15c6
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 20, 2025
d63db73
align parameters order
idegtiarenko Jan 20, 2025
c86eb30
Merge branch 'main' into do_not_serialize_es_index
idegtiarenko Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
EsSourceExec
  • Loading branch information
idegtiarenko committed Jan 6, 2025
commit b6c17b85d8bad766826970c9f0f7912c7995f0f8
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ protected PhysicalPlan rule(EsSourceExec plan) {
attributes.add(ma);
}
});
return new EsQueryExec(plan.source(), plan.index().name(), plan.indexMode(), plan.index().indexNameWithModes(), attributes, plan.query());
return new EsQueryExec(plan.source(), plan.indexName(), plan.indexMode(), plan.indexNameWithModes(), attributes, plan.query());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,64 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.ESQ_SKIP_ES_INDEX_SERIALIZATION;

public class EsSourceExec extends LeafExec {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
"EsSourceExec",
EsSourceExec::readFrom
);

private final EsIndex index;
private final List<Attribute> attributes;
private final QueryBuilder query;
private final String indexName;
private final IndexMode indexMode;
private final Map<String, IndexMode> indexNameWithModes;
private final QueryBuilder query;
private final List<Attribute> attributes;

public EsSourceExec(EsRelation relation) {
this(relation.source(), relation.index(), relation.output(), null, relation.indexMode());
this(relation.source(), relation.index().name(), relation.indexMode(), relation.index().indexNameWithModes(), null, relation.output());
}

public EsSourceExec(Source source, EsIndex index, List<Attribute> attributes, QueryBuilder query, IndexMode indexMode) {
public EsSourceExec(Source source, String indexName, IndexMode indexMode, Map<String, IndexMode> indexNameWithModes, QueryBuilder query, List<Attribute> attributes) {
super(source);
this.index = index;
this.attributes = attributes;
this.query = query;
this.indexName = indexName;
this.indexMode = indexMode;
this.indexNameWithModes = indexNameWithModes;
this.query = query;
this.attributes = attributes;
}

private static EsSourceExec readFrom(StreamInput in) throws IOException {
var source = Source.readFrom((PlanStreamInput) in);
var index = new EsIndex(in);
String indexName;
Map<String, IndexMode> indexNameWithModes;
if (in.getTransportVersion().onOrAfter(ESQ_SKIP_ES_INDEX_SERIALIZATION)) {
indexName = in.readString();
indexNameWithModes = in.readMap(IndexMode::readFrom);
} else {
var index = new EsIndex(in);
indexName = index.name();
indexNameWithModes = index.indexNameWithModes();
}
var attributes = in.readNamedWriteableCollectionAsList(Attribute.class);
var query = in.readOptionalNamedWriteable(QueryBuilder.class);
var indexMode = EsRelation.readIndexMode(in);
return new EsSourceExec(source, index, attributes, query, indexMode);
return new EsSourceExec(source, indexName, indexMode, indexNameWithModes, query, attributes);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
index().writeTo(out);
if (out.getTransportVersion().onOrAfter(ESQ_SKIP_ES_INDEX_SERIALIZATION)) {
out.writeString(indexName);
out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out));
} else {
new EsIndex(indexName, Map.of(), indexNameWithModes).writeTo(out);
}
out.writeNamedWriteableCollection(output());
out.writeOptionalNamedWriteable(query());
EsRelation.writeIndexMode(out, indexMode());
Expand All @@ -71,31 +90,35 @@ public String getWriteableName() {
return ENTRY.name;
}

public EsIndex index() {
return index;
}

public QueryBuilder query() {
return query;
public String indexName() {
return indexName;
}

public IndexMode indexMode() {
return indexMode;
}

public Map<String, IndexMode> indexNameWithModes() {
return indexNameWithModes;
}

public QueryBuilder query() {
return query;
}

@Override
public List<Attribute> output() {
return attributes;
}

@Override
protected NodeInfo<? extends PhysicalPlan> info() {
return NodeInfo.create(this, EsSourceExec::new, index, attributes, query, indexMode);
return NodeInfo.create(this, EsSourceExec::new, indexName, indexMode, indexNameWithModes, query, attributes);
}

@Override
public int hashCode() {
return Objects.hash(index, attributes, query, indexMode);
return Objects.hash(indexName, indexMode, indexNameWithModes, query, attributes);
}

@Override
Expand All @@ -109,14 +132,15 @@ public boolean equals(Object obj) {
}

EsSourceExec other = (EsSourceExec) obj;
return Objects.equals(index, other.index)
&& Objects.equals(attributes, other.attributes)
return Objects.equals(indexName, other.indexName)
&& Objects.equals(indexMode, other.indexMode)
&& Objects.equals(indexNameWithModes, other.indexNameWithModes)
&& Objects.equals(query, other.query)
&& Objects.equals(indexMode, other.indexMode);
&& Objects.equals(attributes, other.attributes);
}

@Override
public String nodeString() {
return nodeName() + "[" + index + "]" + NodeUtils.limitedToString(attributes);
return nodeName() + "[" + indexName + "]" + NodeUtils.limitedToString(attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static PhysicalPlan localPlan(
if (filter != null) {
physicalFragment = physicalFragment.transformUp(
EsSourceExec.class,
query -> new EsSourceExec(Source.EMPTY, query.index(), query.output(), filter, query.indexMode())
query -> new EsSourceExec(Source.EMPTY, query.indexName(), query.indexMode(), query.indexNameWithModes(), filter, query.output())
);
}
var localOptimized = physicalOptimizer.localOptimize(physicalFragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2576,10 +2576,9 @@ public void testFieldExtractWithoutSourceAttributes() {
| where round(emp_no) > 10
"""));
// Transform the verified plan so that it is invalid (i.e. no source attributes)
List<Attribute> emptyAttrList = List.of();
var badPlan = verifiedPlan.transformDown(
EsQueryExec.class,
node -> new EsSourceExec(node.source(), null /*TODO 112998*/, emptyAttrList, node.query(), IndexMode.STANDARD)
node -> new EsSourceExec(node.source(), node.indexName(), IndexMode.STANDARD, node.indexNameWithModes(), node.query(), List.of())
);

var e = expectThrows(VerificationException.class, () -> physicalPlanOptimizer.verify(badPlan));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,26 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.EsIndexSerializationTests;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.esql.plan.logical.AbstractLogicalPlanSerializationTests.randomFieldAttributes;
import static org.elasticsearch.xpack.esql.index.EsIndexSerializationTests.randomIndexNameWithModes;

public class EsSourceExecSerializationTests extends AbstractPhysicalPlanSerializationTests<EsSourceExec> {
public static EsSourceExec randomEsSourceExec() {
Source source = randomSource();
EsIndex index = EsIndexSerializationTests.randomEsIndex();
List<Attribute> attributes = randomFieldAttributes(1, 10, false);
QueryBuilder query = new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5));
IndexMode indexMode = randomFrom(IndexMode.values());
return new EsSourceExec(source, index, attributes, query, indexMode);
return new EsSourceExec(
randomSource(),
randomIdentifier(),
randomFrom(IndexMode.values()),
randomIndexNameWithModes(),
new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5)),
randomFieldAttributes(1, 10, false)
);
}

@Override
Expand All @@ -37,18 +39,20 @@ protected EsSourceExec createTestInstance() {

@Override
protected EsSourceExec mutateInstance(EsSourceExec instance) throws IOException {
EsIndex index = instance.index();
List<Attribute> attributes = instance.output();
QueryBuilder query = instance.query();
String indexName = instance.indexName();
IndexMode indexMode = instance.indexMode();
switch (between(0, 3)) {
case 0 -> index = randomValueOtherThan(index, EsIndexSerializationTests::randomEsIndex);
case 1 -> attributes = randomValueOtherThan(attributes, () -> randomFieldAttributes(1, 10, false));
case 2 -> query = randomValueOtherThan(query, () -> new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5)));
case 3 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values()));
Map<String, IndexMode> indexNameWithModes = instance.indexNameWithModes();
QueryBuilder query = instance.query();
List<Attribute> attributes = instance.output();
switch (between(0, 4)) {
case 0 -> indexName = randomValueOtherThan(indexName, ESTestCase::randomIdentifier);
case 1 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values()));
case 2 -> indexNameWithModes = randomValueOtherThan(indexNameWithModes, EsIndexSerializationTests::randomIndexNameWithModes);
case 3 -> query = randomValueOtherThan(query, () -> new TermQueryBuilder(randomAlphaOfLength(5), randomAlphaOfLength(5)));
case 4 -> attributes = randomValueOtherThan(attributes, () -> randomFieldAttributes(1, 10, false));
default -> throw new IllegalStateException();
}
return new EsSourceExec(instance.source(), index, attributes, query, indexMode);
return new EsSourceExec(instance.source(), indexName, indexMode, indexNameWithModes, query, attributes);
}

@Override
Expand Down