From b30280827c12b771e53074524b963e9ee2c295dd Mon Sep 17 00:00:00 2001 From: stringhandler Date: Fri, 20 Dec 2024 17:24:23 +0200 Subject: [PATCH] feat: move events to process watcher (#1301) Co-authored-by: brianp --- src-tauri/src/binaries/binaries_manager.rs | 6 +- src-tauri/src/commands.rs | 83 ++++--------- src-tauri/src/gpu_miner.rs | 64 ++-------- src-tauri/src/gpu_miner_adapter.rs | 18 +-- src-tauri/src/main.rs | 28 +++-- src-tauri/src/node_adapter.rs | 138 +++++++++++++-------- src-tauri/src/node_manager.rs | 59 ++++----- src-tauri/src/process_watcher.rs | 4 +- src-tauri/src/telemetry_manager.rs | 66 +++++----- src-tauri/src/wallet_adapter.rs | 20 ++- src-tauri/src/wallet_manager.rs | 8 +- 11 files changed, 232 insertions(+), 262 deletions(-) diff --git a/src-tauri/src/binaries/binaries_manager.rs b/src-tauri/src/binaries/binaries_manager.rs index 6ea44eedb..a898c478a 100644 --- a/src-tauri/src/binaries/binaries_manager.rs +++ b/src-tauri/src/binaries/binaries_manager.rs @@ -36,7 +36,7 @@ use super::{ Binaries, }; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; pub const LOG_TARGET: &str = "tari::universe::binary_manager"; @@ -102,7 +102,7 @@ impl BinaryManager { VersionReq::default() }); - info!(target: LOG_TARGET, "Version requirements for {:?}: {:?}", binary_name, version_requirement); + debug!(target: LOG_TARGET, "Version requirements for {:?}: {:?}", binary_name, version_requirement); version_requirement } @@ -514,7 +514,7 @@ impl BinaryManager { } pub async fn read_local_versions(&mut self) { - info!(target: LOG_TARGET,"Reading local versions for binary: {:?}", self.binary_name); + debug!(target: LOG_TARGET,"Reading local versions for binary: {:?}", self.binary_name); let binary_folder = match self.adapter.get_binary_folder() { Ok(path) => path, diff --git a/src-tauri/src/commands.rs b/src-tauri/src/commands.rs index 3355b29f5..586bf330f 100644 --- a/src-tauri/src/commands.rs +++ b/src-tauri/src/commands.rs @@ -33,14 +33,13 @@ use crate::external_dependencies::{ use crate::gpu_miner_adapter::{GpuMinerStatus, GpuNodeSource}; use crate::hardware::hardware_status_monitor::{HardwareStatusMonitor, PublicDeviceProperties}; use crate::internal_wallet::{InternalWallet, PaperWalletConfig}; -use crate::node_manager::NodeManagerError; use crate::p2pool::models::{Connections, Stats}; use crate::progress_tracker::ProgressTracker; use crate::tor_adapter::TorConfig; use crate::utils::shutdown_utils::stop_all_processes; use crate::wallet_adapter::{TransactionInfo, WalletBalance}; use crate::wallet_manager::WalletManagerError; -use crate::{setup_inner, UniverseAppState, APPLICATION_FOLDER_ID}; +use crate::{node_adapter, setup_inner, UniverseAppState, APPLICATION_FOLDER_ID}; use base64::prelude::*; use keyring::Entry; @@ -54,7 +53,6 @@ use std::sync::atomic::Ordering; use std::thread::{available_parallelism, sleep}; use std::time::{Duration, Instant, SystemTime}; use tari_common::configuration::Network; -use tari_core::transactions::tari_amount::MicroMinotari; use tauri::{Manager, PhysicalPosition, PhysicalSize}; use tauri_plugin_sentry::sentry; use tauri_plugin_sentry::sentry::protocol::Event; @@ -398,18 +396,27 @@ pub async fn get_miner_metrics( } state.is_getting_miner_metrics.store(true, Ordering::SeqCst); - let (sha_hash_rate, randomx_hash_rate, block_reward, block_height, block_time, is_synced) = state.node_manager - .get_network_hash_rate_and_block_reward().await - .unwrap_or_else(|e| { - if !matches!(e, NodeManagerError::NodeNotStarted) { - warn!(target: LOG_TARGET, "Error getting network hash rate and block reward: {}", e); - } - (0, 0, MicroMinotari(0), 0, 0, false) - }); + let node_status = state.base_node_latest_status.borrow().clone(); + let node_adapter::BaseNodeStatus { + sha_network_hashrate, + randomx_network_hashrate, + block_height, + block_time, + is_synced, + block_reward, + } = node_status; + // let (sha_hash_rate, randomx_hash_rate, block_reward, block_height, block_time, is_synced) = state.node_manager + // .get_network_hash_rate_and_block_reward().await + // .unwrap_or_else(|e| { + // if !matches!(e, NodeManagerError::NodeNotStarted) { + // warn!(target: LOG_TARGET, "Error getting network hash rate and block reward: {}", e); + // } + // (0, 0, MicroMinotari(0), 0, 0, false) + // }); let cpu_miner = state.cpu_miner.read().await; let cpu_mining_status = match cpu_miner - .status(randomx_hash_rate, block_reward) + .status(randomx_network_hashrate, block_reward) .await .map_err(|e| e.to_string()) { @@ -424,18 +431,7 @@ pub async fn get_miner_metrics( }; drop(cpu_miner); - let gpu_miner = state.gpu_miner.read().await; - let gpu_mining_status = match gpu_miner.status(sha_hash_rate, block_reward).await { - Ok(gpu) => gpu, - Err(e) => { - warn!(target: LOG_TARGET, "Error getting gpu miner status: {:?}", e); - state - .is_getting_miner_metrics - .store(false, Ordering::SeqCst); - return Err(e.to_string()); - } - }; - drop(gpu_miner); + let gpu_mining_status = state.gpu_latest_status.borrow().clone(); let gpu_public_parameters = HardwareStatusMonitor::current() .get_gpu_devices_public_properties() @@ -453,8 +449,8 @@ pub async fn get_miner_metrics( } let metrics_ret = MinerMetrics { - sha_network_hash_rate: sha_hash_rate, - randomx_network_hash_rate: randomx_hash_rate, + sha_network_hash_rate: sha_network_hashrate, + randomx_network_hash_rate: randomx_network_hashrate, cpu: CpuMinerMetrics { // hardware: cpu_public_parameters.clone(), mining: cpu_mining_status, @@ -670,43 +666,16 @@ pub async fn get_tari_wallet_details( state: tauri::State<'_, UniverseAppState>, ) -> Result { let timer = Instant::now(); - if state.is_getting_wallet_balance.load(Ordering::SeqCst) { - let read = state.cached_wallet_details.read().await; - if let Some(details) = &*read { - warn!(target: LOG_TARGET, "Already getting wallet balance, returning cached value"); - return Ok(details.clone()); - } - warn!(target: LOG_TARGET, "Already getting wallet balance"); - return Err("Already getting wallet balance".to_string()); - } - state - .is_getting_wallet_balance - .store(true, Ordering::SeqCst); - let wallet_balance = match state.wallet_manager.get_balance().await { - Ok(w) => Some(w), - Err(e) => { - if !matches!(e, WalletManagerError::WalletNotStarted) { - warn!(target: LOG_TARGET, "Error getting wallet balance: {}", e); - } - - None - } - }; let tari_address = state.tari_address.read().await; - - if timer.elapsed() > MAX_ACCEPTABLE_COMMAND_TIME { - warn!(target: LOG_TARGET, "get_tari_wallet_details took too long: {:?}", timer.elapsed()); - } + let wallet_balance = state.wallet_latest_balance.borrow().clone(); let result = TariWalletDetails { wallet_balance, tari_address_base58: tari_address.to_base58(), tari_address_emoji: tari_address.to_emoji_string(), }; - let mut lock = state.cached_wallet_details.write().await; - *lock = Some(result.clone()); - state - .is_getting_wallet_balance - .store(false, Ordering::SeqCst); + if timer.elapsed() > MAX_ACCEPTABLE_COMMAND_TIME { + warn!(target: LOG_TARGET, "get_tari_wallet_details took too long: {:?}", timer.elapsed()); + } Ok(result) } diff --git a/src-tauri/src/gpu_miner.rs b/src-tauri/src/gpu_miner.rs index f12767e78..28e40e4da 100644 --- a/src-tauri/src/gpu_miner.rs +++ b/src-tauri/src/gpu_miner.rs @@ -20,14 +20,13 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{path::PathBuf, sync::Arc}; - use log::info; use serde::Deserialize; +use std::time::Duration; +use std::{path::PathBuf, sync::Arc}; use tari_common_types::tari_address::TariAddress; -use tari_core::transactions::tari_amount::MicroMinotari; use tari_shutdown::ShutdownSignal; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use crate::app_config::GpuThreads; use crate::binaries::{Binaries, BinaryResolver}; @@ -39,7 +38,6 @@ use crate::{ process_watcher::ProcessWatcher, }; -const SHA_BLOCKS_PER_DAY: u64 = 360; const LOG_TARGET: &str = "tari::universe::gpu_miner"; #[derive(Debug, Deserialize)] @@ -66,9 +64,12 @@ pub(crate) struct GpuMiner { } impl GpuMiner { - pub fn new() -> Self { - let adapter = GpuMinerAdapter::new(vec![]); - let process_watcher = ProcessWatcher::new(adapter); + pub fn new(status_broadcast: watch::Sender) -> Self { + let adapter = GpuMinerAdapter::new(vec![], status_broadcast); + let mut process_watcher = ProcessWatcher::new(adapter); + process_watcher.health_timeout = Duration::from_secs(9); + process_watcher.poll_time = Duration::from_secs(10); + Self { watcher: Arc::new(RwLock::new(process_watcher)), is_available: false, @@ -135,53 +136,6 @@ impl GpuMiner { lock.is_pid_file_exists(base_path) } - pub async fn status( - &self, - network_hash_rate: u64, - block_reward: MicroMinotari, - ) -> Result { - let process_watcher = self.watcher.read().await; - if !process_watcher.is_running() { - return Ok(GpuMinerStatus { - hash_rate: 0, - estimated_earnings: 0, - is_mining: false, - is_available: self.is_available, - }); - } - match &process_watcher.status_monitor { - Some(status_monitor) => { - let mut status = status_monitor.status().await?; - let hash_rate = status.hash_rate; - let estimated_earnings = if network_hash_rate == 0 { - 0 - } else { - #[allow(clippy::cast_possible_truncation)] - { - ((block_reward.as_u64() as f64) - * (hash_rate as f64 / network_hash_rate as f64) - * (SHA_BLOCKS_PER_DAY as f64)) - .floor() as u64 - } - }; - // Can't be more than the max reward for a day - let estimated_earnings = std::cmp::min( - estimated_earnings, - block_reward.as_u64() * SHA_BLOCKS_PER_DAY, - ); - status.estimated_earnings = estimated_earnings; - status.is_available = self.is_available; - Ok(status) - } - None => Ok(GpuMinerStatus { - hash_rate: 0, - estimated_earnings: 0, - is_mining: false, - is_available: self.is_available, - }), - } - } - pub async fn detect(&mut self, config_dir: PathBuf) -> Result<(), anyhow::Error> { info!(target: LOG_TARGET, "Verify if gpu miner can work on the system"); diff --git a/src-tauri/src/gpu_miner_adapter.rs b/src-tauri/src/gpu_miner_adapter.rs index 6dfa4a6bb..31b13e86d 100644 --- a/src-tauri/src/gpu_miner_adapter.rs +++ b/src-tauri/src/gpu_miner_adapter.rs @@ -36,6 +36,7 @@ use std::time::Instant; use tari_common::configuration::Network; use tari_common_types::tari_address::TariAddress; use tari_shutdown::Shutdown; +use tokio::sync::watch; #[cfg(target_os = "windows")] use crate::utils::setup_utils::setup_utils::add_firewall_rule; @@ -60,10 +61,14 @@ pub(crate) struct GpuMinerAdapter { pub(crate) coinbase_extra: String, pub(crate) excluded_gpu_devices: Vec, pub(crate) gpu_devices: Vec, + latest_status_broadcast: watch::Sender, } impl GpuMinerAdapter { - pub fn new(gpu_devices: Vec) -> Self { + pub fn new( + gpu_devices: Vec, + latest_status_broadcast: watch::Sender, + ) -> Self { Self { tari_address: TariAddress::default(), gpu_grid_size: gpu_devices @@ -77,6 +82,7 @@ impl GpuMinerAdapter { coinbase_extra: "tari-universe".to_string(), excluded_gpu_devices: vec![], gpu_devices, + latest_status_broadcast, } } @@ -233,6 +239,7 @@ impl ProcessAdapter for GpuMinerAdapter { GpuMinerStatusMonitor { http_api_port, start_time: Instant::now(), + latest_status_broadcast: self.latest_status_broadcast.clone(), }, )) } @@ -250,12 +257,14 @@ impl ProcessAdapter for GpuMinerAdapter { pub struct GpuMinerStatusMonitor { http_api_port: u16, start_time: Instant, + latest_status_broadcast: watch::Sender, } #[async_trait] impl StatusMonitor for GpuMinerStatusMonitor { async fn check_health(&self) -> HealthStatus { if let Ok(status) = self.status().await { + let _result = self.latest_status_broadcast.send(status.clone()); // GPU returns 0 for first 10 seconds until it has an average if status.hash_rate > 0 || self.start_time.elapsed().as_secs() < 11 { HealthStatus::Healthy @@ -285,14 +294,12 @@ impl GpuMinerStatusMonitor { is_mining: false, hash_rate: 0, estimated_earnings: 0, - is_available: false, }); } return Ok(GpuMinerStatus { is_mining: false, hash_rate: 0, estimated_earnings: 0, - is_available: false, }); } }; @@ -305,7 +312,6 @@ impl GpuMinerStatusMonitor { is_mining: false, hash_rate: 0, estimated_earnings: 0, - is_available: false, }); } }; @@ -314,7 +320,6 @@ impl GpuMinerStatusMonitor { is_mining: true, estimated_earnings: 0, hash_rate: body.total_hashrate.ten_seconds.unwrap_or(0.0) as u64, - is_available: true, }) } } @@ -334,10 +339,9 @@ pub(crate) struct AverageHashrate { one_minute: Option, } -#[derive(Debug, Serialize, Clone)] +#[derive(Debug, Serialize, Clone, Default)] pub(crate) struct GpuMinerStatus { pub is_mining: bool, pub hash_rate: u64, pub estimated_earnings: u64, - pub is_available: bool, } diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index e775dcbc1..9eeb7d0af 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -24,13 +24,16 @@ #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] use auto_launcher::AutoLauncher; +use gpu_miner_adapter::GpuMinerStatus; use hardware::hardware_status_monitor::HardwareStatusMonitor; use log::trace; use log::{debug, error, info, warn}; +use node_adapter::BaseNodeStatus; use p2pool::models::Connections; use std::fs::{remove_dir_all, remove_file}; use tokio::sync::watch::{self}; use updates_manager::UpdatesManager; +use wallet_adapter::WalletBalance; use log4rs::config::RawConfig; use serde::Serialize; @@ -60,7 +63,7 @@ use telemetry_manager::TelemetryManager; use crate::cpu_miner::CpuMiner; use crate::app_config::WindowSettings; -use crate::commands::{CpuMinerConnection, MinerMetrics, TariWalletDetails}; +use crate::commands::{CpuMinerConnection, MinerMetrics}; #[allow(unused_imports)] use crate::external_dependencies::ExternalDependencies; use crate::feedback::Feedback; @@ -562,7 +565,9 @@ async fn setup_inner( #[derive(Clone)] struct UniverseAppState { stop_start_mutex: Arc>, - is_getting_wallet_balance: Arc, + base_node_latest_status: Arc>, + wallet_latest_balance: Arc>>, + gpu_latest_status: Arc>, is_getting_p2pool_stats: Arc, is_getting_p2pool_connections: Arc, is_getting_miner_metrics: Arc, @@ -586,7 +591,6 @@ struct UniverseAppState { updates_manager: UpdatesManager, cached_p2pool_stats: Arc>>>, cached_p2pool_connections: Arc>>>, - cached_wallet_details: Arc>>, cached_miner_metrics: Arc>>, setup_counter: Arc>>, } @@ -618,8 +622,10 @@ fn main() { // NOTE: Nothing is started at this point, so ports are not known. You can only start settings ports // and addresses once the different services have been started. // A better way is to only provide the config when we start the service. - let node_manager = NodeManager::new(); - let wallet_manager = WalletManager::new(node_manager.clone()); + let (base_node_watch_tx, base_node_watch_rx) = watch::channel(BaseNodeStatus::default()); + let node_manager = NodeManager::new(base_node_watch_tx); + let (wallet_watch_tx, wallet_watch_rx) = watch::channel::>(None); + let wallet_manager = WalletManager::new(node_manager.clone(), wallet_watch_tx); let wallet_manager2 = wallet_manager.clone(); let p2pool_manager = P2poolManager::new(); @@ -636,20 +642,21 @@ fn main() { let app_in_memory_config = Arc::new(RwLock::new(app_in_memory_config::AppInMemoryConfig::init())); + let (gpu_status_tx, gpu_status_rx) = watch::channel(GpuMinerStatus::default()); let cpu_miner: Arc> = Arc::new(CpuMiner::new().into()); - let gpu_miner: Arc> = Arc::new(GpuMiner::new().into()); + let gpu_miner: Arc> = Arc::new(GpuMiner::new(gpu_status_tx).into()); let app_config_raw = AppConfig::new(); let app_config = Arc::new(RwLock::new(app_config_raw.clone())); let telemetry_manager: TelemetryManager = TelemetryManager::new( - node_manager.clone(), cpu_miner.clone(), - gpu_miner.clone(), app_config.clone(), app_in_memory_config.clone(), Some(Network::default()), p2pool_manager.clone(), + gpu_status_rx.clone(), + base_node_watch_rx.clone(), ); let updates_manager = UpdatesManager::new(app_config.clone(), shutdown.to_signal()); @@ -662,7 +669,9 @@ fn main() { is_getting_miner_metrics: Arc::new(AtomicBool::new(false)), is_getting_p2pool_stats: Arc::new(AtomicBool::new(false)), is_getting_p2pool_connections: Arc::new(AtomicBool::new(false)), - is_getting_wallet_balance: Arc::new(AtomicBool::new(false)), + base_node_latest_status: Arc::new(base_node_watch_rx), + wallet_latest_balance: Arc::new(wallet_watch_rx), + gpu_latest_status: Arc::new(gpu_status_rx), is_setup_finished: Arc::new(RwLock::new(false)), is_getting_transaction_history: Arc::new(AtomicBool::new(false)), config: app_config.clone(), @@ -683,7 +692,6 @@ fn main() { updates_manager, cached_p2pool_stats: Arc::new(RwLock::new(None)), cached_p2pool_connections: Arc::new(RwLock::new(None)), - cached_wallet_details: Arc::new(RwLock::new(None)), cached_miner_metrics: Arc::new(RwLock::new(None)), setup_counter: Arc::new(RwLock::new(AutoRollback::new(false))), }; diff --git a/src-tauri/src/node_adapter.rs b/src-tauri/src/node_adapter.rs index 24169aafc..5802c6aae 100644 --- a/src-tauri/src/node_adapter.rs +++ b/src-tauri/src/node_adapter.rs @@ -30,7 +30,7 @@ use crate::utils::logging_utils::setup_logging; use crate::ProgressTracker; use anyhow::{anyhow, Error}; use async_trait::async_trait; -use log::info; +use log::{info, warn}; use minotari_node_grpc_client::grpc::{ BlockHeader, Empty, GetBlocksRequest, HeightRequest, NewBlockTemplateRequest, Peer, PowAlgo, SyncState, @@ -44,6 +44,7 @@ use tari_core::transactions::tari_amount::MicroMinotari; use tari_crypto::ristretto::RistrettoPublicKey; use tari_shutdown::{Shutdown, ShutdownSignal}; use tari_utilities::ByteArray; +use tokio::sync::watch; #[cfg(target_os = "windows")] use crate::utils::setup_utils::setup_utils::add_firewall_rule; @@ -57,10 +58,11 @@ pub(crate) struct MinotariNodeAdapter { pub(crate) use_pruned_mode: bool, pub(crate) tor_control_port: Option, required_initial_peers: u32, + latest_status_broadcast: watch::Sender, } impl MinotariNodeAdapter { - pub fn new() -> Self { + pub fn new(status_broadcast: watch::Sender) -> Self { let port = PortAllocator::new().assign_port_with_fallback(); let tcp_listener_port = PortAllocator::new().assign_port_with_fallback(); Self { @@ -70,6 +72,7 @@ impl MinotariNodeAdapter { required_initial_peers: 3, use_tor: false, tor_control_port: None, + latest_status_broadcast: status_broadcast, } } } @@ -209,9 +212,7 @@ impl ProcessAdapter for MinotariNodeAdapter { grpc_port: self.grpc_port, required_sync_peers: self.required_initial_peers, shutdown_signal: status_shutdown, - last_block_height: 0, - last_sha3_estimated_hashrate: 0, - last_randomx_estimated_hashrate: 0, + latest_status_broadcast: self.latest_status_broadcast.clone(), }, )) } @@ -233,52 +234,77 @@ pub enum MinotariNodeStatusMonitorError { NodeNotStarted, } +#[derive(Clone, Debug, Default)] +pub(crate) struct BaseNodeStatus { + pub sha_network_hashrate: u64, + pub randomx_network_hashrate: u64, + pub block_reward: MicroMinotari, + pub block_height: u64, + pub block_time: u64, + pub is_synced: bool, +} + #[derive(Clone)] pub struct MinotariNodeStatusMonitor { grpc_port: u16, required_sync_peers: u32, shutdown_signal: ShutdownSignal, - last_block_height: u64, - last_sha3_estimated_hashrate: u64, - last_randomx_estimated_hashrate: u64, + latest_status_broadcast: watch::Sender, } #[async_trait] impl StatusMonitor for MinotariNodeStatusMonitor { async fn check_health(&self) -> HealthStatus { - if self.get_identity().await.is_ok() { - HealthStatus::Healthy - } else { - HealthStatus::Unhealthy + match self.get_network_hash_rate_and_block_reward().await { + Ok(res) => { + let _res = self.latest_status_broadcast.send(res); + HealthStatus::Healthy + } + Err(e) => { + warn!(target: LOG_TARGET, "Error checking base node health: {:?}", e); + HealthStatus::Unhealthy + } } } } impl MinotariNodeStatusMonitor { pub async fn get_network_hash_rate_and_block_reward( - &mut self, - ) -> Result<(u64, u64, MicroMinotari, u64, u64, bool), MinotariNodeStatusMonitorError> { - // TODO: use GRPC port returned from process + &self, + ) -> Result { let mut client = BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port)) .await .map_err(|_| MinotariNodeStatusMonitorError::NodeNotStarted)?; - let res = client - .get_new_block_template(NewBlockTemplateRequest { - algo: Some(PowAlgo { pow_algo: 1 }), - max_weight: 0, - }) - .await - .map_err(|e| MinotariNodeStatusMonitorError::UnknownError(e.into()))?; - let res = res.into_inner(); + let mut reward = 0; + // The base node returns a stupid error if the template is out of sync, so try multiple times + let max_template_retries = 5; - let reward = res - .miner_data - .ok_or_else(|| { - MinotariNodeStatusMonitorError::UnknownError(anyhow!("No miner data found")) - })? - .reward; + for _ in 0..max_template_retries { + let res = match client + .get_new_block_template(NewBlockTemplateRequest { + algo: Some(PowAlgo { pow_algo: 1 }), + max_weight: 0, + }) + .await + { + Ok(r) => r, + Err(e) => { + warn!(target: LOG_TARGET, "Failed to get new block template: {}", e); + continue; + } + }; + let res = res.into_inner(); + + reward = res + .miner_data + .ok_or_else(|| { + MinotariNodeStatusMonitorError::UnknownError(anyhow!("No miner data found")) + })? + .reward; + break; + } let res = client .get_tip_info(Empty {}) @@ -300,16 +326,16 @@ impl MinotariNodeStatusMonitor { metadata.timestamp, ); - if sync_achieved && (block_height <= self.last_block_height) { - return Ok(( - self.last_sha3_estimated_hashrate, - self.last_randomx_estimated_hashrate, - MicroMinotari(reward), - block_height, - block_time, - sync_achieved, - )); - } + // if sync_achieved && (block_height <= self.last_block_height) { + // return Ok(( + // self.last_sha3_estimated_hashrate, + // self.last_randomx_estimated_hashrate, + // MicroMinotari(reward), + // block_height, + // block_time, + // sync_achieved, + // )); + // } // First try with 10 blocks let blocks = [10, 100]; @@ -342,23 +368,20 @@ impl MinotariNodeStatusMonitor { last_randomx_estimated_hashrate = difficulty.randomx_estimated_hash_rate; } - result = Ok(( - last_sha3_estimated_hashrate, - last_randomx_estimated_hashrate, - MicroMinotari(reward), + result = Ok(BaseNodeStatus { + sha_network_hashrate: last_sha3_estimated_hashrate, + randomx_network_hashrate: last_randomx_estimated_hashrate, + block_reward: MicroMinotari(reward), block_height, block_time, - sync_achieved, - )); + is_synced: sync_achieved, + }); } if last_randomx_estimated_hashrate != 0 && last_sha3_estimated_hashrate != 0 { - self.last_sha3_estimated_hashrate = last_sha3_estimated_hashrate; - self.last_randomx_estimated_hashrate = last_randomx_estimated_hashrate; break; } } - self.last_block_height = block_height; Ok(result?) } @@ -406,16 +429,27 @@ impl MinotariNodeStatusMonitor { } #[allow(clippy::too_many_lines)] - pub async fn wait_synced(&self, progress_tracker: ProgressTracker) -> Result<(), Error> { + pub async fn wait_synced( + &self, + progress_tracker: ProgressTracker, + ) -> Result<(), MinotariNodeStatusMonitorError> { let mut client = - BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port)).await?; + BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port)) + .await + .map_err(|_e| MinotariNodeStatusMonitorError::NodeNotStarted)?; loop { if self.shutdown_signal.is_triggered() { break Ok(()); } - let tip = client.get_tip_info(Empty {}).await?; - let sync_progress = client.get_sync_progress(Empty {}).await?; + let tip = client + .get_tip_info(Empty {}) + .await + .map_err(|e| MinotariNodeStatusMonitorError::UnknownError(e.into()))?; + let sync_progress = client + .get_sync_progress(Empty {}) + .await + .map_err(|e| MinotariNodeStatusMonitorError::UnknownError(e.into()))?; let tip_res = tip.into_inner(); let sync_progress = sync_progress.into_inner(); if tip_res.initial_sync_achieved { diff --git a/src-tauri/src/node_manager.rs b/src-tauri/src/node_manager.rs index 1c4e0ea4e..2aff328db 100644 --- a/src-tauri/src/node_manager.rs +++ b/src-tauri/src/node_manager.rs @@ -29,17 +29,16 @@ use log::{error, info}; use minotari_node_grpc_client::grpc::Peer; use serde_json::json; use tari_common::configuration::Network; -use tari_core::transactions::tari_amount::MicroMinotari; use tari_crypto::ristretto::RistrettoPublicKey; use tari_shutdown::ShutdownSignal; use tari_utilities::hex::Hex; use tauri_plugin_sentry::sentry; use tauri_plugin_sentry::sentry::protocol::Event; use tokio::fs; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use crate::network_utils::{get_best_block_from_block_scan, get_block_info_from_block_scan}; -use crate::node_adapter::{MinotariNodeAdapter, MinotariNodeStatusMonitorError}; +use crate::node_adapter::{BaseNodeStatus, MinotariNodeAdapter, MinotariNodeStatusMonitorError}; use crate::process_watcher::ProcessWatcher; use crate::ProgressTracker; @@ -68,7 +67,7 @@ impl Clone for NodeManager { } impl NodeManager { - pub fn new() -> Self { + pub fn new(status_broadcast: watch::Sender) -> Self { // TODO: wire up to front end // let mut use_tor = true; @@ -78,9 +77,10 @@ impl NodeManager { // use_tor = false; // } - let adapter = MinotariNodeAdapter::new(); + let adapter = MinotariNodeAdapter::new(status_broadcast); let mut process_watcher = ProcessWatcher::new(adapter); - process_watcher.health_timeout = Duration::from_secs(10); + process_watcher.poll_time = Duration::from_secs(10); + process_watcher.health_timeout = Duration::from_secs(9); process_watcher.expected_startup_time = Duration::from_secs(120); Self { @@ -164,7 +164,19 @@ impl NodeManager { .status_monitor .as_ref() .ok_or_else(|| anyhow::anyhow!("wait_synced: Node not started"))?; - status_monitor.wait_synced(progress_tracker).await + loop { + match status_monitor.wait_synced(progress_tracker.clone()).await { + Ok(_) => return Ok(()), + Err(e) => match e { + MinotariNodeStatusMonitorError::NodeNotStarted => { + continue; + } + _ => { + return Err(NodeManagerError::UnknownError(e.into()).into()); + } + }, + } + } } pub async fn wait_ready(&self) -> Result<(), NodeManagerError> { @@ -198,27 +210,6 @@ impl NodeManager { Ok(0) } - /// Returns Sha hashrate, Rx hashrate and block reward - pub async fn get_network_hash_rate_and_block_reward( - &self, - ) -> Result<(u64, u64, MicroMinotari, u64, u64, bool), NodeManagerError> { - let mut status_monitor_lock = self.watcher.write().await; - let status_monitor = status_monitor_lock - .status_monitor - .as_mut() - .ok_or_else(|| NodeManagerError::NodeNotStarted)?; - status_monitor - .get_network_hash_rate_and_block_reward() - .await - .map_err(|e| { - if matches!(e, MinotariNodeStatusMonitorError::NodeNotStarted) { - NodeManagerError::NodeNotStarted - } else { - NodeManagerError::UnknownError(e.into()) - } - }) - } - pub async fn get_identity(&self) -> Result { let status_monitor_lock = self.watcher.read().await; let status_monitor = status_monitor_lock @@ -248,12 +239,16 @@ impl NodeManager { &self, report_to_sentry: bool, ) -> Result { - let mut status_monitor_lock = self.watcher.write().await; + let status_monitor_lock = self.watcher.read().await; let status_monitor = status_monitor_lock .status_monitor - .as_mut() - .ok_or_else(|| anyhow::anyhow!("Node not started"))?; - let (_, _, _, local_tip, _, is_synced) = status_monitor + .as_ref() + .ok_or_else(|| anyhow::anyhow!("check_if_is_orphan_chain: Node not started"))?; + let BaseNodeStatus { + is_synced, + block_height: local_tip, + .. + } = status_monitor .get_network_hash_rate_and_block_reward() .await .map_err(|e| { diff --git a/src-tauri/src/process_watcher.rs b/src-tauri/src/process_watcher.rs index 128ccd959..745710da2 100644 --- a/src-tauri/src/process_watcher.rs +++ b/src-tauri/src/process_watcher.rs @@ -38,6 +38,7 @@ pub struct ProcessWatcher { watcher_task: Option>>, internal_shutdown: Shutdown, pub poll_time: tokio::time::Duration, + /// Health timeout should always be less than poll time otherwise you will have overlapping calls pub health_timeout: tokio::time::Duration, pub expected_startup_time: tokio::time::Duration, pub(crate) status_monitor: Option, @@ -76,7 +77,6 @@ impl ProcessWatcher { log_path: PathBuf, binary: Binaries, ) -> Result<(), anyhow::Error> { - info!(target: LOG_TARGET, "App shutdown triggered or terminated status for {} = {} | {}", self.adapter.name(),app_shutdown.is_triggered(),app_shutdown.is_terminated()); if app_shutdown.is_terminated() || app_shutdown.is_triggered() { return Ok(()); } @@ -221,7 +221,7 @@ async fn do_health_check( } } HealthStatus::Unhealthy => { - error!(target: LOG_TARGET, "{} is not healthy. Health check returned false", name); + warn!(target: LOG_TARGET, "{} is not healthy. Health check returned false", name); } } } diff --git a/src-tauri/src/telemetry_manager.rs b/src-tauri/src/telemetry_manager.rs index 1aabff9b6..0639fd919 100644 --- a/src-tauri/src/telemetry_manager.rs +++ b/src-tauri/src/telemetry_manager.rs @@ -21,13 +21,13 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::app_in_memory_config::AppInMemoryConfig; +use crate::gpu_miner_adapter::GpuMinerStatus; use crate::hardware::hardware_status_monitor::HardwareStatusMonitor; +use crate::node_adapter::BaseNodeStatus; use crate::p2pool_manager::{self, P2poolManager}; use crate::{ app_config::{AppConfig, MiningMode}, cpu_miner::CpuMiner, - gpu_miner::GpuMiner, - node_manager::NodeManager, }; use anyhow::Result; use base64::prelude::*; @@ -46,10 +46,9 @@ use std::ops::Div; use std::pin::Pin; use std::{sync::Arc, thread::sleep, time::Duration}; use tari_common::configuration::Network; -use tari_core::transactions::tari_amount::MicroMinotari; use tari_utilities::encoding::MBase58; use tauri::Emitter; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use tokio_util::sync::CancellationToken; const LOG_TARGET: &str = "tari::universe::telemetry_manager"; @@ -199,38 +198,38 @@ pub struct TelemetryData { } pub struct TelemetryManager { - node_manager: NodeManager, cpu_miner: Arc>, - gpu_miner: Arc>, config: Arc>, in_memory_config: Arc>, pub cancellation_token: CancellationToken, node_network: Option, p2pool_manager: P2poolManager, airdrop_access_token: Arc>>, + gpu_status: watch::Receiver, + node_status: watch::Receiver, } impl TelemetryManager { pub fn new( - node_manager: NodeManager, cpu_miner: Arc>, - gpu_miner: Arc>, config: Arc>, in_memory_config: Arc>, network: Option, p2pool_manager: P2poolManager, + gpu_status: watch::Receiver, + node_status: watch::Receiver, ) -> Self { let cancellation_token = CancellationToken::new(); Self { - node_manager, cpu_miner, - gpu_miner, config, cancellation_token, node_network: network, in_memory_config, p2pool_manager, airdrop_access_token: Arc::new(RwLock::new(None)), + gpu_status, + node_status, } } @@ -296,9 +295,9 @@ impl TelemetryManager { timeout: Duration, window: tauri::Window, ) -> Result<(), TelemetryManagerError> { - let node_manager = self.node_manager.clone(); let cpu_miner = self.cpu_miner.clone(); - let gpu_miner = self.gpu_miner.clone(); + let gpu_status = self.gpu_status.clone(); + let node_status = self.node_status.clone(); let config = self.config.clone(); let cancellation_token: CancellationToken = self.cancellation_token.clone(); let network = self.node_network; @@ -314,7 +313,7 @@ impl TelemetryManager { let telemetry_collection_enabled = config_cloned.read().await.allow_telemetry(); if telemetry_collection_enabled { let airdrop_access_token_validated = validate_jwt(airdrop_access_token.clone()).await; - let telemetry_data = get_telemetry_data(cpu_miner.clone(), gpu_miner.clone(), node_manager.clone(), p2pool_manager_cloned.clone(), config.clone(), network).await; + let telemetry_data = get_telemetry_data(&cpu_miner, &gpu_status, &node_status, &p2pool_manager_cloned, &config, network).await; let airdrop_api_url = in_memory_config_cloned.read().await.airdrop_api_url.clone(); handle_telemetry_data(telemetry_data, airdrop_api_url, airdrop_access_token_validated, window.clone()).await; } @@ -370,40 +369,33 @@ fn decode_jwt_claims(t: &str) -> Option { #[allow(clippy::too_many_lines)] async fn get_telemetry_data( - cpu_miner: Arc>, - gpu_miner: Arc>, - node_manager: NodeManager, - p2pool_manager: p2pool_manager::P2poolManager, - config: Arc>, + cpu_miner: &RwLock, + gpu_latest_miner_stats: &watch::Receiver, + node_latest_status: &watch::Receiver, + p2pool_manager: &p2pool_manager::P2poolManager, + config: &RwLock, network: Option, ) -> Result { - let (sha_hash_rate, randomx_hash_rate, block_reward, block_height, _block_time, is_synced) = - node_manager - .get_network_hash_rate_and_block_reward() - .await - .unwrap_or((0, 0, MicroMinotari(0), 0, 0, false)); + let BaseNodeStatus { + randomx_network_hashrate, + block_reward, + block_height, + is_synced, + .. + } = node_latest_status.borrow().clone(); let cpu_miner = cpu_miner.read().await; - let cpu = match cpu_miner.status(randomx_hash_rate, block_reward).await { + let cpu = match cpu_miner + .status(randomx_network_hashrate, block_reward) + .await + { Ok(cpu) => cpu, Err(e) => { warn!(target: LOG_TARGET, "Error getting cpu miner status: {:?}", e); return Err(TelemetryManagerError::Other(e)); } }; - let gpu_miner_lock = gpu_miner.read().await; - let gpu_status = match gpu_miner_lock.status(sha_hash_rate, block_reward).await { - Ok(gpu) => gpu, - Err(e) => { - warn!(target: LOG_TARGET, "Error getting gpu miner status: {:?}", e); - return Err(TelemetryManagerError::Other(e)); - } - }; - - // let hardware_status = HardwareMonitor::current() - // .write() - // .await - // .read_hardware_parameters(); + let gpu_status = gpu_latest_miner_stats.borrow().clone(); let gpu_hardware_parameters = HardwareStatusMonitor::current() .get_gpu_public_properties() diff --git a/src-tauri/src/wallet_adapter.rs b/src-tauri/src/wallet_adapter.rs index 40e0f6d99..8029185fa 100644 --- a/src-tauri/src/wallet_adapter.rs +++ b/src-tauri/src/wallet_adapter.rs @@ -39,6 +39,7 @@ use tari_core::transactions::tari_amount::MicroMinotari; use tari_crypto::ristretto::RistrettoPublicKey; use tari_shutdown::Shutdown; use tari_utilities::hex::Hex; +use tokio::sync::watch; #[cfg(target_os = "windows")] use crate::utils::setup_utils::setup_utils::add_firewall_rule; @@ -53,10 +54,11 @@ pub struct WalletAdapter { pub(crate) spend_key: String, pub(crate) tcp_listener_port: u16, pub(crate) grpc_port: u16, + balance_broadcast: watch::Sender>, } impl WalletAdapter { - pub fn new(use_tor: bool) -> Self { + pub fn new(use_tor: bool, balance_broadcast: watch::Sender>) -> Self { let tcp_listener_port = PortAllocator::new().assign_port_with_fallback(); let grpc_port = PortAllocator::new().assign_port_with_fallback(); Self { @@ -67,6 +69,7 @@ impl WalletAdapter { spend_key: "".to_string(), tcp_listener_port, grpc_port, + balance_broadcast, } } } @@ -196,6 +199,7 @@ impl ProcessAdapter for WalletAdapter { }, WalletStatusMonitor { grpc_port: self.grpc_port, + latest_balance_broadcast: self.balance_broadcast.clone(), }, )) } @@ -222,15 +226,21 @@ pub enum WalletStatusMonitorError { #[derive(Clone)] pub struct WalletStatusMonitor { grpc_port: u16, + latest_balance_broadcast: watch::Sender>, } #[async_trait] impl StatusMonitor for WalletStatusMonitor { async fn check_health(&self) -> HealthStatus { - if self.get_balance().await.is_ok() { - HealthStatus::Healthy - } else { - HealthStatus::Unhealthy + match self.get_balance().await { + Ok(b) => { + let _result = self.latest_balance_broadcast.send(Some(b)); + HealthStatus::Healthy + } + Err(e) => { + warn!(target: LOG_TARGET, "Wallet health check failed: {}", e); + HealthStatus::Unhealthy + } } } } diff --git a/src-tauri/src/wallet_manager.rs b/src-tauri/src/wallet_manager.rs index 564d8892f..be1d0c4cb 100644 --- a/src-tauri/src/wallet_manager.rs +++ b/src-tauri/src/wallet_manager.rs @@ -30,6 +30,7 @@ use futures_util::future::FusedFuture; use std::path::PathBuf; use std::sync::Arc; use tari_shutdown::ShutdownSignal; +use tokio::sync::watch; use tokio::sync::RwLock; #[derive(thiserror::Error, Debug)] @@ -57,11 +58,14 @@ impl Clone for WalletManager { } impl WalletManager { - pub fn new(node_manager: NodeManager) -> Self { + pub fn new( + node_manager: NodeManager, + wallet_watch_tx: watch::Sender>, + ) -> Self { // TODO: wire up to front end let use_tor = false; - let adapter = WalletAdapter::new(use_tor); + let adapter = WalletAdapter::new(use_tor, wallet_watch_tx); let process_watcher = ProcessWatcher::new(adapter); Self {