Skip to content

Commit

Permalink
Limit size of range used when requesting deposit logs (Consensys#3182)
Browse files Browse the repository at this point in the history
Reduce request size if logs are not returned because the result set is too large or times out.
  • Loading branch information
ajsutton authored Nov 9, 2020
1 parent 79bd563 commit 00af3b8
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 32 deletions.
108 changes: 88 additions & 20 deletions pow/src/main/java/tech/pegasys/teku/pow/DepositFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.google.common.base.Throwables;
import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand All @@ -37,12 +39,15 @@
import tech.pegasys.teku.pow.api.Eth1EventsChannel;
import tech.pegasys.teku.pow.contract.DepositContract;
import tech.pegasys.teku.pow.contract.DepositContract.DepositEventEventResponse;
import tech.pegasys.teku.pow.contract.RejectedRequestException;
import tech.pegasys.teku.pow.event.Deposit;
import tech.pegasys.teku.pow.event.DepositsFromBlockEvent;
import tech.pegasys.teku.util.config.Constants;

public class DepositFetcher {

static final int DEFAULT_BATCH_SIZE = 500_000;

private static final Logger LOG = LogManager.getLogger();

private final Eth1Provider eth1Provider;
Expand Down Expand Up @@ -73,45 +78,73 @@ public synchronized SafeFuture<Void> fetchDepositsInRange(
fromBlockNumber,
toBlockNumber);

return getDepositEventsInRangeFromContract(fromBlockNumber, toBlockNumber)
.thenApply(this::groupDepositEventResponsesByBlockHash)
.thenCompose(
eventResponsesByBlockHash ->
postDepositEvents(
getListOfEthBlockFutures(eventResponsesByBlockHash.keySet()),
eventResponsesByBlockHash,
fromBlockNumber,
toBlockNumber));
final DepositFetchState fetchState = new DepositFetchState(fromBlockNumber, toBlockNumber);
return sendNextBatchRequest(fetchState);
}

private SafeFuture<List<DepositContract.DepositEventEventResponse>>
getDepositEventsInRangeFromContract(BigInteger fromBlockNumber, BigInteger toBlockNumber) {

DefaultBlockParameter fromBlock = DefaultBlockParameter.valueOf(fromBlockNumber);
DefaultBlockParameter toBlock = DefaultBlockParameter.valueOf(toBlockNumber);

return depositContract
.depositEventInRange(fromBlock, toBlock)
private SafeFuture<Void> sendNextBatchRequest(final DepositFetchState fetchState) {
final BigInteger nextBatchEnd = fetchState.getNextBatchEnd();
LOG.info(
"Requesting deposits between {} and {}. Batch size: {}",
fetchState.nextBatchStart,
nextBatchEnd,
fetchState.batchSize);
return processDepositsInBatch(fetchState.nextBatchStart, nextBatchEnd)
.exceptionallyCompose(
(err) -> {
LOG.debug(
"Failed to request deposit events for block numbers in the range ({}, {}). Retrying.",
fromBlockNumber,
toBlockNumber,
fetchState.nextBatchStart,
nextBatchEnd,
err);

final Throwable rootCause = Throwables.getRootCause(err);
if (rootCause instanceof SocketTimeoutException
|| rootCause instanceof RejectedRequestException) {
LOG.debug("Request timed out or was rejected, reduce the batch size and retry");
fetchState.reduceBatchSize();
}

return asyncRunner.runAfterDelay(
() -> getDepositEventsInRangeFromContract(fromBlockNumber, toBlockNumber),
() -> sendNextBatchRequest(fetchState),
Constants.ETH1_DEPOSIT_REQUEST_RETRY_TIMEOUT,
TimeUnit.SECONDS);
})
.thenCompose(
__ -> {
fetchState.moveToNextBatch();
LOG.trace("Batch request completed. Done? {}", fetchState.isDone());
if (fetchState.isDone()) {
return SafeFuture.COMPLETE;
} else {
return sendNextBatchRequest(fetchState);
}
});
}

private SafeFuture<Void> processDepositsInBatch(
final BigInteger fromBlockNumber, final BigInteger toBlockNumber) {

return depositContract
.depositEventInRange(
DefaultBlockParameter.valueOf(fromBlockNumber),
DefaultBlockParameter.valueOf(toBlockNumber))
.thenApply(this::groupDepositEventResponsesByBlockHash)
.thenCompose(
eventResponsesByBlockHash ->
postDepositEvents(
getListOfEthBlockFutures(eventResponsesByBlockHash.keySet()),
eventResponsesByBlockHash,
fromBlockNumber,
toBlockNumber));
}

private SafeFuture<Void> postDepositEvents(
List<SafeFuture<EthBlock.Block>> blockRequests,
Map<BlockNumberAndHash, List<DepositContract.DepositEventEventResponse>> depositEventsByBlock,
BigInteger fromBlock,
BigInteger toBlock) {
LOG.trace("Posting deposit events for {} blocks", depositEventsByBlock.size());
BigInteger from = fromBlock;
// First process completed requests using iteration.
// Avoid StackOverflowException when there is a long string of requests already completed.
Expand Down Expand Up @@ -189,6 +222,41 @@ private void postDeposits(DepositsFromBlockEvent event) {
eth1EventsChannel.onDepositsFromBlock(event);
}

private static class DepositFetchState {
// Both inclusive
BigInteger nextBatchStart;

final BigInteger lastBlock;
int batchSize = DEFAULT_BATCH_SIZE;

public DepositFetchState(final BigInteger fromBlockNumber, final BigInteger toBlockNumber) {
this.nextBatchStart = fromBlockNumber;
this.lastBlock = toBlockNumber;
}

public void moveToNextBatch() {
nextBatchStart = getNextBatchEnd().add(BigInteger.ONE);
if (batchSize < DEFAULT_BATCH_SIZE) {
// Grow the batch size slowly as we may be past a large blob of logs that caused trouble
// +1 to guarantee it grows by at least 1
batchSize = Math.min(DEFAULT_BATCH_SIZE, (int) (batchSize * 1.1 + 1));
}
}

private BigInteger getNextBatchEnd() {
return lastBlock.min(nextBatchStart.add(BigInteger.valueOf(batchSize)));
}

public boolean isDone() {
return nextBatchStart.compareTo(lastBlock) >= 0;
}

public void reduceBatchSize() {
batchSize = Math.max(1, batchSize / 2);
LOG.debug("Reduced batch size to {}", batchSize);
}
}

private static class BlockNumberAndHash implements Comparable<BlockNumberAndHash> {
private static final Comparator<BlockNumberAndHash> COMPARATOR =
Comparator.comparing(BlockNumberAndHash::getNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,13 @@ private void fetchLatestSubscriptionDeposits() {
}

private synchronized void fetchLatestSubscriptionDepositsOverRange() {
final BigInteger toBlock;
final BigInteger fromBlock;

if (active || latestCanonicalBlockNumber.equals(latestSuccessfullyQueriedBlock)) {
return;
}
active = true;

fromBlock = latestSuccessfullyQueriedBlock.add(BigInteger.ONE);
toBlock = latestCanonicalBlockNumber.min(fromBlock.add(BigInteger.valueOf(500_000)));
final BigInteger fromBlock = latestSuccessfullyQueriedBlock.add(BigInteger.ONE);
final BigInteger toBlock = latestCanonicalBlockNumber;

depositFetcher
.fetchDepositsInRange(fromBlock, toBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,19 @@ public SafeFuture<List<DepositEventEventResponse>> depositEventInRange(
.ethGetLogs(filter)
.sendAsync()
.thenApply(
logs ->
logs.getLogs().stream()
.map(log -> (Log) log.get())
.map(this::convertLogToDepositEventEventResponse)
.collect(Collectors.toList())));
logs -> {
if (logs.getLogs() == null) {
// We got a response from the node but it didn't include even an empty list
// of logs. This happens with Infura when more than 10,000 log entries match
// so treat as an explicit rejection of the request to allow the requested block
// range to be reduced.
throw new RejectedRequestException("No logs returned by ETH1 node");
}
return logs.getLogs().stream()
.map(log -> (Log) log.get())
.map(this::convertLogToDepositEventEventResponse)
.collect(Collectors.toList());
}));
}

private DepositEventEventResponse convertLogToDepositEventEventResponse(final Log log) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.pow.contract;

public class RejectedRequestException extends RuntimeException {

public RejectedRequestException(final String message) {
super(message);
}
}
114 changes: 112 additions & 2 deletions pow/src/test/java/tech/pegasys/teku/pow/DepositsFetcherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@

package tech.pegasys.teku.pow;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.pow.DepositFetcher.DEFAULT_BATCH_SIZE;

import com.google.common.primitives.Longs;
import java.math.BigInteger;
Expand All @@ -26,14 +33,15 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.utils.Numeric;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.pow.api.Eth1EventsChannel;
import tech.pegasys.teku.pow.contract.DepositContract;
import tech.pegasys.teku.pow.contract.RejectedRequestException;
import tech.pegasys.teku.pow.event.DepositsFromBlockEvent;

public class DepositsFetcherTest {
Expand All @@ -42,7 +50,7 @@ public class DepositsFetcherTest {
private final Eth1EventsChannel eth1EventsChannel = mock(Eth1EventsChannel.class);
private final DepositContract depositContract = mock(DepositContract.class);
private final Eth1BlockFetcher eth1BlockFetcher = mock(Eth1BlockFetcher.class);
private final AsyncRunner asyncRunner = new StubAsyncRunner();
private final StubAsyncRunner asyncRunner = new StubAsyncRunner();

private final DepositFetcher depositFetcher =
new DepositFetcher(
Expand Down Expand Up @@ -75,6 +83,108 @@ void depositsInConsecutiveBlocks() {
inOrder.verify(eth1BlockFetcher).fetch(BigInteger.valueOf(6), BigInteger.valueOf(10));
}

@Test
void shouldUseMultipleBatchesWhenRangeIsLarge() {
final BigInteger fromBlockNumber = BigInteger.ZERO;
final BigInteger toBlockNumber = BigInteger.valueOf(3 * DEFAULT_BATCH_SIZE - 10);

final BigInteger batch1End = fromBlockNumber.add(BigInteger.valueOf(DEFAULT_BATCH_SIZE));
final BigInteger batch2Start = batch1End.add(BigInteger.ONE);
final BigInteger batch2End = batch2Start.add(BigInteger.valueOf(DEFAULT_BATCH_SIZE));
final BigInteger batch3Start = batch2End.add(BigInteger.ONE);
final SafeFuture<List<DepositContract.DepositEventEventResponse>> batch1Response =
new SafeFuture<>();
final SafeFuture<List<DepositContract.DepositEventEventResponse>> batch2Response =
new SafeFuture<>();
final SafeFuture<List<DepositContract.DepositEventEventResponse>> batch3Response =
new SafeFuture<>();

when(depositContract.depositEventInRange(any(), any()))
.thenReturn(batch1Response)
.thenReturn(batch2Response)
.thenReturn(batch3Response);

final SafeFuture<Void> result =
depositFetcher.fetchDepositsInRange(fromBlockNumber, toBlockNumber);
assertThat(result).isNotDone();

verify(depositContract)
.depositEventInRange(
refEq(DefaultBlockParameter.valueOf(fromBlockNumber)),
refEq(DefaultBlockParameter.valueOf(batch1End)));
verifyNoMoreInteractions(depositContract);

batch1Response.complete(emptyList());
verify(depositContract)
.depositEventInRange(
refEq(DefaultBlockParameter.valueOf(batch2Start)),
refEq(DefaultBlockParameter.valueOf(batch2End)));
verifyNoMoreInteractions(depositContract);

batch2Response.complete(emptyList());
verify(depositContract)
.depositEventInRange(
refEq(DefaultBlockParameter.valueOf(batch3Start)),
refEq(DefaultBlockParameter.valueOf(toBlockNumber)));
verifyNoMoreInteractions(depositContract);

batch3Response.complete(emptyList());
verifyNoMoreInteractions(depositContract);
}

@Test
void shouldReduceBatchSizeWhenRequestIsRejected() {
final BigInteger fromBlockNumber = BigInteger.ZERO;
final BigInteger toBlockNumber = BigInteger.valueOf(DEFAULT_BATCH_SIZE + 1000);

final SafeFuture<List<DepositContract.DepositEventEventResponse>> request1Response =
new SafeFuture<>();
final SafeFuture<List<DepositContract.DepositEventEventResponse>> request2Response =
new SafeFuture<>();
final SafeFuture<List<DepositContract.DepositEventEventResponse>> request3Response =
new SafeFuture<>();

when(depositContract.depositEventInRange(any(), any()))
.thenReturn(request1Response)
.thenReturn(request2Response)
.thenReturn(request3Response);

final SafeFuture<Void> result =
depositFetcher.fetchDepositsInRange(fromBlockNumber, toBlockNumber);
assertThat(result).isNotDone();

// First tries to request a full size batch
verify(depositContract)
.depositEventInRange(
refEq(DefaultBlockParameter.valueOf(fromBlockNumber)),
refEq(DefaultBlockParameter.valueOf(BigInteger.valueOf(DEFAULT_BATCH_SIZE))));
verifyNoMoreInteractions(depositContract);

// But there are too many results
request1Response.completeExceptionally(new RejectedRequestException("Nah mate"));

// So it halves the batch size and retries
asyncRunner.executeQueuedActions();
final BigInteger endSuccessfulRange =
fromBlockNumber.add(BigInteger.valueOf(DEFAULT_BATCH_SIZE / 2));
verify(depositContract)
.depositEventInRange(
refEq(DefaultBlockParameter.valueOf(fromBlockNumber)),
refEq(DefaultBlockParameter.valueOf(endSuccessfulRange)));
verifyNoMoreInteractions(depositContract);

// And that works
request2Response.complete(emptyList());

// So it increases the batch size by 10% to avoid getting stuck with a very small batch size
asyncRunner.executeQueuedActions();
verify(depositContract)
.depositEventInRange(
refEq(DefaultBlockParameter.valueOf(endSuccessfulRange.add(BigInteger.ONE))),
refEq(DefaultBlockParameter.valueOf(toBlockNumber)));
verifyNoMoreInteractions(depositContract);
}

private void mockBlockForEth1Provider(String blockHash, long blockNumber, long timestamp) {
EthBlock.Block block = mock(EthBlock.Block.class);
when(block.getTimestamp()).thenReturn(BigInteger.valueOf(timestamp));
Expand Down

0 comments on commit 00af3b8

Please sign in to comment.