Skip to content

Commit

Permalink
Resume blocking Driver when any operator isFinished
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 12, 2025
1 parent eefee72 commit 857b697
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* A driver operates single-threadedly on a simple chain of {@link Operator}s, passing
Expand Down Expand Up @@ -300,12 +299,7 @@ private IsBlockedResult runSingleLoopIteration() {
}

if (movedPage == false) {
return oneOf(
activeOperators.stream()
.map(Operator::isBlocked)
.filter(laf -> laf.listener().isDone() == false)
.collect(Collectors.toList())
);
return blockedResult();
}
return Operator.NOT_BLOCKED;
}
Expand Down Expand Up @@ -408,23 +402,37 @@ void onComplete(ActionListener<Void> listener) {
});
}

private static IsBlockedResult oneOf(List<IsBlockedResult> results) {
private IsBlockedResult blockedResult() {
List<IsBlockedResult> results = activeOperators.stream()
.map(Operator::isBlocked)
.filter(laf -> laf.listener().isDone() == false)
.toList();
if (results.isEmpty()) {
return Operator.NOT_BLOCKED;
}
final SubscribableListener<Void> oneOf = new SubscribableListener<>();
final String reason;
if (results.size() == 1) {
return results.get(0);
results.get(0).listener().addListener(oneOf);
reason = results.get(0).reason();
} else {
StringBuilder reasonStr = new StringBuilder();
for (IsBlockedResult r : results) {
r.listener().addListener(oneOf);
if (reasonStr.isEmpty() == false) {
reasonStr.append(" OR ");
}
reasonStr.append(r.reason());
}
reason = reasonStr.toString();
}
SubscribableListener<Void> oneOf = new SubscribableListener<>();
StringBuilder reason = new StringBuilder();
for (IsBlockedResult r : results) {
r.listener().addListener(oneOf);
if (reason.isEmpty() == false) {
reason.append(" OR ");
for (Operator operator : activeOperators) {
SubscribableListener<Void> onFinished = operator.onFinishedListener();
if (onFinished != null) {
onFinished.addListener(oneOf);
}
reason.append(r.reason());
}
return new IsBlockedResult(oneOf, reason.toString());
return new IsBlockedResult(oneOf, reason);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.compute.Describable;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.xcontent.ToXContentObject;

Expand Down Expand Up @@ -62,6 +63,14 @@ public interface Operator extends Releasable {
*/
boolean isFinished();

/**
* Returns a listener that is notified when the operator is finished. If the operator does not support this, it returns {@null}.
*/
@Nullable
default SubscribableListener<Void> onFinishedListener() {
return null;
}

/**
* returns non-null if output page available. Only called when isFinished() == false
* @throws UnsupportedOperationException if the operator is a {@link SinkOperator}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.IsBlockedResult;

Expand All @@ -26,12 +27,12 @@ public interface ExchangeSink {
void finish();

/**
* Whether the sink has received all pages
* Whether the sink is blocked on adding more pages
*/
boolean isFinished();
IsBlockedResult waitForWriting();

/**
* Whether the sink is blocked on adding more pages
* Returns a listener that is notified when the sink is finished
*/
IsBlockedResult waitForWriting();
SubscribableListener<Void> onFinished();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup

private class ExchangeSinkImpl implements ExchangeSink {
boolean finished;
private final SubscribableListener<Void> onFinished = new SubscribableListener<>();

ExchangeSinkImpl() {
onChanged();
buffer.addCompletionListener(onFinished);
outstandingSinks.incrementAndGet();
}

Expand All @@ -68,6 +70,7 @@ public void addPage(Page page) {
public void finish() {
if (finished == false) {
finished = true;
onFinished.onResponse(null);
onChanged();
if (outstandingSinks.decrementAndGet() == 0) {
buffer.finish(false);
Expand All @@ -77,8 +80,8 @@ public void finish() {
}

@Override
public boolean isFinished() {
return finished || buffer.isFinished();
public SubscribableListener<Void> onFinished() {
return onFinished;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -56,7 +57,12 @@ public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer)

@Override
public boolean isFinished() {
return sink.isFinished();
return sink.onFinished().isDone();
}

@Override
public SubscribableListener<Void> onFinishedListener() {
return sink.onFinished();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testBasic() throws Exception {
// sink buffer is full
assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone());
sink1.finish();
assertTrue(sink1.isFinished());
assertTrue(sink1.onFinished().isDone());
for (int i = 0; i < 5; i++) {
assertBusy(() -> assertTrue(source.waitForReading().listener().isDone()));
assertEquals(pages[2 + i], source.pollPage());
Expand All @@ -138,7 +138,7 @@ public void testBasic() throws Exception {
assertFalse(source.waitForReading().listener().isDone());
assertBusy(() -> assertTrue(sink2.waitForWriting().listener().isDone()));
sink2.finish();
assertTrue(sink2.isFinished());
assertTrue(sink2.onFinished().isDone());
assertTrue(source.isFinished());
assertFalse(sourceCompletion.isDone());
source.finish();
Expand Down Expand Up @@ -440,7 +440,7 @@ public void testClosingSinks() {
assertTrue(resp.finished());
assertNull(resp.takePage());
assertTrue(sink.waitForWriting().listener().isDone());
assertTrue(sink.isFinished());
assertTrue(sink.onFinished().isDone());
}

public void testFinishEarly() throws Exception {
Expand Down

0 comments on commit 857b697

Please sign in to comment.