Skip to content

Commit

Permalink
Add retry to opening retrying stream (apache#14126)
Browse files Browse the repository at this point in the history
* Add retry to opening retrying stream
* Add retry to S3Entity for network issues

* Fix tests and clean up code
  • Loading branch information
adarshsanjeev authored Apr 27, 2023
1 parent 42c8c84 commit 5aa119d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public boolean apply(Throwable e)
// SdkClientException can be thrown for many reasons and the only way to distinguish it is to look at
// the message. This is not ideal, since the message may change, so it may need to be adjusted in the future.
return true;
} else if (e instanceof SdkClientException && e.getMessage().contains("Unable to execute HTTP request")) {
// This is likely due to a temporary DNS issue and can be retried.
return true;
} else if (e instanceof AmazonClientException) {
return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class RetryingInputStream<T> extends InputStream
private long startOffset;

// Used in tests to disable waiting.
private boolean doWait;
private final boolean doWait;

/**
* @param object The object entity to open
Expand All @@ -69,23 +69,62 @@ public RetryingInputStream(
Predicate<Throwable> retryCondition,
@Nullable Integer maxTries
) throws IOException
{
this(object, objectOpenFunction, retryCondition, maxTries, true);
}

@VisibleForTesting
RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
Predicate<Throwable> retryCondition,
@Nullable Integer maxTries,
boolean doWait
) throws IOException
{
this.object = Preconditions.checkNotNull(object, "object");
this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction, "objectOpenFunction");
this.retryCondition = Preconditions.checkNotNull(retryCondition, "retryCondition");
this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
this.delegate = new CountingInputStream(objectOpenFunction.open(object));
this.doWait = true;
this.doWait = doWait;

if (this.maxTries <= 1) {
throw new IAE("maxTries must be greater than 1");
}
openWithRetry(0);
}

private void openIfNeeded() throws IOException
{
if (delegate == null) {
delegate = new CountingInputStream(objectOpenFunction.open(object, startOffset));
openWithRetry(startOffset);
}
}

private void openWithRetry(final long offset) throws IOException
{
for (int nTry = 0; nTry < maxTries; nTry++) {
try {
delegate = new CountingInputStream(objectOpenFunction.open(object, offset));
break;
}
catch (Throwable t) {
final int nextTry = nTry + 1;
if (nextTry < maxTries && retryCondition.apply(t)) {
final String message = StringUtils.format("Stream interrupted at position [%d]", offset);
try {
if (doWait) {
RetryUtils.awaitNextRetry(t, message, nextTry, maxTries, false);
}
}
catch (InterruptedException e) {
t.addSuppressed(e);
throwAsIOException(t);
}
} else {
throwAsIOException(t);
}
}
}
}

Expand Down Expand Up @@ -115,8 +154,7 @@ private void waitOrThrow(Throwable t, int nTry) throws IOException
if (doWait) {
RetryUtils.awaitNextRetry(t, message, nextTry, maxTries, false);
}

delegate = new CountingInputStream(objectOpenFunction.open(object, startOffset));
openWithRetry(startOffset);
}
catch (InterruptedException | IOException e) {
t.addSuppressed(e);
Expand Down Expand Up @@ -233,10 +271,4 @@ public void close() throws IOException
delegate.close();
}
}

@VisibleForTesting
void setNoWait()
{
this.doWait = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import javax.annotation.Nonnull;
import java.io.DataInputStream;
Expand All @@ -44,6 +46,13 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class RetryingInputStreamTest
{
private static final int MAX_RETRY = 5;
Expand All @@ -58,7 +67,7 @@ public class RetryingInputStreamTest
private int throwIOExceptions = 0;


private final ObjectOpenFunction<File> objectOpenFunction = new ObjectOpenFunction<File>()
private final ObjectOpenFunction<File> objectOpenFunction = spy(new ObjectOpenFunction<File>()
{
@Override
public InputStream open(File object) throws IOException
Expand All @@ -73,7 +82,7 @@ public InputStream open(File object, long start) throws IOException
Preconditions.checkState(fis.skip(start) == start);
return new TestInputStream(fis);
}
};
});

@Before
public void setup() throws IOException
Expand Down Expand Up @@ -107,10 +116,10 @@ public void testThrowsOnIOException() throws IOException
testFile,
objectOpenFunction,
t -> false, // will not retry
MAX_RETRY
MAX_RETRY,
false
);

retryingInputStream.setNoWait();
Assert.assertThrows(
IOException.class,
() -> retryHelper(retryingInputStream)
Expand All @@ -127,10 +136,10 @@ public void testRetryOnCustomException() throws IOException
testFile,
objectOpenFunction,
t -> t instanceof CustomException,
MAX_RETRY
MAX_RETRY,
false
);

retryingInputStream.setNoWait();
retryHelper(retryingInputStream);

Assert.assertEquals(0, throwCustomExceptions);
Expand All @@ -144,10 +153,10 @@ public void testThrowsOnCustomException() throws IOException
testFile,
objectOpenFunction,
t -> false, // will not retry
MAX_RETRY
MAX_RETRY,
false
);

retryingInputStream.setNoWait();
final IOException e = Assert.assertThrows(
IOException.class,
() -> retryHelper(retryingInputStream)
Expand All @@ -167,10 +176,10 @@ public void testResumeAfterExceptions() throws IOException
testFile,
objectOpenFunction,
t -> true, // always retry
MAX_RETRY
MAX_RETRY,
false
);

retryingInputStream.setNoWait();
retryHelper(retryingInputStream);

// Tried more than MAX_RETRY times because progress was being made. (MAX_RETRIES applies to each call individually.)
Expand All @@ -185,10 +194,10 @@ public void testTooManyExceptions() throws IOException
testFile,
objectOpenFunction,
t -> t instanceof IOException,
MAX_RETRY
MAX_RETRY,
false
);

retryingInputStream.setNoWait();
Assert.assertThrows(
IOException.class,
() -> retryHelper(retryingInputStream)
Expand All @@ -206,16 +215,48 @@ public void testIOExceptionNotRetriableRead() throws IOException
testFile,
objectOpenFunction,
t -> t instanceof IOException || t instanceof CustomException,
MAX_RETRY
MAX_RETRY,
false
);

retryingInputStream.setNoWait();
retryHelper(retryingInputStream);

Assert.assertEquals(0, throwCustomExceptions);
Assert.assertEquals(0, throwIOExceptions);
}

@Test
public void testRetryOnExceptionWhenOpeningStream() throws Exception
{
throwCustomExceptions = 2;

doAnswer(new Answer<InputStream>()
{
int retryCount = 0;
@Override
public InputStream answer(InvocationOnMock invocation) throws Throwable
{
if (retryCount < 2) {
retryCount += 1;
throwCustomExceptions -= 1;
throw new CustomException("I am a custom retryable exception", new RuntimeException());
} else {
return (InputStream) invocation.callRealMethod();
}
}
}).when(objectOpenFunction).open(any(), anyLong());

new RetryingInputStream<>(
testFile,
objectOpenFunction,
t -> t instanceof CustomException,
MAX_RETRY,
false
);
verify(objectOpenFunction, times(3)).open(any(), anyLong());
Assert.assertEquals(0, throwCustomExceptions);
}

private void retryHelper(RetryingInputStream<File> retryingInputStream) throws IOException
{
try (DataInputStream inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream))) {
Expand Down

0 comments on commit 5aa119d

Please sign in to comment.