Skip to content

Commit

Permalink
swarm: add ability to run nodes using multithreaded runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Jun 2, 2022
1 parent 3c5e497 commit cadb941
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/sui-swarm/src/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! runtime.
mod node;
pub use node::Node;
pub use node::{Node, RuntimeType};

mod swarm;
pub use swarm::{Swarm, SwarmBuilder};
56 changes: 45 additions & 11 deletions crates/sui-swarm/src/memory/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing::{error, trace};
pub struct Node {
thread: Option<Container>,
config: NodeConfig,
runtime_type: RuntimeType,
}

impl Node {
Expand All @@ -33,6 +34,7 @@ impl Node {
Self {
thread: None,
config,
runtime_type: RuntimeType::SingleThreaded,
}
}

Expand All @@ -45,16 +47,16 @@ impl Node {
/// up.
pub fn spawn(&mut self) -> Result<tokio::sync::oneshot::Receiver<()>> {
trace!(name =% self.name(), "starting in-memory node");
let (startup_reciever, node_handle) = Container::spawn(self.config.clone());
let (startup_reciever, node_handle) =
Container::spawn(self.config.clone(), self.runtime_type);
self.thread = Some(node_handle);
Ok(startup_reciever)
}

/// Start this Node, waiting until its completely started up.
pub async fn start(&mut self) -> Result<()> {
let (startup_reciever, node_handle) = Container::spawn(self.config.clone());
let startup_reciever = self.spawn()?;
startup_reciever.await?;
self.thread = Some(node_handle);
Ok(())
}

Expand Down Expand Up @@ -129,21 +131,46 @@ impl Drop for Container {

impl Container {
/// Spawn a new Node.
pub fn spawn(config: NodeConfig) -> (tokio::sync::oneshot::Receiver<()>, Self) {
pub fn spawn(
config: NodeConfig,
runtime: RuntimeType,
) -> (tokio::sync::oneshot::Receiver<()>, Self) {
let (startup_sender, startup_reciever) = tokio::sync::oneshot::channel();
let (cancel_sender, cancel_reciever) = tokio::sync::oneshot::channel();

let thread = thread::spawn(move || {
let name_span = tracing::span!(
let span = tracing::span!(
tracing::Level::INFO,
"node",
name =% config.sui_address()
);
let _guard = name_span.enter();

let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _guard = span.enter();

let mut builder = match runtime {
RuntimeType::SingleThreaded => tokio::runtime::Builder::new_current_thread(),
RuntimeType::MultiThreaded => {
thread_local! {
static SPAN: std::cell::RefCell<Option<tracing::span::EnteredSpan>> =
std::cell::RefCell::new(None);
}
let mut builder = tokio::runtime::Builder::new_multi_thread();
let span = span.clone();
builder
.on_thread_start(move || {
SPAN.with(|maybe_entered_span| {
*maybe_entered_span.borrow_mut() = Some(span.clone().entered());
});
})
.on_thread_stop(|| {
SPAN.with(|maybe_entered_span| {
maybe_entered_span.borrow_mut().take();
});
});

builder
}
};
let runtime = builder.enable_all().build().unwrap();

runtime.block_on(async move {
let _server = SuiNode::start(&config).await.unwrap();
Expand Down Expand Up @@ -180,6 +207,13 @@ impl Container {
}
}

/// The type of tokio runtime that should be used for a particular Node
#[derive(Clone, Copy, Debug)]
pub enum RuntimeType {
SingleThreaded,
MultiThreaded,
}

#[cfg(test)]
mod test {
use crate::memory::Swarm;
Expand Down

0 comments on commit cadb941

Please sign in to comment.