Skip to content

Commit

Permalink
Add blob_sidecar gossip topic (Consensys#6815)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Feb 17, 2023
1 parent 935649a commit c008729
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public class SchemaDefinitionsDeneb extends SchemaDefinitionsCapella {
private final SignedBuilderBidSchema signedBuilderBidSchemaDeneb;

private final BlobSchema blobSchema;
private final BlobsSidecarSchema blobsSidecarSchema;
private final BlobSidecarSchema blobSidecarSchema;
private final BlobsSidecarSchema blobsSidecarSchema;
private final SignedBeaconBlockAndBlobsSidecarSchema signedBeaconBlockAndBlobsSidecarSchema;
private final SignedBlobSidecarSchema signedBlobSidecarSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1891,11 +1891,6 @@ public BlobsSidecar randomBlobsSidecar() {
return randomBlobsSidecar(randomBytes32(), randomUInt64());
}

public BlobSidecar randomBlobSidecar() {
return randomBlobSidecar(
randomBytes32(), randomUInt64(), randomUInt64(), randomBytes32(), randomUInt64());
}

public BlobsSidecar randomBlobsSidecar(final UInt64 slot) {
return randomBlobsSidecar(randomBytes32(), slot);
}
Expand All @@ -1909,6 +1904,15 @@ public BlobsSidecar randomBlobsSidecar(final Bytes32 blockRoot, final UInt64 slo
blockRoot, slot, randomInt((int) blobsSidecarSchema.getBlobsSchema().getMaxLength()));
}

public BlobSidecar randomBlobSidecar() {
return randomBlobSidecar(randomUInt64());
}

public BlobSidecar randomBlobSidecar(final UInt64 index) {
return randomBlobSidecar(
randomBytes32(), index, randomUInt64(), randomBytes32(), randomUInt64());
}

public BlobSidecar randomBlobSidecar(
final Bytes32 blockRoot,
final UInt64 index,
Expand Down Expand Up @@ -1972,6 +1976,12 @@ public SignedBlobSidecar randomSignedBlobSidecar() {
.create(randomBlobSidecar(), randomSignature());
}

public SignedBlobSidecar randomSignedBlobSidecar(final UInt64 index) {
return SchemaDefinitionsDeneb.required(spec.getGenesisSchemaDefinitions())
.getSignedBlobSidecarSchema()
.create(randomBlobSidecar(index), randomSignature());
}

public SignedBeaconBlockAndBlobsSidecar randomConsistentSignedBeaconBlockAndBlobsSidecar() {
return randomConsistentSignedBeaconBlockAndBlobsSidecar(randomUInt64());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.pegasys.teku.infrastructure.bytes.Bytes4;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockAndBlobsSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.config.Eth2Context;
Expand Down Expand Up @@ -128,6 +129,7 @@ private synchronized void startup() {
eventChannels.subscribe(BlockGossipChannel.class, gossipForkManager::publishBlock);
eventChannels.subscribe(
BlockAndBlobsSidecarGossipChannel.class, gossipForkManager::publishBlockAndBlobsSidecar);
eventChannels.subscribe(BlobSidecarGossipChannel.class, gossipForkManager::publishBlobSidecar);
if (isCloseToInSync()) {
startGossip();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange;
Expand All @@ -87,6 +88,7 @@
* might be changed in any version in backward incompatible way
*/
public class Eth2P2PNetworkBuilder {

public static final Duration DEFAULT_ETH2_RPC_PING_INTERVAL = Duration.ofSeconds(10);
public static final int DEFAULT_ETH2_RPC_OUTSTANDING_PING_THRESHOLD = 2;
public static final Duration DEFAULT_ETH2_STATUS_UPDATE_INTERVAL = Duration.ofMinutes(5);
Expand All @@ -95,6 +97,7 @@ public class Eth2P2PNetworkBuilder {
protected EventChannels eventChannels;
protected RecentChainData recentChainData;
protected OperationProcessor<SignedBeaconBlock> gossipedBlockProcessor;
protected OperationProcessor<SignedBlobSidecar> gossipedBlobSidecarProcessor;
protected OperationProcessor<SignedBeaconBlockAndBlobsSidecar> gossipedBlockAndBlobsProcessor;
protected OperationProcessor<ValidateableAttestation> gossipedAttestationConsumer;
protected OperationProcessor<ValidateableAttestation> gossipedAggregateProcessor;
Expand Down Expand Up @@ -277,6 +280,7 @@ private GossipForkSubscriptions createSubscriptions(
recentChainData,
gossipEncoding,
gossipedBlockProcessor,
gossipedBlobSidecarProcessor,
gossipedBlockAndBlobsProcessor,
gossipedAttestationConsumer,
gossipedAggregateProcessor,
Expand Down Expand Up @@ -380,6 +384,7 @@ private void validate() {
assertNotNull("keyValueStore", keyValueStore);
assertNotNull("timeProvider", timeProvider);
assertNotNull("gossipedBlockProcessor", gossipedBlockProcessor);
assertNotNull("gossipedBlobSidecarProcessor", gossipedBlobSidecarProcessor);
assertNotNull("gossipedAttestationProcessor", gossipedAttestationConsumer);
assertNotNull("gossipedAggregateProcessor", gossipedAggregateProcessor);
assertNotNull("gossipedAttesterSlashingProcessor", gossipedAttesterSlashingConsumer);
Expand Down Expand Up @@ -440,6 +445,13 @@ public Eth2P2PNetworkBuilder gossipedBlockProcessor(
return this;
}

public Eth2P2PNetworkBuilder gossipedBlobSidecarProcessor(
final OperationProcessor<SignedBlobSidecar> blobSidecarProcessor) {
checkNotNull(blobSidecarProcessor);
this.gossipedBlobSidecarProcessor = blobSidecarProcessor;
return this;
}

public Eth2P2PNetworkBuilder gossipedBlockAndBlobsProcessor(
final OperationProcessor<SignedBeaconBlockAndBlobsSidecar> gossipedBlockAndBlobsProcessor) {
checkNotNull(gossipedBlockAndBlobsProcessor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.eth2.gossip;

import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;

public interface BlobSidecarGossipChannel extends VoidReturningChannelInterface {

void publishBlobSidecar(SignedBlobSidecar blobSidecar);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.eth2.gossip;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Optional;
import java.util.stream.IntStream;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationMilestoneValidator;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor;
import tech.pegasys.teku.networking.eth2.gossip.topics.topichandlers.Eth2TopicHandler;
import tech.pegasys.teku.networking.p2p.gossip.GossipNetwork;
import tech.pegasys.teku.networking.p2p.gossip.TopicChannel;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecarSchema;
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlobSidecarGossipManager implements GossipManager {

private final GossipNetwork gossipNetwork;
private final GossipEncoding gossipEncoding;
private final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecar>> indexToTopicHandler;

private final Int2ObjectMap<TopicChannel> indexToChannel = new Int2ObjectOpenHashMap<>();

public static BlobSidecarGossipManager create(
final RecentChainData recentChainData,
final Spec spec,
final AsyncRunner asyncRunner,
final GossipNetwork gossipNetwork,
final GossipEncoding gossipEncoding,
final ForkInfo forkInfo,
final OperationProcessor<SignedBlobSidecar> processor,
final int maxMessageSize) {
final SpecVersion forkSpecVersion = spec.atEpoch(forkInfo.getFork().getEpoch());
final int maxBlobsPerBlock =
SpecConfigDeneb.required(forkSpecVersion.getConfig()).getMaxBlobsPerBlock();
final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecar>> indexToTopicHandler =
new Int2ObjectOpenHashMap<>();
IntStream.range(0, maxBlobsPerBlock)
.forEach(
index -> {
final Eth2TopicHandler<SignedBlobSidecar> topicHandler =
createBlobSidecarTopicHandler(
index,
recentChainData,
spec,
asyncRunner,
processor,
gossipEncoding,
forkInfo,
SchemaDefinitionsDeneb.required(forkSpecVersion.getSchemaDefinitions())
.getSignedBlobSidecarSchema(),
maxMessageSize);
indexToTopicHandler.put(index, topicHandler);
});
return new BlobSidecarGossipManager(gossipNetwork, gossipEncoding, indexToTopicHandler);
}

private BlobSidecarGossipManager(
final GossipNetwork gossipNetwork,
final GossipEncoding gossipEncoding,
final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecar>> indexToTopicHandler) {
this.gossipNetwork = gossipNetwork;
this.gossipEncoding = gossipEncoding;
this.indexToTopicHandler = indexToTopicHandler;
}

public void publishBlobSidecar(final SignedBlobSidecar message) {
final int index = message.getBlobSidecar().getIndex().intValue();
Optional.ofNullable(indexToChannel.get(index))
.ifPresent(channel -> channel.gossip(gossipEncoding.encode(message)));
}

@VisibleForTesting
Eth2TopicHandler<SignedBlobSidecar> getTopicHandler(final int index) {
return indexToTopicHandler.get(index);
}

@Override
public void subscribe() {
indexToTopicHandler
.int2ObjectEntrySet()
.forEach(
entry -> {
final Eth2TopicHandler<SignedBlobSidecar> topicHandler = entry.getValue();
final TopicChannel channel =
gossipNetwork.subscribe(topicHandler.getTopic(), topicHandler);
indexToChannel.put(entry.getIntKey(), channel);
});
}

@Override
public void unsubscribe() {
indexToChannel.values().forEach(TopicChannel::close);
indexToChannel.clear();
}

@Override
public boolean isEnabledDuringOptimisticSync() {
return true;
}

private static Eth2TopicHandler<SignedBlobSidecar> createBlobSidecarTopicHandler(
final int index,
final RecentChainData recentChainData,
final Spec spec,
final AsyncRunner asyncRunner,
final OperationProcessor<SignedBlobSidecar> processor,
final GossipEncoding gossipEncoding,
final ForkInfo forkInfo,
final SignedBlobSidecarSchema gossipType,
final int maxMessageSize) {
return new Eth2TopicHandler<>(
recentChainData,
asyncRunner,
new TopicIndexAwareOperationProcessor(index, processor),
gossipEncoding,
forkInfo.getForkDigest(spec),
GossipTopicName.getBlobSidecarIndexTopicName(index),
new OperationMilestoneValidator<>(
spec,
forkInfo.getFork(),
blobSidecar -> spec.computeEpochAtSlot(blobSidecar.getBlobSidecar().getSlot())),
gossipType,
maxMessageSize);
}

private static class TopicIndexAwareOperationProcessor
implements OperationProcessor<SignedBlobSidecar> {

private final int topicIndex;
private final OperationProcessor<SignedBlobSidecar> delegate;

private TopicIndexAwareOperationProcessor(
final int topicIndex, final OperationProcessor<SignedBlobSidecar> delegate) {
this.topicIndex = topicIndex;
this.delegate = delegate;
}

@Override
public SafeFuture<InternalValidationResult> process(final SignedBlobSidecar blobSidecar) {
final int blobSidecarIndex = blobSidecar.getBlobSidecar().getIndex().intValue();
if (blobSidecarIndex != topicIndex) {
return SafeFuture.completedFuture(
InternalValidationResult.reject(
"blob sidecar with index %d does not match the topic index %d",
blobSidecarIndex, topicIndex));
}
return delegate.process(blobSidecar);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange;
Expand Down Expand Up @@ -163,6 +164,14 @@ public synchronized void publishBlock(final SignedBeaconBlock block) {
publishMessage(block.getSlot(), block, "block", GossipForkSubscriptions::publishBlock);
}

public synchronized void publishBlobSidecar(final SignedBlobSidecar blobSidecar) {
publishMessage(
blobSidecar.getBlobSidecar().getSlot(),
blobSidecar,
"blob sidecar",
GossipForkSubscriptions::publishBlobSidecar);
}

public synchronized void publishBlockAndBlobsSidecar(
final SignedBeaconBlockAndBlobsSidecar blockAndBlobsSidecar) {
publishMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange;
Expand All @@ -39,6 +40,10 @@ public interface GossipForkSubscriptions {

void publishBlock(SignedBeaconBlock block);

default void publishBlobSidecar(SignedBlobSidecar blobSidecar) {
// since Deneb
}

default void publishBlockAndBlobsSidecar(SignedBeaconBlockAndBlobsSidecar blockAndBlobsSidecar) {
// since Deneb
}
Expand Down
Loading

0 comments on commit c008729

Please sign in to comment.