From a74e7c3d02d873c5eba8432e36358edd4cfd983c Mon Sep 17 00:00:00 2001 From: ade <93547199+oxade@users.noreply.github.com> Date: Mon, 16 May 2022 19:18:44 -0400 Subject: [PATCH] Full node initial (#1969) * Initial full node impl --- crates/sui-config/Cargo.toml | 1 + sui/src/bin/{node.rs => full_node.rs} | 26 +- sui/src/config/mod.rs | 2 + sui/src/lib.rs | 2 +- sui/src/sui_commands.rs | 1 + sui/src/sui_full_node.rs | 218 ++++++++++++++ sui/src/sui_node.rs | 134 --------- sui/src/unit_tests/cli_tests.rs | 1 + sui_core/src/authority.rs | 4 +- sui_core/src/authority/authority_store.rs | 75 +++-- sui_core/src/full_node.rs | 114 ++++++++ sui_core/src/full_node/follower.rs | 329 ++++++++++++++++++++++ sui_core/src/full_node/full_node_state.rs | 251 +++++++++++++++++ sui_core/src/lib.rs | 1 + sui_core/src/transaction_input_checker.rs | 16 +- 15 files changed, 991 insertions(+), 184 deletions(-) rename sui/src/bin/{node.rs => full_node.rs} (81%) create mode 100644 sui/src/sui_full_node.rs delete mode 100644 sui/src/sui_node.rs create mode 100644 sui_core/src/full_node.rs create mode 100644 sui_core/src/full_node/follower.rs create mode 100644 sui_core/src/full_node/full_node_state.rs diff --git a/crates/sui-config/Cargo.toml b/crates/sui-config/Cargo.toml index 6a5747036c251..718d4675c36d3 100644 --- a/crates/sui-config/Cargo.toml +++ b/crates/sui-config/Cargo.toml @@ -24,5 +24,6 @@ move-binary-format = { git = "https://github.com/move-language/move", rev = "1b2 move-package = { git = "https://github.com/move-language/move", rev = "1b2d3b4274345f5b4b6a1a1bde5aee452003ab5b" } sui-framework = { path = "../../sui_programmability/framework" } + sui-adapter = { path = "../sui-adapter" } sui-types = { path = "../sui-types" } diff --git a/sui/src/bin/node.rs b/sui/src/bin/full_node.rs similarity index 81% rename from sui/src/bin/node.rs rename to sui/src/bin/full_node.rs index 6865e98dc0f9f..f27e9c1602632 100644 --- a/sui/src/bin/node.rs +++ b/sui/src/bin/full_node.rs @@ -13,8 +13,8 @@ use std::{ }; use sui::{ api::{RpcGatewayOpenRpc, RpcGatewayServer}, - config::sui_config_dir, - sui_node::SuiNode, + config::{sui_config_dir, FULL_NODE_DB_PATH}, + sui_full_node::SuiFullNode, }; use tracing::info; @@ -22,12 +22,11 @@ const DEFAULT_NODE_SERVER_PORT: &str = "5002"; const DEFAULT_NODE_SERVER_ADDR_IPV4: &str = "127.0.0.1"; #[derive(Parser)] -#[clap( - name = "Sui Node", - about = "A Byzantine fault tolerant chain with low-latency finality and high throughput", - rename_all = "kebab-case" -)] +#[clap(name = "Sui Full Node", about = "TODO", rename_all = "kebab-case")] struct SuiNodeOpt { + #[clap(long)] + db_path: Option, + #[clap(long)] config: Option, @@ -50,9 +49,14 @@ async fn main() -> anyhow::Result<()> { let guard = telemetry_subscribers::init(config); let options: SuiNodeOpt = SuiNodeOpt::parse(); + let db_path = options + .db_path + .map(PathBuf::from) + .unwrap_or(sui_config_dir()?.join(FULL_NODE_DB_PATH)); + let config_path = options .config - .unwrap_or(sui_config_dir()?.join("node.conf")); + .unwrap_or(sui_config_dir()?.join("network.conf")); info!("Node config file path: {:?}", config_path); let server_builder = HttpServerBuilder::default(); @@ -75,7 +79,11 @@ async fn main() -> anyhow::Result<()> { let mut module = RpcModule::new(()); let open_rpc = RpcGatewayOpenRpc::open_rpc(); module.register_method("rpc.discover", move |_, _| Ok(open_rpc.clone()))?; - module.merge(SuiNode::new(&config_path)?.into_rpc())?; + module.merge( + SuiFullNode::start_with_genesis(&config_path, &db_path) + .await? 
+ .into_rpc(), + )?; info!( "Available JSON-RPC methods : {:?}", diff --git a/sui/src/config/mod.rs b/sui/src/config/mod.rs index 80e5bb51f37ee..2a3c27b3564a1 100644 --- a/sui/src/config/mod.rs +++ b/sui/src/config/mod.rs @@ -27,6 +27,8 @@ const SUI_CONFIG_DIR: &str = "sui_config"; pub const SUI_NETWORK_CONFIG: &str = "network.conf"; pub const SUI_WALLET_CONFIG: &str = "wallet.conf"; pub const SUI_GATEWAY_CONFIG: &str = "gateway.conf"; +pub const FULL_NODE_DB_PATH: &str = "full_node_db"; + pub const SUI_DEV_NET_URL: &str = "https://gateway.devnet.sui.io:9000"; pub fn sui_config_dir() -> Result { diff --git a/sui/src/lib.rs b/sui/src/lib.rs index 9a34e5941c15f..8f211443c9373 100644 --- a/sui/src/lib.rs +++ b/sui/src/lib.rs @@ -10,5 +10,5 @@ pub mod rpc_gateway; pub mod rpc_gateway_client; pub mod shell; pub mod sui_commands; -pub mod sui_node; +pub mod sui_full_node; pub mod wallet_commands; diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index d30acae89cf64..4ea84a7c2bf9b 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -10,6 +10,7 @@ use crate::{ use anyhow::{anyhow, bail}; use base64ct::{Base64, Encoding}; use clap::*; + use std::fs; use std::num::NonZeroUsize; use std::path::PathBuf; diff --git a/sui/src/sui_full_node.rs b/sui/src/sui_full_node.rs new file mode 100644 index 0000000000000..4e5caa6e4b636 --- /dev/null +++ b/sui/src/sui_full_node.rs @@ -0,0 +1,218 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; +use sui_types::sui_serde::Base64; + +use crate::{ + api::{RpcGatewayServer, TransactionBytes}, + rpc_gateway::responses::{ObjectResponse, SuiTypeTag}, +}; +use anyhow::anyhow; +use async_trait::async_trait; +use jsonrpsee::core::RpcResult; +use sui_config::{NetworkConfig, PersistedConfig}; +use sui_core::{ + authority::ReplicaStore, + full_node::FullNodeState, + gateway_types::{ + GetObjectInfoResponse, SuiObjectRef, TransactionEffectsResponse, TransactionResponse, + }, + sui_json::SuiJsonValue, +}; +use sui_core::{ + authority_client::NetworkAuthorityClient, full_node::FullNode, + gateway_state::GatewayTxSeqNumber, +}; +use sui_types::{ + base_types::{ObjectID, SuiAddress, TransactionDigest}, + error::SuiError, +}; +use tracing::info; + +pub struct SuiFullNode { + client: FullNode, +} + +impl SuiFullNode { + pub async fn start_with_genesis( + network_config_path: &Path, + db_path: &Path, + ) -> anyhow::Result { + // Network config is all we need for now + let network_config: NetworkConfig = PersistedConfig::read(network_config_path)?; + + // Start a full node + let full_node = make_full_node(db_path.to_path_buf(), &network_config).await?; + full_node.spawn_tasks().await; + info!("Started full node "); + + Ok(Self { client: full_node }) + } +} + +#[async_trait] +impl RpcGatewayServer for SuiFullNode { + async fn transfer_coin( + &self, + _signer: SuiAddress, + _object_id: ObjectID, + _gas: Option, + _gas_budget: u64, + _recipient: SuiAddress, + ) -> RpcResult { + Err(anyhow!("Sui Node only supports read-only methods").into()) + } + + async fn publish( + &self, + _sender: SuiAddress, + _compiled_modules: Vec, + _gas: Option, + _gas_budget: u64, + ) -> RpcResult { + Err(anyhow!("Sui Node only supports read-only methods").into()) + } + + async fn split_coin( + &self, + _signer: SuiAddress, + _coin_object_id: ObjectID, + _split_amounts: Vec, + _gas: Option, + _gas_budget: u64, + ) -> RpcResult { + Err(anyhow!("Sui 
Node only supports read-only methods").into()) + } + + async fn merge_coin( + &self, + _signer: SuiAddress, + _primary_coin: ObjectID, + _coin_to_merge: ObjectID, + _gas: Option, + _gas_budget: u64, + ) -> RpcResult { + Err(anyhow!("Sui Node only supports read-only methods").into()) + } + + async fn execute_transaction( + &self, + _tx_bytes: Base64, + _signature: Base64, + _pub_key: Base64, + ) -> RpcResult { + Err(anyhow!("Sui Node only supports read-only methods").into()) + } + + async fn move_call( + &self, + _signer: SuiAddress, + _package_object_id: ObjectID, + _module: String, + _function: String, + _type_arguments: Vec, + _rpc_arguments: Vec, + _gas: Option, + _gas_budget: u64, + ) -> RpcResult { + Err(anyhow!("Sui Node only supports read-only methods").into()) + } + + async fn sync_account_state(&self, _address: SuiAddress) -> RpcResult<()> { + todo!() + } + + // + // Read APIs + // + + async fn get_owned_objects(&self, owner: SuiAddress) -> RpcResult { + let resp = ObjectResponse { + objects: self + .client + .get_owned_objects(owner) + .await? + .iter() + .map(|w| SuiObjectRef::from(*w)) + .collect(), + }; + Ok(resp) + } + + async fn get_object_info(&self, object_id: ObjectID) -> RpcResult { + Ok(self + .client + .get_object_info(object_id) + .await? + .try_into() + .map_err(|e| anyhow!("{}", e))?) + } + + async fn get_total_transaction_number(&self) -> RpcResult { + Ok(self.client.state.get_total_transaction_number()?) + } + + async fn get_transactions_in_range( + &self, + start: GatewayTxSeqNumber, + end: GatewayTxSeqNumber, + ) -> RpcResult> { + Ok(self.client.state.get_transactions_in_range(start, end)?) + } + + async fn get_recent_transactions( + &self, + count: u64, + ) -> RpcResult> { + Ok(self.client.state.get_recent_transactions(count)?) + } + + async fn get_transaction( + &self, + digest: TransactionDigest, + ) -> RpcResult { + Ok(self.client.state.get_transaction(digest).await?) + } +} + +pub async fn make_full_node( + db_store_path: PathBuf, + net_config: &NetworkConfig, +) -> Result, SuiError> { + let store = Arc::new(ReplicaStore::open(db_store_path, None)); + + let val_config = net_config + .validator_configs() + .iter() + .next() + .expect("Validator set must be non-empty"); + + let follower_node_state = + FullNodeState::new_with_genesis(net_config.committee(), store, val_config.genesis()) + .await?; + + let mut authority_clients = BTreeMap::new(); + let mut config = mysten_network::config::Config::new(); + config.connect_timeout = Some(Duration::from_secs(5)); + config.request_timeout = Some(Duration::from_secs(5)); + for validator in net_config + .validator_configs() + .iter() + .next() + .unwrap() + .committee_config() + .validator_set() + { + let channel = config.connect_lazy(validator.network_address()).unwrap(); + let client = NetworkAuthorityClient::new(channel); + authority_clients.insert(validator.public_key(), client); + } + + Ok(FullNode::new(Arc::new(follower_node_state), authority_clients).unwrap()) +} diff --git a/sui/src/sui_node.rs b/sui/src/sui_node.rs deleted file mode 100644 index 4bbcbba6df3b7..0000000000000 --- a/sui/src/sui_node.rs +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. 
-// SPDX-License-Identifier: Apache-2.0 - -use std::path::Path; - -use crate::{ - api::{RpcGatewayServer, TransactionBytes}, - rpc_gateway::responses::{ObjectResponse, SuiTypeTag}, -}; -use anyhow::anyhow; -use async_trait::async_trait; -use jsonrpsee::core::RpcResult; -use sui_core::gateway_types::{TransactionEffectsResponse, TransactionResponse}; - -use sui_core::gateway_state::GatewayTxSeqNumber; -use sui_core::gateway_types::GetObjectInfoResponse; -use sui_core::sui_json::SuiJsonValue; -use sui_types::base_types::{ObjectID, SuiAddress, TransactionDigest}; -use sui_types::sui_serde::Base64; - -pub struct SuiNode {} - -impl SuiNode { - pub fn new(_config_path: &Path) -> anyhow::Result { - Ok(Self {}) - } -} - -#[async_trait] -impl RpcGatewayServer for SuiNode { - async fn transfer_coin( - &self, - _signer: SuiAddress, - _object_id: ObjectID, - _gas: Option, - _gas_budget: u64, - _recipient: SuiAddress, - ) -> RpcResult { - Err(anyhow!("Sui Node only supports read-only methods").into()) - } - - async fn publish( - &self, - _sender: SuiAddress, - _compiled_modules: Vec, - _gas: Option, - _gas_budget: u64, - ) -> RpcResult { - Err(anyhow!("Sui Node only supports read-only methods").into()) - } - - async fn split_coin( - &self, - _signer: SuiAddress, - _coin_object_id: ObjectID, - _split_amounts: Vec, - _gas: Option, - _gas_budget: u64, - ) -> RpcResult { - Err(anyhow!("Sui Node only supports read-only methods").into()) - } - - async fn merge_coin( - &self, - _signer: SuiAddress, - _primary_coin: ObjectID, - _coin_to_merge: ObjectID, - _gas: Option, - _gas_budget: u64, - ) -> RpcResult { - Err(anyhow!("Sui Node only supports read-only methods").into()) - } - - async fn get_owned_objects(&self, _owner: SuiAddress) -> RpcResult { - todo!() - } - - async fn get_object_info(&self, _object_id: ObjectID) -> RpcResult { - todo!() - } - - async fn execute_transaction( - &self, - _tx_bytes: Base64, - _signature: Base64, - _pub_key: Base64, - ) -> RpcResult { - Err(anyhow!("Sui Node only supports read-only methods").into()) - } - - async fn move_call( - &self, - _signer: SuiAddress, - _package_object_id: ObjectID, - _module: String, - _function: String, - _type_arguments: Vec, - _rpc_arguments: Vec, - _gas: Option, - _gas_budget: u64, - ) -> RpcResult { - Err(anyhow!("Sui Node only supports read-only methods").into()) - } - - async fn sync_account_state(&self, _address: SuiAddress) -> RpcResult<()> { - todo!() - } - - async fn get_total_transaction_number(&self) -> RpcResult { - todo!() - } - - async fn get_transactions_in_range( - &self, - _start: GatewayTxSeqNumber, - _end: GatewayTxSeqNumber, - ) -> RpcResult> { - todo!() - } - - async fn get_recent_transactions( - &self, - _count: u64, - ) -> RpcResult> { - todo!() - } - - async fn get_transaction( - &self, - _digest: TransactionDigest, - ) -> RpcResult { - todo!() - } -} diff --git a/sui/src/unit_tests/cli_tests.rs b/sui/src/unit_tests/cli_tests.rs index 0cbe336adb134..9d42806f4f7fe 100644 --- a/sui/src/unit_tests/cli_tests.rs +++ b/sui/src/unit_tests/cli_tests.rs @@ -84,6 +84,7 @@ async fn test_genesis() -> Result<(), anyhow::Error> { assert!(files.contains(&SUI_WALLET_CONFIG.to_string())); assert!(files.contains(&SUI_GATEWAY_CONFIG.to_string())); assert!(files.contains(&SUI_NETWORK_CONFIG.to_string())); + assert!(files.contains(&"wallet.key".to_string())); // Check network config diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index e57651d1fff6d..eb91b3c96a7f6 100644 --- a/sui_core/src/authority.rs +++ 
b/sui_core/src/authority.rs @@ -65,11 +65,11 @@ mod temporary_store; pub use temporary_store::AuthorityTemporaryStore; mod authority_store; -pub use authority_store::{AuthorityStore, GatewayStore, SuiDataStore}; +pub use authority_store::{AuthorityStore, GatewayStore, ReplicaStore, SuiDataStore}; pub mod authority_notifier; -const MAX_ITEMS_LIMIT: u64 = 100_000; +pub const MAX_ITEMS_LIMIT: u64 = 100_000; const BROADCAST_CAPACITY: usize = 10_000; /// Prometheus metrics which can be displayed in Grafana, queried and alerted on diff --git a/sui_core/src/authority/authority_store.rs b/sui_core/src/authority/authority_store.rs index 21a301312469e..42a7e38581bc5 100644 --- a/sui_core/src/authority/authority_store.rs +++ b/sui_core/src/authority/authority_store.rs @@ -17,10 +17,10 @@ use typed_store::rocks::{DBBatch, DBMap}; use typed_store::{reopen, traits::Map}; -pub type AuthorityStore = SuiDataStore; +pub type AuthorityStore = SuiDataStore; #[allow(dead_code)] -pub type ReplicaStore = SuiDataStore; -pub type GatewayStore = SuiDataStore; +pub type ReplicaStore = SuiDataStore; +pub type GatewayStore = SuiDataStore; const NUM_SHARDS: usize = 4096; @@ -31,10 +31,12 @@ const LAST_CONSENSUS_INDEX_ADDR: u64 = 0; /// ALL_OBJ_VER determines whether we want to store all past /// versions of every object in the store. Authority doesn't store /// them, but other entities such as replicas will. +/// USE_LOCKS determines whether we maintain locks when updating state; +/// replicas, for example, don't currently require locks. /// S is a template on Authority signature state. This allows SuiDataStore to be used on either /// authorities or non-authorities. Specifically, when storing transactions and effects, /// S allows SuiDataStore to either store the authority signed version or unsigned version. -pub struct SuiDataStore { +pub struct SuiDataStore { /// This is a map between the object ID and the latest state of the object, namely the /// state that is needed to process new transactions. If an object is deleted its entry is /// removed from this map. @@ -107,8 +109,11 @@ pub struct SuiDataStore { last_consensus_index: DBMap, } -impl Deserialize<'de>> - SuiDataStore +impl< + const ALL_OBJ_VER: bool, + const USE_LOCKS: bool, + S: Eq + Serialize + for<'de> Deserialize<'de>, + > SuiDataStore { /// Open an authority store by directory path pub fn open>(path: P, db_options: Option) -> Self { @@ -265,6 +270,10 @@ impl Deserialize<'de>> &self, input_objects: impl Iterator, ) -> Vec> { + if !USE_LOCKS { + return vec![]; + } + let num_locks = self.lock_table.len(); // TODO: randomize the lock mapping based on a secret to avoid DoS attacks. let lock_number: BTreeSet = input_objects @@ -645,8 +654,10 @@ impl Deserialize<'de>> .iter() .filter(|(id, _, _)| objects.get(id).unwrap().is_owned()); - // Archive the old lock. - write_batch = write_batch.delete_batch(&self.transaction_lock, owned_inputs.clone())?; + if USE_LOCKS { + // Archive the old lock. + write_batch = write_batch.delete_batch(&self.transaction_lock, owned_inputs.clone())?; + } // Delete objects. // Wrapped objects need to be deleted as well because we can no longer track their @@ -705,17 +716,19 @@ impl Deserialize<'de>> }), )?; - // Create locks for new objects, if they are owned. 
- write_batch = write_batch.insert_batch( - &self.transaction_lock, - written.iter().filter_map(|(_, (object_ref, new_object))| { - if new_object.is_owned() { - Some((object_ref, None)) - } else { - None - } - }), - )?; + if USE_LOCKS { + // Create locks for new objects, if they are owned. + write_batch = write_batch.insert_batch( + &self.transaction_lock, + written.iter().filter_map(|(_, (object_ref, new_object))| { + if new_object.is_owned() { + Some((object_ref, None)) + } else { + None + } + }), + )?; + } if ALL_OBJ_VER { // Keep all versions of every object if ALL_OBJ_VER is true. @@ -755,11 +768,13 @@ impl Deserialize<'de>> // Acquire the lock to ensure no one else writes when we are in here. let _mutexes = self.acquire_locks(owned_inputs.clone()); - // Check the locks are still active - // TODO: maybe we could just check if the certificate is there instead? - let locks = self.transaction_lock.multi_get(owned_inputs)?; - for object_lock in locks { - object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?; + if USE_LOCKS { + // Check the locks are still active + // TODO: maybe we could just check if the certificate is there instead? + let locks = self.transaction_lock.multi_get(owned_inputs)?; + for object_lock in locks { + object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?; + } } if let Some(next_seq) = seq_opt { @@ -1025,7 +1040,7 @@ impl Deserialize<'de>> } } -impl SuiDataStore { +impl SuiDataStore { pub fn get_signed_transaction_info( &self, transaction_digest: &TransactionDigest, @@ -1038,14 +1053,14 @@ impl SuiDataStore { } } -impl SuiDataStore { +impl SuiDataStore { pub fn pending_transactions(&self) -> &DBMap { &self.transactions } } -impl Deserialize<'de>> BackingPackageStore - for SuiDataStore +impl Deserialize<'de>> + BackingPackageStore for SuiDataStore { fn get_package(&self, package_id: &ObjectID) -> SuiResult> { let package = self.get_object(package_id)?; @@ -1061,8 +1076,8 @@ impl Deserialize<'de>> BackingPackag } } -impl Deserialize<'de>> ModuleResolver - for SuiDataStore +impl Deserialize<'de>> ModuleResolver + for SuiDataStore { type Error = SuiError; diff --git a/sui_core/src/full_node.rs b/sui_core/src/full_node.rs new file mode 100644 index 0000000000000..a84b3c3e7aa3f --- /dev/null +++ b/sui_core/src/full_node.rs @@ -0,0 +1,114 @@ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::collections::BTreeMap; +use std::{collections::HashMap, sync::Arc}; + +use sui_types::base_types::AuthorityName; +use sui_types::base_types::{ObjectID, ObjectRef, SuiAddress}; +use sui_types::error::SuiResult; +use sui_types::object::ObjectRead; +use tokio::sync::Mutex as TokioMutex; +use tokio::sync::Mutex; +use tracing::debug; + +use crate::authority_client::AuthorityAPI; +use crate::{authority_active::AuthorityHealth, authority_aggregator::AuthorityAggregator}; +use futures::channel::mpsc::channel as MpscChannel; +mod follower; +pub use crate::full_node::follower::follow_multiple; + +mod full_node_state; +pub use crate::full_node::full_node_state::FullNodeState; + +use self::follower::Downloader; + +const DOWNLOADER_CHANNEL_SIZE: usize = 1024; + +pub struct FullNode { + // Local state + pub state: Arc, + + // The network interfaces to authorities + pub aggregator: Arc>, + + // Network health + pub health: Arc>>, +} + +impl FullNode +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + // TODO: Provide way to run genesis here + + pub fn new( + state: Arc, + authority_clients: BTreeMap, + ) -> SuiResult { + let committee = state.committee.clone(); + + Ok(Self { + health: Arc::new(Mutex::new( + committee + .clone() + .voting_rights + .iter() + .map(|(name, _)| (*name, AuthorityHealth::default())) + .collect(), + )), + state, + aggregator: Arc::new(AuthorityAggregator::new(committee, authority_clients)), + }) + } + + async fn download_object_from_authorities(&self, object_id: ObjectID) -> SuiResult { + let result = self.aggregator.get_object_info_execute(object_id).await?; + if let ObjectRead::Exists(obj_ref, object, _) = &result { + let local_object = self.state.store.get_object(&object_id)?; + if local_object.is_none() + || &local_object.unwrap().compute_object_reference() != obj_ref + { + self.state.store.insert_object_direct(*obj_ref, object)?; + } + } + debug!(?result, "Downloaded object from authorities"); + + Ok(result) + } + + pub async fn get_object_info(&self, object_id: ObjectID) -> Result { + let result = self.download_object_from_authorities(object_id).await?; + Ok(result) + } + + pub async fn get_owned_objects( + &self, + account_addr: SuiAddress, + ) -> Result, anyhow::Error> { + Ok(self.state.store.get_account_objects(account_addr)?) + } +} + +impl FullNode +where + A: AuthorityAPI + Send + 'static + Sync + Clone, +{ + pub async fn spawn_tasks(&self) { + let (send_chann, recv_chann) = MpscChannel(DOWNLOADER_CHANNEL_SIZE); + + let downloader = Downloader { + aggregator: Arc::new(AuthorityAggregator::new( + self.state.committee.clone(), + self.aggregator.authority_clients.clone(), + )), + state: self.state.clone(), + }; + + // Spawn a downloader + downloader.start_downloader(recv_chann).await; + + // Spawn follower tasks + follow_multiple(self, self.state.committee.quorum_threshold(), send_chann).await; + } +} diff --git a/sui_core/src/full_node/follower.rs b/sui_core/src/full_node/follower.rs new file mode 100644 index 0000000000000..71716f774b5b6 --- /dev/null +++ b/sui_core/src/full_node/follower.rs @@ -0,0 +1,329 @@ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::authority::MAX_ITEMS_LIMIT; +use crate::{ + authority_aggregator::AuthorityAggregator, authority_client::AuthorityAPI, + safe_client::SafeClient, +}; +use futures::{ + channel::mpsc::{Receiver as MpscReceiver, Sender as MpscSender}, + SinkExt, StreamExt, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use sui_types::crypto::PublicKeyBytes; +use sui_types::{ + base_types::{AuthorityName, ObjectID, TransactionDigest}, + batch::UpdateItem, + error::SuiError, + messages::{ + BatchInfoRequest, BatchInfoResponseItem, ObjectInfoRequest, ObjectInfoRequestKind, + TransactionInfoRequest, TransactionInfoResponse, + }, + object::Object, +}; + +use tracing::{debug, error, info}; + +/// Follows one authority +struct Follower { + // Authority being followed + name: AuthorityName, + + client: SafeClient, + + state: Arc, + + downloader_channel: MpscSender<(AuthorityName, TransactionDigest)>, +} + +use super::{FullNode, FullNodeState}; + +/// Spawns tasks to follow a quorum of authorities with minimum total stake min_stake_target +pub async fn follow_multiple( + full_node: &FullNode, + min_stake_target: usize, + downloader_channel: MpscSender<(AuthorityName, TransactionDigest)>, +) where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + let committee = &full_node.state.committee; + if min_stake_target > committee.total_votes { + error!("Stake target cannot be greater than total_votes") + } + info!("Follower coordinator starting. Target stake {min_stake_target}"); + + let mut authority_names = HashSet::new(); + let mut num_tasks = 0; + let mut stake = 0; + while stake < min_stake_target { + let name = full_node.state.committee.sample(); + if authority_names.contains(name) { + continue; + } + + authority_names.insert(*name); + let d_channel = downloader_channel.clone(); + + info!("Will follow authority {:?}", name); + + let follower = Follower::new(*name, full_node, d_channel); + + tokio::task::spawn(async move { + follower.spawn().await; + }); + + num_tasks += 1; + stake += committee.weight(name); + } + + info!( + "Spawned {num_tasks} follower tasks to achieve {stake} stake out of {} total", + committee.total_votes + ); + + // drop orig channel + drop(downloader_channel); +} + +impl Follower +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + pub fn new( + peer_name: AuthorityName, + full_node: &FullNode, + downloader_channel: MpscSender<(AuthorityName, TransactionDigest)>, + ) -> Follower { + Self { + name: peer_name, + client: full_node.aggregator.authority_clients[&peer_name].clone(), + state: full_node.state.clone(), + + downloader_channel, + } + } + + pub async fn spawn(mut self) { + let peer_name = self.name; + info!("Spawn follower for {:?}", peer_name); + + let _ = tokio::task::spawn(async move { self.follow().await }).await; + } + + async fn follow(&mut self) -> Result<(), SuiError> { + let mut start = 0; + let length = MAX_ITEMS_LIMIT; + + loop { + info!("Follower listener started for {:?}", self.name); + let mut batch_listen_chann = match self + .client + .handle_batch_stream(BatchInfoRequest { + start: Some(start), + length, + }) + .await + { + Ok(c) => c, + Err(e) => { + error!( + "Follower listener error for authority: {:?}, err: {e}", + self.name + ); + break; + } + }; + + info!( + "Follower batch listener for authority: {:?} starting at sequence: {:?}, for length: {}.", + self.name, start, length + ); + + while let Some(item) = batch_listen_chann.next().await { + match item { +
Ok(BatchInfoResponseItem(UpdateItem::Transaction((tx_seq, tx_digest)))) => { + debug!( + "Received single tx_seq {tx_seq}, digest {:?} from authority {:?}", + tx_digest, self.name + ); + if !self.state.store.effects_exists(&tx_digest)? { + self.downloader_channel + .send((self.name, tx_digest)) + .await.unwrap_or_else(|e| panic!("Unable to send tx {:?} to downloader, from authority {:?}, err {:?} ", tx_digest, self.name, e)); + } else { + debug!("Ignoring previously seen tx tx_seq {tx_seq}, digest {:?} from authority {:?}", tx_digest, self.name); + } + } + Ok(BatchInfoResponseItem(UpdateItem::Batch(batch))) => { + // Strictly informational for now + debug!("Received batch {:?}", batch); + + // TODO: This should be persisted to disk to avoid re-fetching + start = batch.batch.next_sequence_number; + } + Err(err) => { + error!("{:?}", err); + return Err(err); + } + } + } + + // If we ever get here, we should restart loop because stream closed + } + + Ok(()) + } +} + +/// Downloads certs, objects, etc. and updates state +#[derive(Clone)] +pub struct Downloader { + pub aggregator: Arc>, + pub state: Arc, +} + +impl Downloader +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + // TODO: Downloader needs to be more robust to cover cases of missing dependencies, deleted objects and byzantine authorities + pub async fn start_downloader( + self, + downloader_channel: MpscReceiver<(AuthorityName, TransactionDigest)>, + ) { + tokio::task::spawn(async move { + match downloader_task(&self, downloader_channel).await { + Ok(_) => todo!(), + Err(e) => error!("Downloader task failed with err {e}"), + }; + }); + } + + // TODO: update to download dependencies + async fn update_state( + &self, + name: AuthorityName, + tx_resp: TransactionInfoResponse, + ) -> Result<(), SuiError> { + debug!( + "Updating state from authority {:?}, with tx resp {:?}", + name, tx_resp + ); + + let signed_effects = tx_resp + .clone() + .signed_effects + .ok_or(SuiError::ByzantineAuthoritySuspicion { authority: name })?; + let certificate = tx_resp + .clone() + .certified_transaction + .ok_or(SuiError::ByzantineAuthoritySuspicion { authority: name })?; + + // Get all the objects which were changed + let mutated_and_created_objects = signed_effects.effects.mutated_and_created(); + let mut modified_objects = HashMap::new(); + + // TODO: make this parallel + // Download the objects + for r in mutated_and_created_objects { + if let Some(obj) = self.download_latest_object(name, r.0 .0).await? { + modified_objects.insert(obj.compute_object_reference(), obj); + } else { + // This can happen when the authority view of the object is changing faster than we can download + // TODO: try asking other authorities in case one still has the object + error!( + "Download for object {} failed. Object not present in authority", + r.0 .0 + ); + } + } + + // Get the active inputs to the TX + let mut active_input_objects = vec![]; + + for obj_kind in certificate.data.input_objects()? { + if let Some(obj) = self + .download_latest_object(name, obj_kind.object_id()) + .await? + { + active_input_objects.push((obj_kind, obj)); + } else { + // This can happen when the authority view of the object is changing faster than we can download + // TODO: try asking other authorities in case one still has the object + error!( + "Download for object {} failed. Object not present in authority", + obj_kind.object_id() + ); + } + } + + // TODO: is it safe to continue if some objects are missing? 
+ self.state.store.update_gateway_state( + active_input_objects, + modified_objects, + certificate, + signed_effects.effects.to_unsigned_effects(), + // TODO: decide what to do with this + self.state + .next_tx_seq_number + .fetch_add(1, std::sync::atomic::Ordering::SeqCst), + )?; + Ok(()) + } + + // TODO: better error handling and dependencies fetching + // TODO: There is a chance that the object has changed by the time we fetch it. Can we do better? + pub async fn download_latest_object( + &self, + name: AuthorityName, + obj: ObjectID, + ) -> Result, SuiError> { + Ok(self.aggregator.authority_clients[&name] + .clone() + .handle_object_info_request(ObjectInfoRequest { + object_id: obj, + request_kind: ObjectInfoRequestKind::LatestObjectInfo(None), + }) + .await? + .object_and_lock + .map(|o| o.object)) + } +} + +pub async fn downloader_task( + d: &Downloader, + mut recv: MpscReceiver<(PublicKeyBytes, TransactionDigest)>, +) -> Result<(), SuiError> +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + info!("Full node downloader started..."); + loop { + while let Some((name, digest)) = recv.next().await { + if !d.state.store.effects_exists(&digest)? { + let client = d.aggregator.authority_clients[&name].clone(); + // Download the certificate + let response = client + .handle_transaction_info_request(TransactionInfoRequest::from(digest)) + .await?; + if response.clone().certified_transaction.is_some() { + d.update_state(name, response).await?; + } else { + let err: Result<(), SuiError> = + Err(SuiError::ByzantineAuthoritySuspicion { authority: name }); + error!("Error when downloading {:?}", err); + return err; + } + } else { + debug!( + "Downloader ignoring previously seen tx digest {:?} from authority {:?}", + digest, name + ); + } + } + } +} diff --git a/sui_core/src/full_node/full_node_state.rs b/sui_core/src/full_node/full_node_state.rs new file mode 100644 index 0000000000000..ed532ffe0b5d9 --- /dev/null +++ b/sui_core/src/full_node/full_node_state.rs @@ -0,0 +1,251 @@ +// Copyright (c) 2022, Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use std::{ + collections::HashSet, + sync::{atomic::AtomicU64, Arc}, +}; +use sui_config::genesis::Genesis; + +use crate::{ + authority::{AuthorityTemporaryStore, ReplicaStore}, + gateway_state::GatewayTxSeqNumber, + gateway_types::TransactionEffectsResponse, +}; +use move_binary_format::CompiledModule; +use move_vm_runtime::{move_vm::MoveVM, native_functions::NativeFunctionTable}; +use sui_adapter::adapter; +use sui_types::{ + base_types::{ObjectID, TransactionDigest, TxContext}, + committee::Committee as SuiCommittee, + error::{SuiError, SuiResult}, + fp_ensure, + gas::SuiGasStatus, + messages::Transaction, + object::Object, + MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS, +}; +use tracing::debug; + +// use std::path::Path; + +// use crate::{ +// api::{RpcGatewayServer, TransactionBytes}, +// rpc_gateway::responses::{ObjectResponse, SuiTypeTag}, +// }; +// use anyhow::anyhow; +// use async_trait::async_trait; +// use jsonrpsee::core::RpcResult; +// use sui_core::gateway_types::{TransactionEffectsResponse, TransactionResponse}; + +// use sui_core::gateway_state::GatewayTxSeqNumber; +// use sui_core::gateway_types::GetObjectInfoResponse; +// use sui_core::sui_json::SuiJsonValue; +// use sui_types::base_types::{ObjectID, SuiAddress, TransactionDigest}; +// use sui_types::sui_serde::Base64; + +const MAX_TX_RANGE_SIZE: u64 = 4096; + +pub struct FullNodeState { + pub store: Arc, + pub committee: SuiCommittee, + pub next_tx_seq_number: AtomicU64, + + /// Move native functions that are available to invoke + _native_functions: NativeFunctionTable, + /// Will be used for local exec in future + _move_vm: Arc, +} + +impl FullNodeState { + pub async fn new_without_genesis( + committee: SuiCommittee, + store: Arc, + ) -> Result { + let native_functions = + sui_framework::natives::all_natives(MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS); + let next_tx_seq_number = AtomicU64::new(store.next_sequence_number()?); + Ok(Self { + committee, + store, + _native_functions: native_functions.clone(), + _move_vm: Arc::new( + adapter::new_move_vm(native_functions) + .expect("We defined natives to not fail here"), + ), + next_tx_seq_number, + }) + } + + pub async fn new_with_genesis( + committee: SuiCommittee, + store: Arc, + genesis: &Genesis, + ) -> Result { + let state = Self::new_without_genesis(committee, store.clone()).await?; + + // Only initialize an empty database. 
+ if store + .database_is_empty() + .expect("Database read should not fail.") + { + let mut genesis_ctx = genesis.genesis_ctx().to_owned(); + for genesis_modules in genesis.modules() { + state + .store_package_and_init_modules_for_genesis( + &mut genesis_ctx, + genesis_modules.to_owned(), + ) + .await + .expect("We expect publishing the Genesis packages to not fail"); + state + .insert_genesis_objects_bulk_unsafe( + &genesis.objects().iter().collect::>(), + ) + .await; + } + } + + Ok(state) + } + + /// TODO: consolidate with Authority counterpart + /// Persist the Genesis package to DB along with the side effects for module initialization + async fn store_package_and_init_modules_for_genesis( + &self, + ctx: &mut TxContext, + modules: Vec, + ) -> SuiResult { + let inputs = Transaction::input_objects_in_compiled_modules(&modules); + let ids: Vec<_> = inputs.iter().map(|kind| kind.object_id()).collect(); + let input_objects = self.get_objects(&ids[..]).await?; + // When publishing genesis packages, since the std framework packages all have + // non-zero addresses, [`Transaction::input_objects_in_compiled_modules`] will consider + // them as dependencies even though they are not. Hence input_objects contain objects + // that don't exist on-chain because they are yet to be published. + #[cfg(debug_assertions)] + { + let to_be_published_addresses: HashSet<_> = modules + .iter() + .map(|module| *module.self_id().address()) + .collect(); + assert!( + // An object either exists on-chain, or is one of the packages to be published. + inputs + .iter() + .zip(input_objects.iter()) + .all(|(kind, obj_opt)| obj_opt.is_some() + || to_be_published_addresses.contains(&kind.object_id())) + ); + } + let filtered = inputs + .into_iter() + .zip(input_objects.into_iter()) + .filter_map(|(input, object_opt)| object_opt.map(|object| (input, object))) + .collect::>(); + + debug_assert!(ctx.digest() == TransactionDigest::genesis()); + let mut temporary_store = + AuthorityTemporaryStore::new(self.store.clone(), filtered, ctx.digest()); + let package_id = ObjectID::from(*modules[0].self_id().address()); + let natives = self._native_functions.clone(); + let mut gas_status = SuiGasStatus::new_unmetered(); + let vm = adapter::verify_and_link( + &temporary_store, + &modules, + package_id, + natives, + &mut gas_status, + )?; + adapter::store_package_and_init_modules( + &mut temporary_store, + &vm, + modules, + ctx, + &mut gas_status, + )?; + self.store + .update_objects_state_for_genesis(temporary_store, ctx.digest()) + } + + pub async fn insert_genesis_objects_bulk_unsafe(&self, objects: &[&Object]) { + self.store + .bulk_object_insert(objects) + .expect("TODO: propagate the error") + } + + pub fn get_total_transaction_number(&self) -> Result { + Ok(self.store.next_sequence_number()?) 
+ } + + pub fn get_transactions_in_range( + &self, + start: GatewayTxSeqNumber, + end: GatewayTxSeqNumber, + ) -> Result, anyhow::Error> { + fp_ensure!( + start <= end, + SuiError::GatewayInvalidTxRangeQuery { + error: format!( + "start must not exceed end, (start={}, end={}) given", + start, end + ), + } + .into() + ); + fp_ensure!( + end - start <= MAX_TX_RANGE_SIZE, + SuiError::GatewayInvalidTxRangeQuery { + error: format!( + "Number of transactions queried must not exceed {}, {} queried", + MAX_TX_RANGE_SIZE, + end - start + ), + } + .into() + ); + let res = self.store.transactions_in_seq_range(start, end)?; + debug!(?start, ?end, ?res, "Fetched transactions"); + Ok(res) + } + + pub fn get_recent_transactions( + &self, + count: u64, + ) -> Result, anyhow::Error> { + fp_ensure!( + count <= MAX_TX_RANGE_SIZE, + SuiError::GatewayInvalidTxRangeQuery { + error: format!( + "Number of transactions queried must not exceed {}, {} queried", + MAX_TX_RANGE_SIZE, count + ), + } + .into() + ); + let end = self.get_total_transaction_number()?; + let start = if end >= count { end - count } else { 0 }; + self.get_transactions_in_range(start, end) + } + + pub async fn get_objects( + &self, + _objects: &[ObjectID], + ) -> Result>, SuiError> { + self.store.get_objects(_objects) + } + pub async fn get_transaction( + &self, + digest: TransactionDigest, + ) -> Result { + let opt = self.store.get_certified_transaction(&digest)?; + match opt { + Some(certificate) => Ok(TransactionEffectsResponse { + certificate: certificate.try_into()?, + effects: self.store.get_effects(&digest)?.into(), + }), + None => Err(anyhow!(SuiError::TransactionNotFound { digest })), + } + } +} diff --git a/sui_core/src/lib.rs b/sui_core/src/lib.rs index 20372f674424e..96cafead690d4 100644 --- a/sui_core/src/lib.rs +++ b/sui_core/src/lib.rs @@ -9,6 +9,7 @@ pub mod authority_client; pub mod authority_server; pub mod consensus_adapter; pub mod execution_engine; +pub mod full_node; pub mod gateway_state; pub mod gateway_types; pub mod make; diff --git a/sui_core/src/transaction_input_checker.rs b/sui_core/src/transaction_input_checker.rs index 8bf31a3377551..86c22be22dc2f 100644 --- a/sui_core/src/transaction_input_checker.rs +++ b/sui_core/src/transaction_input_checker.rs @@ -18,8 +18,8 @@ use tracing::{debug, instrument}; use crate::authority::SuiDataStore; #[instrument(level = "trace", skip_all)] -pub async fn check_transaction_input( - store: &SuiDataStore, +pub async fn check_transaction_input( + store: &SuiDataStore, transaction: &TransactionEnvelope, shared_obj_metric: &IntCounter, ) -> Result<(SuiGasStatus<'static>, Vec<(InputObjectKind, Object)>), SuiError> @@ -51,8 +51,8 @@ where /// Returns the gas object (to be able to reuse it latter) and a gas status /// that will be used in the entire lifecycle of the transaction execution. #[instrument(level = "trace", skip_all)] -async fn check_gas( - store: &SuiDataStore, +async fn check_gas( + store: &SuiDataStore, gas_payment_id: ObjectID, gas_budget: u64, ) -> SuiResult<(Object, SuiGasStatus<'static>)> @@ -70,8 +70,8 @@ where } #[instrument(level = "trace", skip_all, fields(num_objects = input_objects.len()))] -async fn fetch_objects( - store: &SuiDataStore, +async fn fetch_objects( + store: &SuiDataStore, input_objects: &[InputObjectKind], gas_object_opt: Option, ) -> Result>, SuiError> @@ -94,8 +94,8 @@ where /// Check all the objects used in the transaction against the database, and ensure /// that they are all the correct version and number. 
#[instrument(level = "trace", skip_all)] -async fn check_locks( - store: &SuiDataStore, +async fn check_locks( + store: &SuiDataStore, transaction: &TransactionData, gas_object: Object, ) -> Result, SuiError>