From 857b697aec7998f5bd2aedfd21998b69f04e5824 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 11 Jan 2025 22:33:45 -0800 Subject: [PATCH] Resume blocking Driver when any operator isFinished --- .../compute/operator/Driver.java | 42 +++++++++++-------- .../compute/operator/Operator.java | 9 ++++ .../operator/exchange/ExchangeSink.java | 9 ++-- .../exchange/ExchangeSinkHandler.java | 7 +++- .../exchange/ExchangeSinkOperator.java | 8 +++- .../exchange/ExchangeServiceTests.java | 6 +-- 6 files changed, 54 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index acbf8a17b31fd..dfe8e6ee5b843 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.Supplier; -import java.util.stream.Collectors; /** * A driver operates single-threadedly on a simple chain of {@link Operator}s, passing @@ -300,12 +299,7 @@ private IsBlockedResult runSingleLoopIteration() { } if (movedPage == false) { - return oneOf( - activeOperators.stream() - .map(Operator::isBlocked) - .filter(laf -> laf.listener().isDone() == false) - .collect(Collectors.toList()) - ); + return blockedResult(); } return Operator.NOT_BLOCKED; } @@ -408,23 +402,37 @@ void onComplete(ActionListener listener) { }); } - private static IsBlockedResult oneOf(List results) { + private IsBlockedResult blockedResult() { + List results = activeOperators.stream() + .map(Operator::isBlocked) + .filter(laf -> laf.listener().isDone() == false) + .toList(); if (results.isEmpty()) { return Operator.NOT_BLOCKED; } + final SubscribableListener oneOf = new SubscribableListener<>(); + final String reason; if (results.size() == 1) { - return results.get(0); + results.get(0).listener().addListener(oneOf); + reason = results.get(0).reason(); + } else { + StringBuilder reasonStr = new StringBuilder(); + for (IsBlockedResult r : results) { + r.listener().addListener(oneOf); + if (reasonStr.isEmpty() == false) { + reasonStr.append(" OR "); + } + reasonStr.append(r.reason()); + } + reason = reasonStr.toString(); } - SubscribableListener oneOf = new SubscribableListener<>(); - StringBuilder reason = new StringBuilder(); - for (IsBlockedResult r : results) { - r.listener().addListener(oneOf); - if (reason.isEmpty() == false) { - reason.append(" OR "); + for (Operator operator : activeOperators) { + SubscribableListener onFinished = operator.onFinishedListener(); + if (onFinished != null) { + onFinished.addListener(oneOf); } - reason.append(r.reason()); } - return new IsBlockedResult(oneOf, reason.toString()); + return new IsBlockedResult(oneOf, reason); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java index 663e06756551b..4a8c0ba23491d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java @@ -13,6 +13,7 @@ import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.xcontent.ToXContentObject; @@ -62,6 +63,14 @@ public interface Operator extends Releasable { */ boolean isFinished(); + /** + * Returns a listener that is notified when the operator is finished. If the operator does not support this, it returns {@null}. + */ + @Nullable + default SubscribableListener onFinishedListener() { + return null; + } + /** * returns non-null if output page available. Only called when isFinished() == false * @throws UnsupportedOperationException if the operator is a {@link SinkOperator} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSink.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSink.java index e96ca9e39b7e5..0d7e8f24ffa8b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSink.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSink.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator.exchange; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.IsBlockedResult; @@ -26,12 +27,12 @@ public interface ExchangeSink { void finish(); /** - * Whether the sink has received all pages + * Whether the sink is blocked on adding more pages */ - boolean isFinished(); + IsBlockedResult waitForWriting(); /** - * Whether the sink is blocked on adding more pages + * Returns a listener that is notified when the sink is finished */ - IsBlockedResult waitForWriting(); + SubscribableListener onFinished(); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java index 614c3fe0ecc5c..eca5a591358b4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java @@ -52,9 +52,11 @@ public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSup private class ExchangeSinkImpl implements ExchangeSink { boolean finished; + private final SubscribableListener onFinished = new SubscribableListener<>(); ExchangeSinkImpl() { onChanged(); + buffer.addCompletionListener(onFinished); outstandingSinks.incrementAndGet(); } @@ -68,6 +70,7 @@ public void addPage(Page page) { public void finish() { if (finished == false) { finished = true; + onFinished.onResponse(null); onChanged(); if (outstandingSinks.decrementAndGet() == 0) { buffer.finish(false); @@ -77,8 +80,8 @@ public void finish() { } @Override - public boolean isFinished() { - return finished || buffer.isFinished(); + public SubscribableListener onFinished() { + return onFinished; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java index dd89dfe480c36..6788a9cc621a0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java @@ -9,6 +9,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -56,7 +57,12 @@ public ExchangeSinkOperator(ExchangeSink sink, Function transformer) @Override public boolean isFinished() { - return sink.isFinished(); + return sink.onFinished().isDone(); + } + + @Override + public SubscribableListener onFinishedListener() { + return sink.onFinished(); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 8f7532b582bc2..98a1226fc4005 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -129,7 +129,7 @@ public void testBasic() throws Exception { // sink buffer is full assertFalse(randomFrom(sink1, sink2).waitForWriting().listener().isDone()); sink1.finish(); - assertTrue(sink1.isFinished()); + assertTrue(sink1.onFinished().isDone()); for (int i = 0; i < 5; i++) { assertBusy(() -> assertTrue(source.waitForReading().listener().isDone())); assertEquals(pages[2 + i], source.pollPage()); @@ -138,7 +138,7 @@ public void testBasic() throws Exception { assertFalse(source.waitForReading().listener().isDone()); assertBusy(() -> assertTrue(sink2.waitForWriting().listener().isDone())); sink2.finish(); - assertTrue(sink2.isFinished()); + assertTrue(sink2.onFinished().isDone()); assertTrue(source.isFinished()); assertFalse(sourceCompletion.isDone()); source.finish(); @@ -440,7 +440,7 @@ public void testClosingSinks() { assertTrue(resp.finished()); assertNull(resp.takePage()); assertTrue(sink.waitForWriting().listener().isDone()); - assertTrue(sink.isFinished()); + assertTrue(sink.onFinished().isDone()); } public void testFinishEarly() throws Exception {