Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
}
}
results.consumeShardFailure(shardIndex);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CanMatch had the only non-noop implementation of this method so it can go away now :)

}

private static boolean isTaskCancelledException(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.core.Types.forciblyCast;
Expand Down Expand Up @@ -74,7 +73,9 @@ final class CanMatchPreFilterSearchPhase {
private final Executor executor;
private final boolean requireAtLeastOneMatch;

private final CanMatchSearchPhaseResults results;
private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
private int numPossibleMatches;
private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;

CanMatchPreFilterSearchPhase(
Expand Down Expand Up @@ -105,12 +106,13 @@ final class CanMatchPreFilterSearchPhase {
this.requireAtLeastOneMatch = requireAtLeastOneMatch;
this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
this.executor = executor;
results = new CanMatchSearchPhaseResults(shardsIts.size());

final int size = shardsIts.size();
possibleMatches = new FixedBitSet(size);
minAndMaxes = new MinAndMax<?>[size];
// we compute the shard index based on the natural order of the shards
// that participate in the search request. This means that this number is
// consistent between two requests that target the same shards.
final SearchShardIterator[] naturalOrder = new SearchShardIterator[shardsIts.size()];
final SearchShardIterator[] naturalOrder = new SearchShardIterator[size];
int i = 0;
for (SearchShardIterator shardsIt : shardsIts) {
naturalOrder[i++] = shardsIt;
Expand Down Expand Up @@ -138,7 +140,7 @@ private void runCoordinatorRewritePhase() {
request,
searchShardIterator.getOriginalIndices().indicesOptions(),
Collections.emptyList(),
getNumShards(),
shardsIts.size(),
timeProvider.absoluteStartMillis(),
searchShardIterator.getClusterAlias()
);
Expand Down Expand Up @@ -177,7 +179,27 @@ private void runCoordinatorRewritePhase() {
private void consumeResult(boolean canMatch, ShardSearchRequest request) {
CanMatchShardResponse result = new CanMatchShardResponse(canMatch, null);
result.setShardIndex(request.shardRequestIndex());
results.consumeResult(result, () -> {});
consumeResult(result, () -> {});
}

private void consumeResult(CanMatchShardResponse result, Runnable next) {
try {
final boolean canMatch = result.canMatch();
final MinAndMax<?> minAndMax = result.estimatedMinAndMax();
if (canMatch || minAndMax != null) {
consumeResult(result.getShardIndex(), canMatch, minAndMax);
}
} finally {
next.run();
}
}

private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax<?> minAndMax) {
if (canMatch) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
minAndMaxes[shardIndex] = minAndMax;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we canMatch and have minAndMax==null? Can we override a previous minAndMaxes? I don't think so since is per shard but wanted to make sure.
And do we need to store minAndMax if canMatch is false? Not sure that's useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct on the minmax != null only for true but the code both before and after this PR does some redundant work here. This is all fixed in #116881, this PR is just meant to shorten that other PR and make those logical fixes easy to follow (in fact the min-max situation you point out is leveraged in that PR :) as an optimization).

}

private void checkNoMissingShards(GroupShardsIterator<SearchShardIterator> shards) {
Expand Down Expand Up @@ -235,32 +257,38 @@ protected void doRun() {
continue;
}

var sendingTarget = entry.getKey();
try {
searchTransportService.sendCanMatch(getConnection(entry.getKey()), canMatchNodeRequest, task, new ActionListener<>() {
@Override
public void onResponse(CanMatchNodeResponse canMatchNodeResponse) {
assert canMatchNodeResponse.getResponses().size() == canMatchNodeRequest.getShardLevelRequests().size();
for (int i = 0; i < canMatchNodeResponse.getResponses().size(); i++) {
CanMatchNodeResponse.ResponseOrFailure response = canMatchNodeResponse.getResponses().get(i);
if (response.getResponse() != null) {
CanMatchShardResponse shardResponse = response.getResponse();
shardResponse.setShardIndex(shardLevelRequests.get(i).getShardRequestIndex());
onOperation(shardResponse.getShardIndex(), shardResponse);
} else {
Exception failure = response.getException();
assert failure != null;
onOperationFailed(shardLevelRequests.get(i).getShardRequestIndex(), failure);
searchTransportService.sendCanMatch(
nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId),
canMatchNodeRequest,
task,
new ActionListener<>() {
@Override
public void onResponse(CanMatchNodeResponse canMatchNodeResponse) {
assert canMatchNodeResponse.getResponses().size() == canMatchNodeRequest.getShardLevelRequests().size();
for (int i = 0; i < canMatchNodeResponse.getResponses().size(); i++) {
CanMatchNodeResponse.ResponseOrFailure response = canMatchNodeResponse.getResponses().get(i);
if (response.getResponse() != null) {
CanMatchShardResponse shardResponse = response.getResponse();
shardResponse.setShardIndex(shardLevelRequests.get(i).getShardRequestIndex());
onOperation(shardResponse.getShardIndex(), shardResponse);
} else {
Exception failure = response.getException();
assert failure != null;
onOperationFailed(shardLevelRequests.get(i).getShardRequestIndex(), failure);
}
}
}
}

@Override
public void onFailure(Exception e) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
onOperationFailed(shard.getShardRequestIndex(), e);
@Override
public void onFailure(Exception e) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
onOperationFailed(shard.getShardRequestIndex(), e);
}
}
}
});
);
} catch (Exception e) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
onOperationFailed(shard.getShardRequestIndex(), e);
Expand All @@ -271,7 +299,7 @@ public void onFailure(Exception e) {

private void onOperation(int idx, CanMatchShardResponse response) {
failedResponses.set(idx, null);
results.consumeResult(response, () -> {
consumeResult(response, () -> {
if (countDown.countDown()) {
finishRound();
}
Expand All @@ -280,7 +308,8 @@ private void onOperation(int idx, CanMatchShardResponse response) {

private void onOperationFailed(int idx, Exception e) {
failedResponses.set(idx, e);
results.consumeShardFailure(idx);
// we have to carry over shard failures in order to account for them in the response.
consumeResult(idx, true, null);
if (countDown.countDown()) {
finishRound();
}
Expand Down Expand Up @@ -334,14 +363,14 @@ private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<
request,
first.getOriginalIndices().indicesOptions(),
shardLevelRequests,
getNumShards(),
shardsIts.size(),
timeProvider.absoluteStartMillis(),
first.getClusterAlias()
);
}

private void finishPhase() {
listener.onResponse(getIterator(results, shardsIts));
listener.onResponse(getIterator(shardsIts));
}

private static final float DEFAULT_INDEX_BOOST = 1.0f;
Expand All @@ -364,7 +393,7 @@ public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shar
}

public void start() {
if (getNumShards() == 0) {
if (shardsIts.size() == 0) {
finishPhase();
return;
}
Expand All @@ -386,85 +415,13 @@ protected void doRun() {
});
}

public void onPhaseFailure(String msg, Exception cause) {
private void onPhaseFailure(String msg, Exception cause) {
listener.onFailure(new SearchPhaseExecutionException("can_match", msg, cause, ShardSearchFailure.EMPTY_ARRAY));
}

public Transport.Connection getConnection(SendingTarget sendingTarget) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was single use, no need to be tricky just inline it. This was a remnant of when can-match was an extension of the abstract async phase.

return nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId);
}

private int getNumShards() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this, just inline it to the 3 uses, it's rather counterproductive to hide how we determine shard count to begin with IMO.

return shardsIts.size();
}

private static final class CanMatchSearchPhaseResults extends SearchPhaseResults<CanMatchShardResponse> {
private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
private int numPossibleMatches;

CanMatchSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
minAndMaxes = new MinAndMax<?>[size];
}

@Override
void consumeResult(CanMatchShardResponse result, Runnable next) {
try {
final boolean canMatch = result.canMatch();
final MinAndMax<?> minAndMax = result.estimatedMinAndMax();
if (canMatch || minAndMax != null) {
consumeResult(result.getShardIndex(), canMatch, minAndMax);
}
} finally {
next.run();
}
}

@Override
boolean hasResult(int shardIndex) {
return false; // unneeded
}

@Override
void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
consumeResult(shardIndex, true, null);
}

private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax<?> minAndMax) {
if (canMatch) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
minAndMaxes[shardIndex] = minAndMax;
}

synchronized int getNumPossibleMatches() {
return numPossibleMatches;
}

synchronized FixedBitSet getPossibleMatches() {
return possibleMatches;
}

@Override
Stream<CanMatchShardResponse> getSuccessfulResults() {
return Stream.empty();
}

@Override
public void close() {}
}

private GroupShardsIterator<SearchShardIterator> getIterator(
CanMatchSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts
) {
FixedBitSet possibleMatches = results.getPossibleMatches();
private synchronized GroupShardsIterator<SearchShardIterator> getIterator(GroupShardsIterator<SearchShardIterator> shardsIts) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably doesn't even need to be synchronized because we only call this after we passed a sync block for setting the last rresult, but given that we have one more step to go here that removes all sync anyway and with this not being contended to begin with, no need to take any risks here :)

// TODO: pick the local shard when possible
if (requireAtLeastOneMatch && results.getNumPossibleMatches() == 0) {
if (requireAtLeastOneMatch && numPossibleMatches == 0) {
// this is a special case where we have no hit but we need to get at least one search response in order
// to produce a valid search result with all the aggs etc.
// Since it's possible that some of the shards that we're skipping are
Expand All @@ -491,11 +448,11 @@ private GroupShardsIterator<SearchShardIterator> getIterator(
iter.skip(true);
}
}
if (shouldSortShards(results.minAndMaxes) == false) {
if (shouldSortShards(minAndMaxes) == false) {
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
return new GroupShardsIterator<>(sortShards(shardsIts, minAndMaxes, fieldSort.order()));
}

private static List<SearchShardIterator> sortShards(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ final int getNumShards() {
*/
abstract boolean hasResult(int shardIndex);

void consumeShardFailure(int shardIndex) {}

AtomicArray<Result> getAtomicArray() {
throw new UnsupportedOperationException();
}
Expand Down