Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Make SearchResponseSections Releasable instead of RefCounted (#116404) #119707

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private void moveToNextPhase(
) {
context.executeNextPhase(this, () -> {
var resp = SearchPhaseController.merge(context.getRequest().scroll() != null, reducedQueryPhase, fetchResultsArr);
context.addReleasable(resp::decRef);
context.addReleasable(resp);
return nextPhaseFactory.apply(resp, searchPhaseShardResults);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@

package org.elasticsearch.action.search;

import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.SimpleRefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.profile.SearchProfileResults;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.transport.LeakTracker;

import java.util.Collections;
import java.util.Map;
Expand All @@ -25,7 +23,7 @@
* Holds some sections that a search response is composed of (hits, aggs, suggestions etc.) during some steps of the search response
* building.
*/
public class SearchResponseSections implements RefCounted {
public class SearchResponseSections implements Releasable {

public static final SearchResponseSections EMPTY_WITH_TOTAL_HITS = new SearchResponseSections(
SearchHits.EMPTY_WITH_TOTAL_HITS,
Expand Down Expand Up @@ -53,8 +51,6 @@ public class SearchResponseSections implements RefCounted {
protected final Boolean terminatedEarly;
protected final int numReducePhases;

private final RefCounted refCounted;

public SearchResponseSections(
SearchHits hits,
InternalAggregations aggregations,
Expand All @@ -72,7 +68,6 @@ public SearchResponseSections(
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
refCounted = hits.getHits().length > 0 ? LeakTracker.wrap(new SimpleRefCounted()) : ALWAYS_REFERENCED;
}

public final SearchHits hits() {
Expand All @@ -97,26 +92,7 @@ public final Map<String, SearchProfileShardResult> profile() {
}

@Override
public void incRef() {
refCounted.incRef();
}

@Override
public boolean tryIncRef() {
return refCounted.tryIncRef();
}

@Override
public boolean decRef() {
if (refCounted.decRef()) {
hits.decRef();
return true;
}
return false;
}

@Override
public boolean hasReferences() {
return refCounted.hasReferences();
public void close() {
hits.decRef();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ protected final void sendResponse(
if (request.scroll() != null) {
scrollId = request.scrollId();
}
var sections = SearchPhaseController.merge(true, queryPhase, fetchResults);
try {
try (var sections = SearchPhaseController.merge(true, queryPhase, fetchResults)) {
ActionListener.respondAndRelease(
listener,
new SearchResponse(
Expand All @@ -262,8 +261,6 @@ protected final void sendResponse(
null
)
);
} finally {
sections.decRef();
}
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL

List<MultiSearchResponse.Item> mSearchResponses = new ArrayList<>(numInnerHits);
for (int innerHitNum = 0; innerHitNum < numInnerHits; innerHitNum++) {
var sections = new SearchResponseSections(collapsedHits.get(innerHitNum), null, null, false, null, null, 1);
try {
try (
var sections = new SearchResponseSections(collapsedHits.get(innerHitNum), null, null, false, null, null, 1)
) {
mockSearchPhaseContext.sendSearchResponse(sections, null);
} finally {
sections.decRef();
}
mSearchResponses.add(new MultiSearchResponse.Item(mockSearchPhaseContext.searchResponse.get(), null));
// transferring ownership to the multi-search response so no need to release here
Expand All @@ -121,11 +120,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, hits, () -> new SearchPhase("test") {
@Override
public void run() {
var sections = new SearchResponseSections(hits, null, null, false, null, null, 1);
try {
try (var sections = new SearchResponseSections(hits, null, null, false, null, null, 1)) {
mockSearchPhaseContext.sendSearchResponse(sections, null);
} finally {
sections.decRef();
}
}
});
Expand Down Expand Up @@ -215,11 +211,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, hits, () -> new SearchPhase("test") {
@Override
public void run() {
var sections = new SearchResponseSections(hits, null, null, false, null, null, 1);
try {
try (var sections = new SearchResponseSections(hits, null, null, false, null, null, 1)) {
mockSearchPhaseContext.sendSearchResponse(sections, null);
} finally {
sections.decRef();
}
}
});
Expand Down Expand Up @@ -254,11 +247,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, hits, () -> new SearchPhase("test") {
@Override
public void run() {
var sections = new SearchResponseSections(hits, null, null, false, null, null, 1);
try {
try (var sections = new SearchResponseSections(hits, null, null, false, null, null, 1)) {
mockSearchPhaseContext.sendSearchResponse(sections, null);
} finally {
sections.decRef();
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
searchHits[i] = SearchHitTests.createTestItem(randomBoolean(), randomBoolean());
}
SearchHits hits = new SearchHits(searchHits, new TotalHits(numHits, TotalHits.Relation.EQUAL_TO), 1.0f);
var sections = new SearchResponseSections(hits, null, null, false, null, null, 1);
try {
try (var sections = new SearchResponseSections(hits, null, null, false, null, null, 1)) {
FetchLookupFieldsPhase phase = new FetchLookupFieldsPhase(searchPhaseContext, sections, null);
phase.run();
} finally {
sections.decRef();
hits.decRef();
}
searchPhaseContext.assertNoFailure();
Expand Down Expand Up @@ -189,12 +187,10 @@ void sendExecuteMultiSearch(
new TotalHits(2, TotalHits.Relation.EQUAL_TO),
1.0f
);
var sections = new SearchResponseSections(searchHits, null, null, false, null, null, 1);
try {
try (var sections = new SearchResponseSections(searchHits, null, null, false, null, null, 1)) {
FetchLookupFieldsPhase phase = new FetchLookupFieldsPhase(searchPhaseContext, sections, null);
phase.run();
} finally {
sections.decRef();
searchHits.decRef();
}
assertTrue(requestSent.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ public void testMerge() {
reducedQueryPhase.suggest(),
profile
);
final SearchResponseSections mergedResponse = SearchPhaseController.merge(false, reducedQueryPhase, fetchResults);
try {
try (SearchResponseSections mergedResponse = SearchPhaseController.merge(false, reducedQueryPhase, fetchResults)) {
if (trackTotalHits == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
assertNull(mergedResponse.hits.getTotalHits());
} else {
Expand Down Expand Up @@ -346,7 +345,6 @@ public void testMerge() {
assertThat(mergedResponse.profile(), is(anEmptyMap()));
}
} finally {
mergedResponse.decRef();
fetchResults.asList().forEach(TransportMessage::decRef);
}
} finally {
Expand Down Expand Up @@ -410,8 +408,7 @@ protected boolean lessThan(RankDoc a, RankDoc b) {
reducedQueryPhase.suggest(),
false
);
SearchResponseSections mergedResponse = SearchPhaseController.merge(false, reducedQueryPhase, fetchResults);
try {
try (SearchResponseSections mergedResponse = SearchPhaseController.merge(false, reducedQueryPhase, fetchResults)) {
if (trackTotalHits == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
assertNull(mergedResponse.hits.getTotalHits());
} else {
Expand All @@ -427,7 +424,6 @@ protected boolean lessThan(RankDoc a, RankDoc b) {
assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length));
assertThat(mergedResponse.profile(), is(anEmptyMap()));
} finally {
mergedResponse.decRef();
fetchResults.asList().forEach(TransportMessage::decRef);
}
} finally {
Expand Down