Improve CommonAncestor logic for sync (Consensys#9057)
tbenr authored Jan 31, 2025
1 parent 7bf58da commit 0edcc30
Showing 2 changed files with 181 additions and 58 deletions.
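In outline, the commit replaces the old single-window sampling (OPTIMISTIC_HISTORY_LENGTH / BLOCK_COUNT) with an exponential back-off search for a common ancestor. The sketch below restates the new control flow in plain Java; it uses long instead of UInt64, a synchronous Chain lookup in place of the BlocksByRange request plus BestBlockListener, and illustrative names that are not part of the Teku API:

import java.util.OptionalLong;

class CommonAncestorSketch {
  static final int MAX_ATTEMPTS = 6;          // DEFAULT_MAX_ATTEMPTS
  static final long BLOCKS_PER_ATTEMPT = 16;  // BLOCK_COUNT_PER_ATTEMPT
  static final long JUMP_BASE = 100;          // SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE

  interface Chain {
    // Highest slot in [firstSlot, firstSlot + count) whose peer-supplied block
    // is already known locally, if any (stand-in for the RPC round trip).
    OptionalLong highestKnownSlot(long firstSlot, long count);
  }

  static long findCommonAncestor(Chain chain, long lowestHeadSlot, long firstNonFinalSlot) {
    long firstRequestedSlot = Math.max(lowestHeadSlot - (BLOCKS_PER_ATTEMPT - 1), 0);
    for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
      if (firstRequestedSlot <= firstNonFinalSlot) {
        break; // nothing left to sample above the first non-final slot
      }
      final OptionalLong best = chain.highestKnownSlot(firstRequestedSlot, BLOCKS_PER_ATTEMPT);
      if (best.isPresent()) {
        return best.getAsLong(); // found a block both chains share
      }
      // No common block in this window: jump back 100 * 2^attempt slots.
      firstRequestedSlot = Math.max(firstRequestedSlot - (JUMP_BASE << attempt), 0);
    }
    return firstNonFinalSlot; // safe fallback, as before the change
  }
}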
CommonAncestor.java
@@ -13,6 +13,8 @@

package tech.pegasys.teku.beacon.sync.forward.singlepeer;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
@@ -24,63 +26,106 @@

public class CommonAncestor {
private static final Logger LOG = LogManager.getLogger();
private final RecentChainData recentChainData;
private static final int DEFAULT_MAX_ATTEMPTS = 6;
static final UInt64 BLOCK_COUNT_PER_ATTEMPT = UInt64.valueOf(16);
static final UInt64 SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE = UInt64.valueOf(100);
// We will try to find a common block, downloading 16 blocks per attempt.
// We will try 6 times, each time jumping back 100 * 2^attempt slots:
// first 100 slots, then a further 200, then 400, and so on up to 3200,
// so the maximum attempted distance from the initial slot is 6300 slots.
// We will download at most 96 blocks in total before defaulting to
// firstNonFinalSlot.

static final UInt64 OPTIMISTIC_HISTORY_LENGTH = UInt64.valueOf(3000);
// prysm allows a maximum range of 1000 blocks (endSlot - startSlot) due to database limitations
static final UInt64 BLOCK_COUNT = UInt64.valueOf(100);
private final RecentChainData recentChainData;
private final int maxAttempts;

public CommonAncestor(final RecentChainData recentChainData) {
this(recentChainData, DEFAULT_MAX_ATTEMPTS);
}

@VisibleForTesting
CommonAncestor(final RecentChainData recentChainData, final int maxAttempts) {
this.recentChainData = recentChainData;
this.maxAttempts = maxAttempts;
}

public SafeFuture<UInt64> getCommonAncestor(
final SyncSource peer, final UInt64 firstNonFinalSlot, final UInt64 peerHeadSlot) {
final UInt64 ourHeadSlot = recentChainData.getHeadSlot();
final UInt64 lowestHeadSlot = ourHeadSlot.min(peerHeadSlot);
if (lowestHeadSlot.isLessThan(firstNonFinalSlot.plus(OPTIMISTIC_HISTORY_LENGTH))) {
return SafeFuture.completedFuture(firstNonFinalSlot);
}
final UInt64 localNonFinalisedSlotCount = lowestHeadSlot.minus(firstNonFinalSlot);
final UInt64 firstRequestedSlot = lowestHeadSlot.minus(OPTIMISTIC_HISTORY_LENGTH);
final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT);

final UInt64 localNonFinalisedSlotCount = lowestHeadSlot.minusMinZero(firstNonFinalSlot);

LOG.debug(
"Local head slot {}. Have {} non finalized slots, "
+ "will sample ahead from {} to {}. Peer head is {}",
"Local head slot {}. Have {} non finalized slots, peer head is {}",
ourHeadSlot,
localNonFinalisedSlotCount,
firstRequestedSlot,
lastSlot,
peerHeadSlot);

final BestBlockListener blockResponseListener =
new BestBlockListener(recentChainData, firstNonFinalSlot);
return getCommonAncestor(
peer,
lowestHeadSlot.minusMinZero(BLOCK_COUNT_PER_ATTEMPT.decrement()),
firstNonFinalSlot,
0);
}

private SafeFuture<UInt64> getCommonAncestor(
final SyncSource peer,
final UInt64 firstRequestedSlot,
final UInt64 firstNonFinalSlot,
final int attempt) {
if (attempt >= maxAttempts || firstRequestedSlot.isLessThanOrEqualTo(firstNonFinalSlot)) {
return SafeFuture.completedFuture(firstNonFinalSlot);
}

final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT_PER_ATTEMPT);

LOG.debug("Sampling ahead from {} to {}.", firstRequestedSlot, lastSlot);

final BestBlockListener blockResponseListener = new BestBlockListener(recentChainData);
final PeerSyncBlockListener blockListener =
new PeerSyncBlockListener(
SafeFuture.COMPLETE, firstRequestedSlot, BLOCK_COUNT, blockResponseListener);
SafeFuture.COMPLETE,
firstRequestedSlot,
BLOCK_COUNT_PER_ATTEMPT,
blockResponseListener);

return peer.requestBlocksByRange(firstRequestedSlot, BLOCK_COUNT, blockListener)
.thenApply(__ -> blockResponseListener.getBestSlot());
return peer.requestBlocksByRange(firstRequestedSlot, BLOCK_COUNT_PER_ATTEMPT, blockListener)
.thenCompose(
__ ->
blockResponseListener
.getBestSlot()
.map(SafeFuture::completedFuture)
.orElseGet(
() ->
getCommonAncestor(
peer,
firstRequestedSlot.minusMinZero(
SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE.times(1L << attempt)),
firstNonFinalSlot,
attempt + 1)));
}

private static class BestBlockListener implements RpcResponseListener<SignedBeaconBlock> {
private final RecentChainData recentChainData;
private UInt64 bestSlot;
private Optional<UInt64> bestSlot;

BestBlockListener(final RecentChainData recentChainData, final UInt64 bestSlot) {
BestBlockListener(final RecentChainData recentChainData) {
this.recentChainData = recentChainData;
this.bestSlot = bestSlot;
this.bestSlot = Optional.empty();
}

private UInt64 getBestSlot() {
private Optional<UInt64> getBestSlot() {
return bestSlot;
}

@Override
public SafeFuture<?> onResponse(final SignedBeaconBlock block) {
if (recentChainData.containsBlock(block.getRoot())) {
bestSlot = bestSlot.max(block.getSlot());
bestSlot =
bestSlot
.map(uInt64 -> uInt64.max(block.getSlot()))
.or(() -> Optional.of(block.getSlot()));
}

return SafeFuture.COMPLETE;
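A note on the listener change above: bestSlot now starts as Optional.empty() rather than being seeded with firstNonFinalSlot, so an empty result cleanly signals "no common block in this window" and drives the next back-off attempt. The accumulation in onResponse amounts to a max-reduction over the locally known slots in one response; a condensed equivalent, with a hypothetical stream input:

import java.util.Optional;
import java.util.stream.Stream;

class BestSlotFold {
  // Empty when no block in the response window is known locally;
  // otherwise the highest locally known slot.
  static Optional<Long> bestSlot(Stream<Long> locallyKnownSlots) {
    return locallyKnownSlots.reduce(Long::max);
  }
}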
CommonAncestorTest.java
@@ -17,24 +17,26 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.beacon.sync.forward.singlepeer.CommonAncestor.BLOCK_COUNT_PER_ATTEMPT;
import static tech.pegasys.teku.beacon.sync.forward.singlepeer.CommonAncestor.SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;

import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.PeerStatus;

public class CommonAncestorTest extends AbstractSyncTest {

private final CommonAncestor commonAncestor = new CommonAncestor(recentChainData);
private final CommonAncestor commonAncestor = new CommonAncestor(recentChainData, 4);

@Test
void shouldNotSearchCommonAncestorWithoutSufficientLocalData()
throws ExecutionException, InterruptedException {
void shouldNotSearchCommonAncestorWithoutSufficientLocalData() {
final UInt64 firstNonFinalSlot = dataStructureUtil.randomUInt64();
final UInt64 currentLocalHead =
firstNonFinalSlot.plus(CommonAncestor.OPTIMISTIC_HISTORY_LENGTH.minus(1));
final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.minus(1));
final PeerStatus status =
withPeerHeadSlot(
currentLocalHead,
@@ -43,44 +45,107 @@ void shouldNotSearchCommonAncestorWithoutSufficientLocalData()

when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead);

assertThat(
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot()).get())
.isEqualTo(firstNonFinalSlot);
assertThatSafeFuture(
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot()))
.isCompletedWithValue(firstNonFinalSlot);

verifyNoInteractions(peer);
}

@Test
void shouldNotSearchCommonAncestorWithoutSufficientRemoteData()
throws ExecutionException, InterruptedException {
void shouldSearchStartingFromCurrentCommonHeadSlot() {
final UInt64 firstNonFinalSlot = dataStructureUtil.randomUInt64();
final UInt64 currentLocalHead =
firstNonFinalSlot.plus(CommonAncestor.OPTIMISTIC_HISTORY_LENGTH);
final UInt64 currentRemoteHead =
firstNonFinalSlot.plus(CommonAncestor.OPTIMISTIC_HISTORY_LENGTH.minus(1));

final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(21));
final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(20));
final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1));
final SafeFuture<Void> requestFuture = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlot), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFuture);
final PeerStatus status =
withPeerHeadSlot(
currentRemoteHead,
spec.computeEpochAtSlot(currentRemoteHead),
dataStructureUtil.randomBytes32());

when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead);
when(recentChainData.containsBlock(any())).thenReturn(true);

final SafeFuture<UInt64> futureSlot =
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot());

assertThat(
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot()).get())
.isEqualTo(firstNonFinalSlot);
assertThat(futureSlot.isDone()).isFalse();

verifyAndRespond(syncStartSlot, requestFuture);

// last received slot is the best slot
assertThatSafeFuture(futureSlot)
.isCompletedWithValue(syncStartSlot.plus(BLOCK_COUNT_PER_ATTEMPT).minus(1));
}

@Test
void shouldSearchCommonAncestorWithSufficientRemoteData()
throws ExecutionException, InterruptedException {
void shouldSearchBackExponentiallyUpToMaxAttemptsTimes() {
final UInt64 firstNonFinalSlot = UInt64.valueOf(10_000);

final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(4_001));
final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(4_000));
final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1));

final UInt64 syncStartSlotAttempt1 = syncStartSlot;
final UInt64 syncStartSlotAttempt2 =
syncStartSlotAttempt1.minus(SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE);
final UInt64 syncStartSlotAttempt3 =
syncStartSlotAttempt2.minus(SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE.times(2));
final UInt64 syncStartSlotAttempt4 =
syncStartSlotAttempt3.minus(SLOTS_TO_JUMP_BACK_EXPONENTIAL_BASE.times(4));

final SafeFuture<Void> requestFutureAttempt1 = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlotAttempt1), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFutureAttempt1);
final SafeFuture<Void> requestFutureAttempt2 = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlotAttempt2), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFutureAttempt2);
final SafeFuture<Void> requestFutureAttempt3 = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlotAttempt3), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFutureAttempt3);
final SafeFuture<Void> requestFutureAttempt4 = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlotAttempt4), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFutureAttempt4);

final PeerStatus status =
withPeerHeadSlot(
currentRemoteHead,
spec.computeEpochAtSlot(currentRemoteHead),
dataStructureUtil.randomBytes32());

when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead);
when(recentChainData.containsBlock(any())).thenReturn(false);

final SafeFuture<UInt64> futureSlot =
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot());

assertThat(futureSlot.isDone()).isFalse();

verifyAndRespond(syncStartSlotAttempt1, requestFutureAttempt1);
verifyAndRespond(syncStartSlotAttempt2, requestFutureAttempt2);
verifyAndRespond(syncStartSlotAttempt3, requestFutureAttempt3);
verifyAndRespond(syncStartSlotAttempt4, requestFutureAttempt4);

// we ended up at firstNonFinalSlot
assertThatSafeFuture(futureSlot).isCompletedWithValue(firstNonFinalSlot);

verifyNoMoreInteractions(peer);
}

@Test
void shouldStopSearchingIfFirstNonFinalSlotIsReached() {
final UInt64 firstNonFinalSlot = dataStructureUtil.randomUInt64();
final UInt64 currentLocalHead =
firstNonFinalSlot.plus(CommonAncestor.OPTIMISTIC_HISTORY_LENGTH.times(10));
final UInt64 currentRemoteHead =
firstNonFinalSlot.plus(CommonAncestor.OPTIMISTIC_HISTORY_LENGTH.times(9));
final UInt64 syncStartSlot = currentRemoteHead.minus(CommonAncestor.OPTIMISTIC_HISTORY_LENGTH);

final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(21));
final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(20));
final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1));
final SafeFuture<Void> requestFuture = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlot), eq(CommonAncestor.BLOCK_COUNT), any()))
when(peer.requestBlocksByRange(eq(syncStartSlot), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFuture);
final PeerStatus status =
withPeerHeadSlot(
@@ -89,27 +154,40 @@ void shouldSearchCommonAncestorWithSufficientRemoteData()
dataStructureUtil.randomBytes32());

when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead);
when(recentChainData.containsBlock(any())).thenReturn(true);
when(recentChainData.containsBlock(any())).thenReturn(false);

SafeFuture<UInt64> futureSlot =
final SafeFuture<UInt64> futureSlot =
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot());

assertThat(futureSlot.isDone()).isFalse();

verify(peer)
.requestBlocksByRange(
eq(syncStartSlot),
eq(CommonAncestor.BLOCK_COUNT),
eq(BLOCK_COUNT_PER_ATTEMPT),
blockResponseListenerArgumentCaptor.capture());

requestFuture.complete(null);

assertThatSafeFuture(
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot()))
.isCompletedWithValue(firstNonFinalSlot);
}

private void verifyAndRespond(
final UInt64 syncStartSlot, final SafeFuture<Void> requestFutureAttempt) {
verify(peer)
.requestBlocksByRange(
eq(syncStartSlot),
eq(BLOCK_COUNT_PER_ATTEMPT),
blockResponseListenerArgumentCaptor.capture());

respondWithBlocksAtSlots(
requestFuture,
requestFutureAttempt,
blockResponseListenerArgumentCaptor.getValue(),
syncStartSlot,
CommonAncestor.BLOCK_COUNT);
BLOCK_COUNT_PER_ATTEMPT);

requestFuture.complete(null);
// last received slot is the best slot
assertThat(futureSlot.get()).isEqualTo(syncStartSlot.plus(CommonAncestor.BLOCK_COUNT).minus(1));
requestFutureAttempt.complete(null);
}
}
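As a sanity check on the schedule exercised by shouldSearchBackExponentiallyUpToMaxAttemptsTimes (the tests construct CommonAncestor with maxAttempts = 4), the four requested window starts follow directly from the constants. This is plain arithmetic mirroring the test values, not Teku code:

public class BackOffScheduleCheck {
  public static void main(String[] args) {
    final long firstNonFinalSlot = 10_000;
    final long remoteHead = firstNonFinalSlot + 16 + 4_000; // 14_016, the lower of the two heads
    final long attempt1 = remoteHead - (16 - 1);            // 14_001
    final long attempt2 = attempt1 - 100;                   // 13_901 (back 100 * 2^0)
    final long attempt3 = attempt2 - 100 * 2;               // 13_701 (back 100 * 2^1)
    final long attempt4 = attempt3 - 100 * 4;               // 13_301 (back 100 * 2^2)
    System.out.println(attempt1 + ", " + attempt2 + ", " + attempt3 + ", " + attempt4);
  }
}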
