Skip to content

Commit

Permalink
fix: use engine responses to progress autoseal mining task (paradigmx…
Browse files Browse the repository at this point in the history
…yz#3727)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
Rjected and mattsse authored Jul 12, 2023
1 parent dbafe23 commit 6799fc3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 29 deletions.
63 changes: 35 additions & 28 deletions crates/consensus/auto-seal/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::{mode::MiningMode, Storage};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use futures_util::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::{BeaconEngineMessage, ForkchoiceStatus};
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{
constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, ETHEREUM_BLOCK_GAS_LIMIT},
proofs,
stage::StageId,
Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
SealedBlockWithSenders, EMPTY_OMMER_ROOT, U256,
};
use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory};
Expand All @@ -26,7 +24,7 @@ use std::{
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};

/// A Future that listens for new ready transactions and puts new blocks into storage
pub struct MiningTask<Client, Pool: TransactionPool> {
Expand Down Expand Up @@ -117,7 +115,7 @@ where
let client = this.client.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
let mut events = this.pipe_line_events.take();
let events = this.pipe_line_events.take();
let canon_state_notification = this.canon_state_notification.clone();

// Create the mining future that creates a block, notifies the engine that drives
Expand Down Expand Up @@ -226,29 +224,38 @@ where
};
drop(storage);

// send the new update to the engine, this will trigger the pipeline to
// download the block, execute it and store it in the database.
let (tx, _rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
debug!(target: "consensus::auto", ?state, "sent fork choice update");

// wait for the pipeline to finish
if let Some(events) = events.as_mut() {
debug!(target: "consensus::auto", "waiting for finish stage event...");
// wait for the finish stage to
loop {
if let Some(PipelineEvent::Running { stage_id, .. }) =
events.next().await
{
if stage_id == StageId::Finish {
debug!(target: "consensus::auto", "received finish stage event");
break
// TODO: make this a future
// await the fcu call rx for SYNCING, then wait for a VALID response
loop {
// send the new update to the engine, this will trigger the engine
// to download and execute the block we just inserted
let (tx, rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
debug!(target: "consensus::auto", ?state, "Sent fork choice update");

match rx.await.unwrap() {
Ok(fcu_response) => {
match fcu_response.forkchoice_status() {
ForkchoiceStatus::Valid => break,
ForkchoiceStatus::Invalid => {
error!(target: "consensus::auto", ?fcu_response, "Forkchoice update returned invalid response");
return None
}
ForkchoiceStatus::Syncing => {
debug!(target: "consensus::auto", ?fcu_response, "Forkchoice update returned SYNCING, waiting for VALID");
// wait for the next fork choice update
continue
}
}
}
Err(err) => {
error!(target: "consensus::auto", ?err, "Autoseal fork choice update failed");
return None
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl OnForkChoiceUpdated {
}

/// Returns the determined status of the received ForkchoiceState.
pub(crate) fn forkchoice_status(&self) -> ForkchoiceStatus {
pub fn forkchoice_status(&self) -> ForkchoiceStatus {
self.forkchoice_status
}

Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod handle;
pub use handle::BeaconConsensusEngineHandle;

mod forkchoice;
pub use forkchoice::ForkchoiceStatus;
mod metrics;
pub(crate) mod prune;
pub(crate) mod sync;
Expand Down

0 comments on commit 6799fc3

Please sign in to comment.