Skip to content

Commit

Permalink
Error handling improvements for frame channels. (apache#12895)
Browse files Browse the repository at this point in the history
* Error handling improvements for frame channels.

Two changes:

1) Send errors down in-memory channels (BlockingQueueFrameChannel) on
   failure. This ensures that in situations where a chain of processors
   has been set up on a single machine, all processors see the root
   cause error. In particular, this means the final processor in the
   chain reports the root cause error, which ensures that someone with
   a handle to the final processor will get the proper error.

2) Update FrameFileHttpResponseHandler to expect that the final fetch,
   rather than being simply empty, is empty and also carries a special header.
   This ensures that the handler is able to tell the difference between
   an empty fetch due to being at EOF, and an empty fetch due to a
   truncated HTTP response (after the 200 OK and headers are sent down,
   but before any content appears).

* Fix tests, imports.

* Checkstyle!
  • Loading branch information
gianm authored Aug 15, 2022
1 parent b26ab67 commit 8463456
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.java.util.common;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

import javax.annotation.Nullable;
import java.util.Objects;
Expand Down Expand Up @@ -87,7 +86,8 @@ public R valueOrThrow()
if (isValue()) {
return value;
} else if (error instanceof Throwable) {
Throwables.propagateIfPossible((Throwable) error);
// Always wrap Throwable, even if we could throw it directly, to provide additional context
// about where the exception happened (we want the current stack frame in the trace).
throw new RuntimeException((Throwable) error);
} else {
throw new RuntimeException(error.toString());
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/org/apache/druid/common/EitherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ public void testErrorThrowable()
MatcherAssert.assertThat(either.error(), CoreMatchers.instanceOf(AssertionError.class));
MatcherAssert.assertThat(either.error().getMessage(), CoreMatchers.equalTo("oh no"));

final AssertionError e = Assert.assertThrows(AssertionError.class, either::valueOrThrow);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
final RuntimeException e = Assert.assertThrows(RuntimeException.class, either::valueOrThrow);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(AssertionError.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.equalTo("oh no"));

// Test toString.
Assert.assertEquals("Error[java.lang.AssertionError: oh no]", either.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Optional;
Expand Down Expand Up @@ -162,12 +163,12 @@ public ListenableFuture<?> writabilityFuture()
}

@Override
public void fail()
public void fail(@Nullable Throwable cause)
{
synchronized (lock) {
queue.clear();

if (!queue.offer(Optional.of(Either.error(new ISE("Aborted"))))) {
if (!queue.offer(Optional.of(Either.error(cause != null ? cause : new ISE("Failed"))))) {
// If this happens, it's a bug, potentially due to incorrectly using this class with multiple writers.
throw new ISE("Could not write error to channel");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.Frame;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;

Expand Down Expand Up @@ -54,16 +55,20 @@ default void write(Frame frame) throws IOException

/**
* Called prior to {@link #close()} if the writer has failed. Must be followed by a call to {@link #close()}.
*
* @param cause optional cause of failure. Used by the in-memory channel {@link BlockingQueueFrameChannel.Writable}
* to propagate exceptions to downstream processors. Most other channels ignore the provided cause.
*/
void fail() throws IOException;
void fail(@Nullable Throwable cause) throws IOException;

/**
* Finish writing to this channel.
*
* When this method is called without {@link #fail()} having previously been called, the writer is understood to have
* completed successfully.
* When this method is called without {@link #fail(Throwable)} having previously been called, the writer is
* understood to have completed successfully.
*
* After calling this method, no additional calls to {@link #write}, {@link #fail()}, or this method are permitted.
* After calling this method, no additional calls to {@link #write}, {@link #fail(Throwable)}, or this method
* are permitted.
*/
@Override
void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.file.FrameFileWriter;

import javax.annotation.Nullable;
import java.io.IOException;

/**
Expand All @@ -44,8 +45,9 @@ public void write(FrameWithPartition frame) throws IOException
}

@Override
public void fail() throws IOException
public void fail(@Nullable Throwable cause) throws IOException
{
// Cause is ignored when writing to frame files. Readers can tell the file is truncated, but they won't know why.
writer.abort();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@
* to back off from issuing the next request, if appropriate. However: the handler does not implement backpressure
* through the {@link HttpResponseHandler.TrafficCop} mechanism. Therefore, it is important that each request retrieve
* a modest amount of data.
*
* The last fetch must be empty (zero content bytes) and must have the header {@link #HEADER_LAST_FETCH_NAME} set to
* {@link #HEADER_LAST_FETCH_VALUE}. Under these conditions, {@link FrameFilePartialFetch#isLastFetch()} returns true.
*/
public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFilePartialFetch, FrameFilePartialFetch>
{
public static final String HEADER_LAST_FETCH_NAME = "X-Druid-Frame-Last-Fetch";
public static final String HEADER_LAST_FETCH_VALUE = "yes";

private final ReadableByteChunksFrameChannel channel;

public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel)
Expand All @@ -53,14 +59,17 @@ public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel
@Override
public ClientResponse<FrameFilePartialFetch> handleResponse(final HttpResponse response, final TrafficCop trafficCop)
{
final ClientResponse<FrameFilePartialFetch> clientResponse = ClientResponse.unfinished(new FrameFilePartialFetch());

if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
// Note: if the error body is chunked, we will discard all future chunks due to setting exceptionCaught here.
// This is OK because we don't need the body; just the HTTP status code.
final ClientResponse<FrameFilePartialFetch> clientResponse =
ClientResponse.unfinished(new FrameFilePartialFetch(false));
exceptionCaught(clientResponse, new ISE("Server for [%s] returned [%s]", channel.getId(), response.getStatus()));
return clientResponse;
} else {
final boolean lastFetchHeaderSet = HEADER_LAST_FETCH_VALUE.equals(response.headers().get(HEADER_LAST_FETCH_NAME));
final ClientResponse<FrameFilePartialFetch> clientResponse =
ClientResponse.unfinished(new FrameFilePartialFetch(lastFetchHeaderSet));
return response(clientResponse, response.getContent());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*/
public class FrameFilePartialFetch
{
private final boolean lastFetchHeaderSet;
private long bytesRead;

@Nullable
Expand All @@ -41,13 +42,14 @@ public class FrameFilePartialFetch
@Nullable
private ListenableFuture<?> backpressureFuture;

FrameFilePartialFetch()
FrameFilePartialFetch(boolean lastFetchHeaderSet)
{
this.lastFetchHeaderSet = lastFetchHeaderSet;
}

public boolean isEmptyFetch()
public boolean isLastFetch()
{
return exceptionCaught == null && bytesRead == 0L;
return exceptionCaught == null && lastFetchHeaderSet && bytesRead == 0L;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -317,7 +318,7 @@ private void fail(Throwable e)
{
for (final WritableFrameChannel outputChannel : outputChannels) {
try {
outputChannel.fail();
outputChannel.fail(e);
}
catch (Throwable e1) {
e.addSuppressed(e1);
Expand Down Expand Up @@ -535,7 +536,7 @@ private void cancel(final Set<FrameProcessor<?>> processorsToCancel)
// Fail all output channels prior to calling cleanup.
for (final WritableFrameChannel outputChannel : processor.outputChannels()) {
try {
outputChannel.fail();
outputChannel.fail(new CancellationException("Canceled"));
}
catch (Throwable e) {
log.debug(e, "Exception encountered while marking output channel failed for processor [%s]", processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ public void testNonChunkedResponse() throws Exception
Assert.assertFalse(response1.isFinished());
Assert.assertTrue(response1.isContinueReading());
Assert.assertFalse(response1.getObj().isExceptionCaught());
Assert.assertFalse(response1.getObj().isEmptyFetch());
Assert.assertFalse(response1.getObj().isLastFetch());

final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);

Assert.assertTrue(response2.isFinished());
Assert.assertTrue(response2.isContinueReading());
Assert.assertFalse(response2.getObj().isExceptionCaught());
Assert.assertFalse(response2.getObj().isEmptyFetch());
Assert.assertFalse(response2.getObj().isLastFetch());

final ListenableFuture<?> backpressureFuture = response2.getObj().backpressureFuture();
Assert.assertFalse(backpressureFuture.isDone());
Expand All @@ -143,7 +143,7 @@ public void testNonChunkedResponse() throws Exception
}

@Test
public void testEmptyResponse()
public void testEmptyResponseWithoutLastFetchHeader()
{
final ClientResponse<FrameFilePartialFetch> response1 = handler.handleResponse(
makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY),
Expand All @@ -153,14 +153,42 @@ public void testEmptyResponse()
Assert.assertFalse(response1.isFinished());
Assert.assertTrue(response1.isContinueReading());
Assert.assertFalse(response1.getObj().isExceptionCaught());
Assert.assertTrue(response1.getObj().isEmptyFetch());
Assert.assertFalse(response1.getObj().isLastFetch());

final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);

Assert.assertTrue(response2.isFinished());
Assert.assertTrue(response2.isContinueReading());
Assert.assertFalse(response2.getObj().isExceptionCaught());
Assert.assertTrue(response2.getObj().isEmptyFetch());
Assert.assertFalse(response2.getObj().isLastFetch());
Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
}

@Test
public void testEmptyResponseWithLastFetchHeader()
{
  // An empty 200 OK response that carries the last-fetch header must be reported as the last fetch,
  // both right after the response is handled and after the handler is told the response is done.
  final HttpResponse emptyResponse = makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY);
  emptyResponse.headers().set(
      FrameFileHttpResponseHandler.HEADER_LAST_FETCH_NAME,
      FrameFileHttpResponseHandler.HEADER_LAST_FETCH_VALUE
  );

  // Stage 1: response has been handled, but done() has not been called yet.
  final ClientResponse<FrameFilePartialFetch> handled = handler.handleResponse(emptyResponse, null);

  Assert.assertFalse(handled.isFinished());
  Assert.assertTrue(handled.isContinueReading());

  final FrameFilePartialFetch handledFetch = handled.getObj();
  Assert.assertFalse(handledFetch.isExceptionCaught());
  Assert.assertTrue(handledFetch.isLastFetch());

  // Stage 2: after done(), the response is finished and is still flagged as the last fetch.
  final ClientResponse<FrameFilePartialFetch> completed = handler.done(handled);

  Assert.assertTrue(completed.isFinished());
  Assert.assertTrue(completed.isContinueReading());

  final FrameFilePartialFetch completedFetch = completed.getObj();
  Assert.assertFalse(completedFetch.isExceptionCaught());
  Assert.assertTrue(completedFetch.isLastFetch());
  Assert.assertTrue(completedFetch.backpressureFuture().isDone());
}

Expand All @@ -186,15 +214,15 @@ public void testChunkedResponse() throws Exception

Assert.assertFalse(response.isFinished());
Assert.assertFalse(response.getObj().isExceptionCaught());
Assert.assertFalse(response.getObj().isEmptyFetch());
Assert.assertFalse(response.getObj().isLastFetch());
}

final ClientResponse<FrameFilePartialFetch> finalResponse = handler.done(response);

Assert.assertTrue(finalResponse.isFinished());
Assert.assertTrue(finalResponse.isContinueReading());
Assert.assertFalse(response.getObj().isExceptionCaught());
Assert.assertFalse(response.getObj().isEmptyFetch());
Assert.assertFalse(response.getObj().isLastFetch());

final ListenableFuture<?> backpressureFuture = response.getObj().backpressureFuture();
Assert.assertFalse(backpressureFuture.isDone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,23 @@ public void test_runFully_errors() throws Exception
);

MatcherAssert.assertThat(
e.getCause(),
e.getCause().getCause(),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
);

final ReadableFrameChannel outReadableChannel = outChannel.readable();
Assert.assertTrue(outReadableChannel.canRead());

Assert.assertThrows(
IllegalStateException.class,
final RuntimeException readException = Assert.assertThrows(
RuntimeException.class,
outReadableChannel::read
);

MatcherAssert.assertThat(
readException.getCause(),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
);

Assert.assertTrue(outReadableChannel.isFinished()); // Finished now that we read the error
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ public void test_runAllFully_failing()

final ExecutionException e = Assert.assertThrows(ExecutionException.class, future::get);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RuntimeException.class));
MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!")));
MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(RuntimeException.class));
MatcherAssert.assertThat(
e.getCause().getCause(),
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!"))
);
}

@Test
Expand Down

0 comments on commit 8463456

Please sign in to comment.