working altda
Ubuntu authored and samlaf committed Dec 11, 2024
1 parent 8194c1f commit 6179dcb
Showing 27 changed files with 219 additions and 42 deletions.
14 changes: 12 additions & 2 deletions bin/host/src/cli/mod.rs
@@ -2,6 +2,7 @@
use crate::{
blobs::OnlineBlobProvider,
eigenda_blobs::OnlineEigenDABlobProvider,
kv::{
DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore,
SplitKeyValueStore,
@@ -36,6 +37,8 @@ mode, the host runs the client program in a separate thread with the pre-image server in the
primary thread.
";

const EIGENDA_ADDRESS: &str = "127.0.0.1:31001";

/// The host binary CLI application arguments.
#[derive(Default, Parser, Serialize, Clone, Debug)]
#[command(about = ABOUT, version, styles = cli_styles())]
@@ -145,20 +148,27 @@ impl HostCli {
/// - A [ReqwestProvider] for the L2 node.
pub async fn create_providers(
&self,
) -> Result<(ReqwestProvider, OnlineBlobProvider, ReqwestProvider)> {
) -> Result<(ReqwestProvider, OnlineBlobProvider, OnlineEigenDABlobProvider, ReqwestProvider )> {
let blob_provider = OnlineBlobProvider::new_http(
self.l1_beacon_address.clone().ok_or(anyhow!("Beacon API URL must be set"))?,
)
.await
.map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?;

let eigenda_blob_provider = OnlineEigenDABlobProvider::new_http(
//EIGENDA_ADDRESS.to_string(),
"http://127.0.0.1:3100".to_string(),
).await
.map_err(|e| anyhow!("Failed to load eigenda blob provider configuration: {e}"))?;

let l1_provider = Self::http_provider(
self.l1_node_address.as_ref().ok_or(anyhow!("Provider must be set"))?,
);
let l2_provider = Self::http_provider(
self.l2_node_address.as_ref().ok_or(anyhow!("L2 node address must be set"))?,
);

Ok((l1_provider, blob_provider, l2_provider))
Ok((l1_provider, blob_provider, eigenda_blob_provider, l2_provider))
}

/// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is
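The provider wiring above hardcodes the proxy URL ("http://127.0.0.1:3100") while the new EIGENDA_ADDRESS constant goes unused. A minimal sketch of how the constant could be wired in instead, assuming the proxy speaks plain HTTP on that address (an illustration only, not what this commit does):

let eigenda_blob_provider = OnlineEigenDABlobProvider::new_http(
    // Sketch only: derive the URL from the (currently unused) EIGENDA_ADDRESS constant.
    format!("http://{EIGENDA_ADDRESS}"),
)
.await
.map_err(|e| anyhow!("Failed to load eigenda blob provider configuration: {e}"))?;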
44 changes: 44 additions & 0 deletions bin/host/src/eigenda_blobs.rs
@@ -0,0 +1,44 @@
use alloy_primitives::Bytes;
use anyhow::Ok;
use kona_derive::{errors::BlobProviderError, traits::BlobProvider};
use reqwest::{header::GetAll, Client};
use tracing::trace;

/// An online implementation of the [EigenDABlobProvider] trait.
#[derive(Debug, Clone)]
pub struct OnlineEigenDABlobProvider {
/// The base url.
base: String,
/// The inner reqwest client. Used to talk to proxy
inner: Client,
}

const GET_METHOD: &str = "get";

impl OnlineEigenDABlobProvider {
/// Creates a new instance of the [OnlineEigenDABlobProvider].
///
/// The `base` argument should point at the EigenDA proxy endpoint that the
/// [OnlineEigenDABlobProvider] will query for blobs.
pub async fn new_http(base: String) -> Result<Self, anyhow::Error> {
let inner = Client::new();
Ok(Self { base, inner })
}

pub async fn fetch_eigenda_blob(
&self,
cert: &Bytes,
) -> Result<alloy_rlp::Bytes, reqwest::Error> {
let url = format!("{}/{}/{}", self.base, GET_METHOD, cert.slice(1..));

let raw_response = self.inner
.get(url)
.send()
.await?;

raw_response.bytes().await
}


}
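For orientation, a hedged usage sketch of the provider defined above; the proxy URL and certificate bytes are placeholders, and only new_http and fetch_eigenda_blob come from this file:

use alloy_primitives::Bytes;

// Hypothetical caller; the URL and cert bytes are made up for illustration.
async fn fetch_example() -> anyhow::Result<()> {
    let provider =
        OnlineEigenDABlobProvider::new_http("http://127.0.0.1:3100".to_string()).await?;
    // fetch_eigenda_blob drops the commitment's leading version byte (cert.slice(1..))
    // and issues a GET against {base}/get/{cert}.
    let cert = Bytes::from(vec![0x01, 0x00, 0x01]);
    let blob = provider.fetch_eigenda_blob(&cert).await?;
    println!("fetched {} bytes from the proxy", blob.len());
    Ok(())
}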
31 changes: 28 additions & 3 deletions bin/host/src/fetcher/mod.rs
@@ -1,7 +1,7 @@
//! This module contains the [Fetcher] struct, which is responsible for fetching preimages from a
//! remote source.
use crate::{blobs::OnlineBlobProvider, kv::KeyValueStore};
use crate::{blobs::OnlineBlobProvider, eigenda_blobs::OnlineEigenDABlobProvider, kv::KeyValueStore};
use alloy_consensus::{Header, TxEnvelope, EMPTY_ROOT_HASH};
use alloy_eips::{
eip2718::Encodable2718,
@@ -22,7 +22,7 @@ use op_alloy_protocol::BlockInfo;
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use tracing::{error, trace, warn, info};

mod precompiles;

@@ -38,6 +38,8 @@ where
l1_provider: ReqwestProvider,
/// The blob provider
blob_provider: OnlineBlobProvider,
/// The eigenda provider
eigenda_blob_provider: OnlineEigenDABlobProvider,
/// L2 chain provider.
l2_provider: ReqwestProvider,
/// L2 head
@@ -55,10 +57,11 @@
kv_store: Arc<RwLock<KV>>,
l1_provider: ReqwestProvider,
blob_provider: OnlineBlobProvider,
eigenda_blob_provider: OnlineEigenDABlobProvider,
l2_provider: ReqwestProvider,
l2_head: B256,
) -> Self {
Self { kv_store, l1_provider, blob_provider, l2_provider, l2_head, last_hint: None }
Self { kv_store, l1_provider, blob_provider, eigenda_blob_provider, l2_provider, l2_head, last_hint: None }
}

/// Set the last hint to be received.
@@ -97,6 +100,7 @@ where

/// Fetch the preimage for the given hint and insert it into the key-value store.
async fn prefetch(&self, hint: &str) -> Result<()> {
trace!(target: "fetcher", "prefetch: {hint}");
let hint = Hint::parse(hint)?;
let (hint_type, hint_data) = hint.split();
trace!(target: "fetcher", "Fetching hint: {hint_type} {hint_data}");
@@ -541,6 +545,27 @@ where
kv_write_lock.set(key.into(), preimage.into())?;
}
}
HintType::AltDACommitment => {
let cert = hint_data;
info!(target: "fetcher", "Fetching AltDACommitment cert: {:?}", cert);
// Fetch the blob sidecar from the blob provider.
let eigenda_blob = self
.eigenda_blob_provider.
fetch_eigenda_blob(&cert).
await.
map_err(|e| anyhow!("Failed to fetch eigenda blob: {e}"))?;

info!(target: "fetcher", "eigenda_blob len {}", eigenda_blob.len());
// Acquire a lock on the key-value store and set the preimages.
let mut kv_write_lock = self.kv_store.write().await;

// Set the preimage for the blob commitment.
kv_write_lock.set(
PreimageKey::new(*keccak256(cert), PreimageKeyType::GlobalGeneric).into(),
eigenda_blob.to_vec(),
)?;

}
}

Ok(())
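The AltDACommitment arm above stores the fetched blob under a preimage key derived from keccak256 of the commitment with the GlobalGeneric key type, so the client can recompute the same key from the cert it reads out of calldata. A minimal sketch of that derivation, assuming the same alloy_primitives and kona_preimage APIs the fetcher already uses:

use alloy_primitives::keccak256;
use kona_preimage::{PreimageKey, PreimageKeyType};

/// Sketch of the key scheme used by the AltDACommitment hint handler.
fn altda_preimage_key(cert: &[u8]) -> PreimageKey {
    PreimageKey::new(*keccak256(cert), PreimageKeyType::GlobalGeneric)
}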
9 changes: 7 additions & 2 deletions bin/host/src/lib.rs
@@ -3,6 +3,7 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod blobs;
pub mod eigenda_blobs;
pub mod cli;
pub use cli::{init_tracing_subscriber, HostCli};

@@ -34,11 +35,12 @@ pub async fn start_server(cfg: HostCli) -> Result<()> {
let hint_reader = HintReader::new(hint_chan);
let kv_store = cfg.construct_kv_store();
let fetcher = if !cfg.is_offline() {
let (l1_provider, blob_provider, l2_provider) = cfg.create_providers().await?;
let (l1_provider, blob_provider, eigenda_blob_provider, l2_provider) = cfg.create_providers().await?;
Some(Arc::new(RwLock::new(Fetcher::new(
kv_store.clone(),
l1_provider,
blob_provider,
eigenda_blob_provider,
l2_provider,
cfg.agreed_l2_head_hash,
))))
@@ -69,18 +71,21 @@ pub async fn start_server_and_native_client(cfg: HostCli) -> Result<i32> {
let preimage_chan = BidirectionalChannel::new()?;
let kv_store = cfg.construct_kv_store();
let fetcher = if !cfg.is_offline() {
let (l1_provider, blob_provider, l2_provider) = cfg.create_providers().await?;
let (l1_provider, blob_provider, eigenda_blob_provider, l2_provider) = cfg.create_providers().await?;
Some(Arc::new(RwLock::new(Fetcher::new(
kv_store.clone(),
l1_provider,
blob_provider,
eigenda_blob_provider,
l2_provider,
cfg.agreed_l2_head_hash,
))))
} else {
None
};

info!(target: "host", "fetcher");

// Create the server and start it.
let server_task = task::spawn(start_native_preimage_server(
kv_store,
2 changes: 2 additions & 0 deletions crates/derive/src/attributes/stateful.rs
@@ -171,6 +171,8 @@ where
parent_beacon_root = Some(l1_header.parent_beacon_block_root.unwrap_or_default());
}

info!(target: "prepare_payload_attributes", "num tx {}", txs.len());

Ok(OpPayloadAttributes {
payload_attributes: PayloadAttributes {
timestamp: next_l2_time,
1 change: 1 addition & 0 deletions crates/derive/src/pipeline/core.rs
@@ -172,6 +172,7 @@ where
PipelineErrorKind::Temporary(PipelineError::Eof) => {
trace!(target: "pipeline", "Pipeline advancing origin");
if let Err(e) = self.attributes.advance_origin().await {
warn!(target: "pipeline", "advance_origin something {:?}", e);
return StepResult::OriginAdvanceErr(e);
}
StepResult::AdvancedOrigin
13 changes: 8 additions & 5 deletions crates/derive/src/sources/eigenda.rs
@@ -22,7 +22,7 @@ where
/// The blob source.
pub ethereum_source: EthereumDataSource<C, B>,
/// The eigenda source.
pub eigenda_source: Option<EigenDABlobSource<A>>,
pub eigenda_source: EigenDABlobSource<A>,
}

impl<C, B, A> EigenDADataSource<C, B, A>
@@ -34,7 +34,7 @@
/// Instantiates a new [EigenDADataSource].
pub const fn new(
ethereum_source: EthereumDataSource<C, B>,
eigenda_source: Option<EigenDABlobSource<A>>,
eigenda_source: EigenDABlobSource<A>,
) -> Self {
Self { ethereum_source, eigenda_source }
}
@@ -53,13 +53,16 @@
// then actually use ethereum da to fetch. items are Bytes
let item = self.ethereum_source.next(block_ref).await?;

//self.eigenda_source.next(&item).await
// just dump all the data out
todo!()
info!(target: "eth-datasource", "next item {:?}", item);

let eigenda_source_result = self.eigenda_source.next(&item).await;
info!(target: "eigenda-datasource", "eigenda_source_result {:?}", eigenda_source_result);
eigenda_source_result
}

fn clear(&mut self) {
//self.eigenda_source.clear();
self.eigenda_source.clear();
self.ethereum_source.clear();
}
}
44 changes: 36 additions & 8 deletions crates/derive/src/sources/eigenda_blobs.rs
@@ -11,7 +11,8 @@ use alloy_consensus::{Transaction, TxEip4844Variant, TxEnvelope, TxType};
use alloy_eips::eip4844::IndexedBlobHash;
use alloy_primitives::{Address, Bytes};
use async_trait::async_trait;
use op_alloy_protocol::BlockInfo;

use op_alloy_protocol::{BlockInfo, Frame, DERIVATION_VERSION_0};

/// A data iterator that reads from a blob.
#[derive(Debug, Clone)]
@@ -43,18 +44,41 @@
}

fn extract_blob_data(&self, txs: Vec<TxEnvelope>) -> (Vec<EigenDABlobData>, Vec<IndexedBlobHash>) {
info!(target: "eigenda-blobsource", "extract_blob_data");
todo!()
}

/// Loads blob data into the source if it is not open.
async fn load_blobs(&mut self, altDACommitment: &Bytes) -> Result<(), BlobProviderError> {
todo!()
if self.open {
return Ok(());
}

info!(target: "eigenda-blobsource", "going to fetch through altda fetcher");
// it should use self.altda_fetcher to get the blob
let data = self.altda_fetcher.get_blob(altDACommitment).await;
match data {
Ok(data) => {
self.open = true;
let mut new_blob = data.clone();
// new_blob.truncate(data.len()-1);
let eigenda_blob = EigenDABlobData{ blob:new_blob } ;
self.data.push(eigenda_blob);

info!(target: "eigenda-blobsource", "load_blobs {:?}", self.data);

Ok(())
},
Err(e) => {
self.open = true;
return Ok(())

},
}
}

fn next_data(&mut self) -> Result<EigenDABlobData, PipelineResult<Bytes>> {
if self.open{
return Err(Err(PipelineError::Eof.temp()));
}
info!(target: "eigenda-blobsource", "self.data.is_empty() {:?}", self.data.is_empty());

if self.data.is_empty() {
return Err(Err(PipelineError::Eof.temp()));
@@ -63,17 +87,21 @@ }
}

pub async fn next(&mut self, altDACommitment: &Bytes) -> PipelineResult<Bytes> {
info!(target: "eigenda-blobsource", "next");
self.load_blobs(altDACommitment).await?;

info!(target: "eigenda-blobsource", "next 1");
let next_data = match self.next_data() {
Ok(d) => d,
Err(e) => return e,
};

info!(target: "eigenda-blobsource", "next 2");
// Decode the blob data to raw bytes.
// Otherwise, ignore blob and recurse next.
match next_data.decode() {
Ok(d) => Ok(d),
Ok(d) => {
info!(target: "eigenda-blobsource", "next 3");
Ok(d)
},
Err(_) => {
warn!(target: "blob-source", "Failed to decode blob data, skipping");
panic!()
7 changes: 3 additions & 4 deletions crates/derive/src/sources/eigenda_data.rs
@@ -3,18 +3,17 @@ use crate::errors::BlobDecodingError;

#[derive(Default, Clone, Debug)]
pub struct EigenDABlobData {
/// The blob data
pub(crate) version: Option<Bytes>,
/// The calldata
pub(crate) blob: Option<Bytes>,
pub(crate) blob: Bytes,
}

impl EigenDABlobData {
/// Decodes the blob into raw byte data.
/// Returns a [BlobDecodingError] if the blob is invalid.
pub(crate) fn decode(&self) -> Result<Bytes, BlobDecodingError> {
// where we can implement zero bytes etc.
todo!()
info!(target: "eigenda-blobdata", "decode {} {:?}", self.blob.len(), self.blob.clone());
Ok(self.blob.clone())
}

}
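decode is currently a pass-through, and the inline comment ("where we can implement zero bytes etc.") suggests that real decoding still has to strip the padding inserted so each 32-byte chunk remains a valid bn254 field element. A rough sketch of that idea, stated purely as an assumption about the eventual encoding (a full implementation would also need to recover the true payload length):

use alloy_primitives::Bytes;

// Sketch only: assume the first byte of every 32-byte chunk is zero padding and drop it.
fn strip_field_element_padding(blob: &Bytes) -> Bytes {
    let mut out = Vec::with_capacity(blob.len());
    for chunk in blob.chunks(32) {
        out.extend_from_slice(&chunk[1..]);
    }
    Bytes::from(out)
}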
2 changes: 2 additions & 0 deletions crates/derive/src/stages/batch/batch_provider.rs
@@ -101,8 +101,10 @@ where
self.attempt_update()?;

if let Some(batch_validator) = self.batch_validator.as_mut() {
info!(target="batch-provider", "BatchProvider batch_validator");
batch_validator.advance_origin().await
} else if let Some(batch_queue) = self.batch_queue.as_mut() {
info!(target="batch-provider", "BatchProvider batch_queue");
batch_queue.advance_origin().await
} else {
Err(PipelineError::NotEnoughData.temp())