Skip to content

Commit

Permalink
Notify fCu subscribers only when fCu will be sent to EL (Consensys#7864)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Jan 9, 2024
1 parent 82435a7 commit e4faa92
Show file tree
Hide file tree
Showing 22 changed files with 102 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.statetransition.util.FutureItems;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.statetransition.util.PoolFactory;
Expand Down Expand Up @@ -122,7 +122,7 @@ public static SyncingNodeManager create(
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
transitionBlockValidator,
new StubMetricsSystem());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validation.SignedBlsToExecutionChangeValidator;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorCache;
Expand Down Expand Up @@ -204,7 +204,7 @@ private void setupStorage(
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
new MergeTransitionBlockValidator(
spec, recentChainData, ExecutionLayerChannel.NOOP),
storageSystem.getMetricsSystem());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import tech.pegasys.teku.api.response.v1.EventType;
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.json.JsonUtil;
import tech.pegasys.teku.infrastructure.restapi.endpoints.ListQueryParameterUtils;
Expand All @@ -50,8 +49,6 @@
import tech.pegasys.teku.spec.datastructures.operations.SignedVoluntaryExit;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.storage.api.ChainHeadChannel;
Expand Down Expand Up @@ -225,24 +222,19 @@ protected void onNewProposerSlashing(

protected void onForkChoiceUpdatedResult(
final ForkChoiceUpdatedResultNotification forkChoiceUpdatedResultNotification) {
final Optional<PayloadBuildingAttributes> maybePayloadAttributes =
forkChoiceUpdatedResultNotification.payloadAttributes();
// no payload attributes
if (maybePayloadAttributes.isEmpty()) {
return;
}
final SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult =
forkChoiceUpdatedResultNotification.forkChoiceUpdatedResult();
// no fCu has been sent
if (forkChoiceUpdatedResult.isCompletedNormally() && forkChoiceUpdatedResult.join().isEmpty()) {
return;
}
final PayloadBuildingAttributes payloadAttributes = maybePayloadAttributes.get();
final SpecMilestone milestone = spec.atSlot(payloadAttributes.getProposalSlot()).getMilestone();
final PayloadAttributesEvent payloadAttributesEvent =
PayloadAttributesEvent.create(
milestone, payloadAttributes, forkChoiceUpdatedResultNotification.forkChoiceState());
notifySubscribersOfEvent(EventType.payload_attributes, payloadAttributesEvent);
forkChoiceUpdatedResultNotification
.payloadAttributes()
.ifPresent(
payloadAttributes -> {
final SpecMilestone milestone =
spec.atSlot(payloadAttributes.getProposalSlot()).getMilestone();
final PayloadAttributesEvent payloadAttributesEvent =
PayloadAttributesEvent.create(
milestone,
payloadAttributes,
forkChoiceUpdatedResultNotification.forkChoiceState());
notifySubscribersOfEvent(EventType.payload_attributes, payloadAttributesEvent);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.spec.executionlayer.PayloadStatus;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
Expand Down Expand Up @@ -150,10 +148,7 @@ public class EventSubscriptionManagerTest {
false),
Optional.of(samplePayloadAttributes),
false,
SafeFuture.completedFuture(
Optional.of(
new ForkChoiceUpdatedResult(
PayloadStatus.VALID, Optional.of(data.randomBytes8())))));
new SafeFuture<>());

private final AsyncContext async = mock(AsyncContext.class);
private final EventChannels channels = mock(EventChannels.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.storage.client.MemoryOnlyRecentChainData;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityFactory;
Expand Down Expand Up @@ -118,7 +118,7 @@ public void init() throws Exception {
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
transitionBlockValidator,
metricsSystem);
localChain = BeaconChainUtil.create(spec, recentChainData, validatorKeys, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.storage.client.MemoryOnlyRecentChainData;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityFactory;
Expand Down Expand Up @@ -103,7 +103,7 @@ public void importBlocks() throws Exception {
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
transitionBlockValidator,
metricsSystem);
BeaconChainUtil localChain =
Expand Down Expand Up @@ -196,7 +196,7 @@ public void importBlocksMemProfiling() throws Exception {
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
transitionBlockValidator,
metricsSystem);
BlockImporter blockImporter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.storage.client.MemoryOnlyRecentChainData;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityFactory;
Expand Down Expand Up @@ -96,7 +96,7 @@ public void init() throws Exception {
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
transitionBlockValidator,
new StubMetricsSystem());
localChain = BeaconChainUtil.create(spec, recentChainData, validatorKeys, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceStateProvider;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.TickProcessor;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
Expand Down Expand Up @@ -134,7 +134,7 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
eventThread,
recentChainData,
blobSidecarManager,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
new ForkChoiceStateProvider(eventThread, recentChainData),
new TickProcessor(spec, recentChainData),
transitionBlockValidator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.statetransition.util.FutureItems;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.statetransition.util.PoolFactory;
Expand Down Expand Up @@ -84,7 +84,7 @@ class AttestationManagerIntegrationTest {
new InlineEventThread(),
recentChainData,
BlobSidecarManager.NOOP,
new StubForkChoiceNotifier(),
new NoopForkChoiceNotifier(),
transitionBlockValidator,
storageSystem.getMetricsSystem());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,22 @@ public ForkChoice(
public void onForkChoiceUpdatedResult(
final ForkChoiceUpdatedResultNotification forkChoiceUpdatedResultNotification) {
forkChoiceUpdatedResultNotification
.forkChoiceUpdatedResult()
.forkChoiceUpdatedResultFuture()
.thenAccept(
maybeForkChoiceUpdatedResult ->
maybeForkChoiceUpdatedResult.ifPresent(
forkChoiceUpdatedResult -> {
if (forkChoiceUpdatedResultNotification.isTerminalBlockCall()
&& forkChoiceUpdatedResult.getPayloadStatus().hasInvalidStatus()) {
LOG.error(
"Execution engine considers INVALID recently provided terminal block {}",
forkChoiceUpdatedResultNotification
.forkChoiceState()
.getHeadExecutionBlockHash());
return;
}
onExecutionPayloadResult(
forkChoiceUpdatedResultNotification.forkChoiceState().getHeadBlockRoot(),
forkChoiceUpdatedResult.getPayloadStatus());
}))
forkChoiceUpdatedResult -> {
if (forkChoiceUpdatedResultNotification.isTerminalBlockCall()
&& forkChoiceUpdatedResult.getPayloadStatus().hasInvalidStatus()) {
LOG.error(
"Execution engine considers INVALID recently provided terminal block {}",
forkChoiceUpdatedResultNotification
.forkChoiceState()
.getHeadExecutionBlockHash());
return;
}
onExecutionPayloadResult(
forkChoiceUpdatedResultNotification.forkChoiceState().getHeadBlockRoot(),
forkChoiceUpdatedResult.getPayloadStatus());
})
.finish(
error -> {
final String errorMessage = "Failed to update fork choice. ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;

public interface ForkChoiceNotifier {

void onForkChoiceUpdated(ForkChoiceState forkChoiceState, Optional<UInt64> proposingSlot);

void onAttestationsDue(UInt64 slot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadContext;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager.ProposersDataManagerSubscriber;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -239,15 +238,17 @@ private void internalAttestationsDue(final UInt64 slot) {
}

private void sendForkChoiceUpdated() {
final SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult =
forkChoiceUpdateData.send(executionLayerChannel, timeProvider.getTimeInMillis());
forkChoiceUpdatedSubscribers.deliver(
ForkChoiceUpdatedResultSubscriber::onForkChoiceUpdatedResult,
new ForkChoiceUpdatedResultNotification(
forkChoiceUpdateData.getForkChoiceState(),
forkChoiceUpdateData.getPayloadBuildingAttributes(),
forkChoiceUpdateData.hasTerminalBlockHash(),
forkChoiceUpdatedResult));
forkChoiceUpdateData
.send(executionLayerChannel, timeProvider.getTimeInMillis())
.ifPresent(
forkChoiceUpdatedResultFuture ->
forkChoiceUpdatedSubscribers.deliver(
ForkChoiceUpdatedResultSubscriber::onForkChoiceUpdatedResult,
new ForkChoiceUpdatedResultNotification(
forkChoiceUpdateData.getForkChoiceState(),
forkChoiceUpdateData.getPayloadBuildingAttributes(),
forkChoiceUpdateData.hasTerminalBlockHash(),
forkChoiceUpdatedResultFuture)));
}

private void updatePayloadAttributes(final UInt64 blockSlot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,17 @@ public SafeFuture<Optional<ExecutionPayloadContext>> getExecutionPayloadContext(
return executionPayloadContext;
}

public SafeFuture<Optional<ForkChoiceUpdatedResult>> send(
public Optional<SafeFuture<ForkChoiceUpdatedResult>> send(
final ExecutionLayerChannel executionLayer, final UInt64 currentTimestamp) {
if (!shouldBeSent(currentTimestamp)) {
return SafeFuture.completedFuture(Optional.empty());
return Optional.empty();
}
toBeSentAtTime = currentTimestamp.plus(RESEND_AFTER_MILLIS);

if (forkChoiceState.getHeadExecutionBlockHash().isZero()) {
LOG.debug("send - getHeadBlockHash is zero - returning empty");
executionPayloadContext.complete(Optional.empty());
return SafeFuture.completedFuture(Optional.empty());
return Optional.empty();
}

logSendingForkChoiceUpdated();
Expand All @@ -193,7 +193,7 @@ public SafeFuture<Optional<ForkChoiceUpdatedResult>> send(
payloadBuildingAttributes.orElseThrow())))
.propagateTo(executionPayloadContext);

return forkChoiceUpdatedResult.thenApply(Optional::of);
return Optional.of(forkChoiceUpdatedResult);
}

private void logSendForkChoiceUpdatedComplete(Optional<Bytes8> payloadId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ record ForkChoiceUpdatedResultNotification(
ForkChoiceState forkChoiceState,
Optional<PayloadBuildingAttributes> payloadAttributes,
boolean isTerminalBlockCall,
SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult) {}
SafeFuture<ForkChoiceUpdatedResult> forkChoiceUpdatedResultFuture) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.storage.client.MemoryOnlyRecentChainData;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
Expand All @@ -90,7 +90,7 @@ public class BlockImporterTest {
MemoryOnlyRecentChainData.builder().specProvider(spec).build();
private final WeakSubjectivityValidator weakSubjectivityValidator =
mock(WeakSubjectivityValidator.class);
private final ForkChoiceNotifier forkChoiceNotifier = new StubForkChoiceNotifier();
private final ForkChoiceNotifier forkChoiceNotifier = new NoopForkChoiceNotifier();
private final MergeTransitionBlockValidator transitionBlockValidator =
new MergeTransitionBlockValidator(spec, recentChainData, ExecutionLayerChannel.NOOP);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.statetransition.util.FutureItems;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.statetransition.util.PoolFactory;
Expand Down Expand Up @@ -127,7 +127,7 @@ public class BlockManagerTest {
private StorageSystem localChain;
private RecentChainData localRecentChainData = mock(RecentChainData.class);

private final ForkChoiceNotifier forkChoiceNotifier = new StubForkChoiceNotifier();
private final ForkChoiceNotifier forkChoiceNotifier = new NoopForkChoiceNotifier();
private MergeTransitionBlockValidator transitionBlockValidator;
private final BlobSidecarManager blobSidecarManager = mock(BlobSidecarManager.class);
private ForkChoice forkChoice;
Expand Down
Loading

0 comments on commit e4faa92

Please sign in to comment.