Skip to content

Commit

Permalink
chore(deps): bump http, hyper etc. to 1.0; jsonrpsee 0.23 (paradigmxy…
Browse files Browse the repository at this point in the history
…z#7018)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
DaniPopes and mattsse authored Jun 11, 2024
1 parent e9d8cda commit 55317eb
Show file tree
Hide file tree
Showing 26 changed files with 545 additions and 599 deletions.
779 changes: 367 additions & 412 deletions Cargo.lock

Large diffs are not rendered by default.

58 changes: 34 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ members = [
"examples/rpc-db/",
"examples/txpool-tracing/",
"testing/ef-tests/",
"testing/testing-utils"
"testing/testing-utils",
]
default-members = ["bin/reth"]

Expand Down Expand Up @@ -330,9 +330,15 @@ reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-types = { path = "crates/trie/types" }

# revm
revm = { version = "9.0.0", features = [ "std", "secp256k1", "blst", ], default-features = false }
revm-primitives = { version = "4.0.0", features = [ "std", ], default-features = false }
revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "53aa2b2" }
revm = { version = "9.0.0", features = [
"std",
"secp256k1",
"blst",
], default-features = false }
revm-primitives = { version = "4.0.0", features = [
"std",
], default-features = false }
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "5e3058a" }

# eth
alloy-chains = "0.1.15"
Expand All @@ -341,21 +347,21 @@ alloy-dyn-abi = "0.7.2"
alloy-sol-types = "0.7.2"
alloy-rlp = "0.3.4"
alloy-trie = "0.4"
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93", default-features = false, features = [
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d", default-features = false, features = [
"reqwest",
] }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "cc68b93" }
alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "14ed25d" }
alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }

# misc
auto_impl = "1"
Expand Down Expand Up @@ -415,21 +421,25 @@ async-trait = "0.1.68"
futures = "0.3.26"
pin-project = "1.0.12"
futures-util = "0.3.25"
hyper = "0.14.25"
hyper = "1.3"
hyper-util = "0.1.5"
reqwest = { version = "0.12", default-features = false }
tower = "0.4"
tower-http = "0.4"
http = "0.2.8"
http-body = "0.4.5"
tower-http = "0.5"

# p2p
discv5 = "0.6.0"
igd-next = "0.14.3"

# rpc
jsonrpsee = "0.22"
jsonrpsee-core = "0.22"
jsonrpsee-types = "0.22"
jsonrpsee = "0.23"
jsonrpsee-core = "0.23"
jsonrpsee-types = "0.23"
jsonrpsee-http-client = "0.23"

# http
http = "1.0"
http-body = "1.0"

# crypto
secp256k1 = { version = "0.28", default-features = false, features = [
Expand Down
3 changes: 2 additions & 1 deletion crates/consensus/debug-client/src/providers/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use alloy_eips::BlockNumberOrTag;
use alloy_provider::{Provider, ProviderBuilder};
use futures::StreamExt;
use reth_node_core::rpc::types::RichBlock;
use reth_rpc_types::BlockTransactionsKind;
use tokio::sync::mpsc::Sender;

/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection.
Expand Down Expand Up @@ -32,7 +33,7 @@ impl BlockProvider for RpcBlockProvider {

while let Some(block) = stream.next().await {
let full_block = ws_provider
.get_block_by_hash(block.header.hash.unwrap(), true)
.get_block_by_hash(block.header.hash.unwrap(), BlockTransactionsKind::Full)
.await
.expect("failed to get block")
.expect("block not found");
Expand Down
4 changes: 3 additions & 1 deletion crates/node-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ serde.workspace = true
serde_json.workspace = true

# http/rpc
hyper.workspace = true
http.workspace = true
jsonrpsee.workspace = true
tower.workspace = true

# tracing
tracing.workspace = true
Expand Down
54 changes: 29 additions & 25 deletions crates/node-core/src/metrics/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
use crate::metrics::version_metrics::register_version_metrics;
use eyre::WrapErr;
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use http::Response;
use metrics::describe_gauge;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
Expand Down Expand Up @@ -64,29 +61,36 @@ async fn start_endpoint<F: Hook + 'static>(
hook: Arc<F>,
task_executor: TaskExecutor,
) -> eyre::Result<()> {
let make_svc = make_service_fn(move |_| {
let handle = handle.clone();
let hook = Arc::clone(&hook);
async move {
Ok::<_, Infallible>(service_fn(move |_: Request<Body>| {
let listener =
tokio::net::TcpListener::bind(listen_addr).await.wrap_err("Could not bind to address")?;

task_executor.spawn_with_graceful_shutdown_signal(|signal| async move {
let mut shutdown = signal.ignore_guard();
loop {
let io = tokio::select! {
res = listener.accept() => match res {
Ok((stream, _remote_addr)) => stream,
Err(err) => {
tracing::error!(%err, "failed to accept connection");
continue;
}
},
_ = &mut shutdown => break,
};

let handle = handle.clone();
let hook = hook.clone();
let service = tower::service_fn(move |_| {
(hook)();
let metrics = handle.render();
async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) }
}))
}
});

let server =
Server::try_bind(&listen_addr).wrap_err("Could not bind to address")?.serve(make_svc);

task_executor.spawn_with_graceful_shutdown_signal(move |signal| async move {
if let Err(error) = server
.with_graceful_shutdown(async move {
let _ = signal.await;
})
.await
{
tracing::error!(%error, "metrics endpoint crashed")
async move { Ok::<_, Infallible>(Response::new(metrics)) }
});

if let Err(error) =
jsonrpsee::server::serve_with_graceful_shutdown(io, service, &mut shutdown).await
{
tracing::error!(%error, "metrics endpoint crashed")
}
}
});

Expand Down
5 changes: 1 addition & 4 deletions crates/optimism/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ revm-primitives.workspace = true

# async
async-trait.workspace = true
hyper.workspace = true
reqwest = { workspace = true, default-features = false, features = [
"rustls-tls-native-roots",
] }
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
tracing.workspace = true

# misc
Expand Down
3 changes: 0 additions & 3 deletions crates/optimism/node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use std::sync::{atomic::AtomicUsize, Arc};
/// Error type when interacting with the Sequencer
#[derive(Debug, thiserror::Error)]
pub enum SequencerRpcError {
/// Wrapper around a [`hyper::Error`].
#[error(transparent)]
HyperError(#[from] hyper::Error),
/// Wrapper around an [`reqwest::Error`].
#[error(transparent)]
HttpError(#[from] reqwest::Error),
Expand Down
22 changes: 11 additions & 11 deletions crates/rpc/ipc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
/// async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let server = Builder::default().build("/tmp/my-uds");
/// let mut module = RpcModule::new(());
/// module.register_method("say_hello", |_, _| "lo")?;
/// module.register_method("say_hello", |_, _, _| "lo")?;
/// let handle = server.start(module).await?;
///
/// // In this example we don't care about doing shutdown so let's it run forever.
Expand Down Expand Up @@ -390,7 +390,7 @@ where
let rpc_service = self.rpc_middleware.service(RpcService::new(
self.inner.methods.clone(),
max_response_body_size,
self.inner.conn_id as usize,
self.inner.conn_id.into(),
cfg,
));
// an ipc connection needs to handle read+write concurrently
Expand Down Expand Up @@ -896,7 +896,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().max_response_body_size(100).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "a".repeat(101)).unwrap();
module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -911,7 +911,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().max_request_body_size(100).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap();
module.register_method("anything", |_, _, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand Down Expand Up @@ -939,7 +939,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().max_connections(2).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap();
module.register_method("anything", |_, _, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand Down Expand Up @@ -973,7 +973,7 @@ mod tests {
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
module.register_method("eth_chainId", move |_, _| msg).unwrap();
module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -987,7 +987,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "ok").unwrap();
module.register_method("anything", |_, _, _| "ok").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -1013,7 +1013,7 @@ mod tests {
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
module.register_method("rpc_modules", move |_, _| msg).unwrap();
module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -1036,7 +1036,7 @@ mod tests {
"subscribe_hello",
"s_hello",
"unsubscribe_hello",
|_, pending, tx| async move {
|_, pending, tx, _| async move {
let rx = tx.subscribe();
let stream = BroadcastStream::new(rx);
pipe_from_stream_with_bounded_buffer(pending, stream).await?;
Expand Down Expand Up @@ -1088,8 +1088,8 @@ mod tests {
let mut module = RpcModule::new(());
let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
module.register_method("say_hello", move |_, _| hello_msg).unwrap();
module.register_method("say_goodbye", move |_, _| goodbye_msg).unwrap();
module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand Down
37 changes: 12 additions & 25 deletions crates/rpc/ipc/src/server/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use jsonrpsee::{
IdProvider,
},
types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
BoundedSubscriptions, ConnectionDetails, MethodCallback, MethodResponse, MethodSink, Methods,
SubscriptionState,
BoundedSubscriptions, ConnectionId, Extensions, MethodCallback, MethodResponse, MethodSink,
Methods, SubscriptionState,
};
use std::sync::Arc;

/// JSON-RPC service middleware.
#[derive(Clone, Debug)]
pub struct RpcService {
conn_id: usize,
conn_id: ConnectionId,
methods: Methods,
max_response_body_size: usize,
cfg: RpcServiceCfg,
Expand All @@ -39,7 +39,7 @@ impl RpcService {
pub(crate) const fn new(
methods: Methods,
max_response_body_size: usize,
conn_id: usize,
conn_id: ConnectionId,
cfg: RpcServiceCfg,
) -> Self {
Self { methods, max_response_body_size, conn_id, cfg }
Expand All @@ -58,38 +58,25 @@ impl<'a> RpcServiceT<'a> for RpcService {
let params = req.params();
let name = req.method_name();
let id = req.id().clone();
let extensions = Extensions::new();

match self.methods.method_with_name(name) {
None => {
let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
ResponseFuture::ready(rp)
}
Some((_name, method)) => match method {
MethodCallback::Async(callback) => {
let params = params.into_owned();
let id = id.into_owned();

let fut = (callback)(id, params, conn_id, max_response_body_size);
ResponseFuture::future(fut)
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
MethodCallback::AsyncWithDetails(callback) => {
MethodCallback::Async(callback) => {
let params = params.into_owned();
let id = id.into_owned();

// Note: Add the `Request::extensions` to the connection details when available
// here.
let fut = (callback)(
id,
params,
ConnectionDetails::_new(conn_id),
max_response_body_size,
);
let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::future(fut)
}
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size);
ResponseFuture::ready(rp)
}
MethodCallback::Subscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions {
bounded_subscriptions,
Expand All @@ -110,7 +97,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
subscription_permit: p,
};

let fut = callback(id.clone(), params, sink, conn_state);
let fut = callback(id.clone(), params, sink, conn_state, extensions);
ResponseFuture::future(fut)
} else {
let max = bounded_subscriptions.max();
Expand All @@ -129,7 +116,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
return ResponseFuture::ready(rp);
};

let rp = callback(id, params, conn_id, max_response_body_size);
let rp = callback(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
},
Expand Down
Loading

0 comments on commit 55317eb

Please sign in to comment.