Skip to content

Commit

Permalink
feature: txhashset downloading progress display on tui (mimblewimble#…
Browse files Browse the repository at this point in the history
  • Loading branch information
garyyu authored and ignopeverell committed Oct 12, 2018
1 parent 3fb4669 commit 5c0eb11
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 22 deletions.
4 changes: 2 additions & 2 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<'a> Message<'a> {
read_body(&self.header, self.conn)
}

pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<(), Error> {
pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<usize, Error> {
let mut written = 0;
while written < len {
let read_len = cmp::min(8000, len - written);
Expand All @@ -91,7 +91,7 @@ impl<'a> Message<'a> {
writer.write_all(&mut buf)?;
written += read_len;
}
Ok(())
Ok(written)
}

/// Respond to the message with the provided message type and body
Expand Down
11 changes: 11 additions & 0 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fs::File;
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, RwLock};

use chrono::prelude::{DateTime, Utc};
use conn;
use core::core;
use core::core::hash::{Hash, Hashed};
Expand Down Expand Up @@ -466,6 +467,16 @@ impl ChainAdapter for TrackingAdapter {
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
}

fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}
}

impl NetAdapter for TrackingAdapter {
Expand Down
10 changes: 10 additions & 0 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,16 @@ impl ChainAdapter for Peers {
true
}
}

fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}
}

impl NetAdapter for Peers {
Expand Down
20 changes: 19 additions & 1 deletion p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter};
use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;
use std::time;

use chrono::prelude::Utc;
use conn::{Message, MessageHandler, Response};
use core::core::{self, hash::Hash, CompactBlock};
use core::{global, ser};
Expand Down Expand Up @@ -255,11 +257,27 @@ impl MessageHandler for Protocol {
);
return Err(Error::BadMessage);
}

let download_start_time = Utc::now();
self.adapter
.txhashset_download_update(download_start_time, 0, sm_arch.bytes);

let mut tmp = env::temp_dir();
tmp.push("txhashset.zip");
let mut save_txhashset_to_file = |file| -> Result<(), Error> {
let mut tmp_zip = BufWriter::new(File::create(file)?);
msg.copy_attachment(sm_arch.bytes as usize, &mut tmp_zip)?;
let total_size = sm_arch.bytes as usize;
let mut downloaded_size: usize = 0;
let mut request_size = 48_000;
while request_size > 0 {
downloaded_size += msg.copy_attachment(request_size, &mut tmp_zip)?;
request_size = cmp::min(48_000, total_size - downloaded_size);
self.adapter.txhashset_download_update(
download_start_time,
downloaded_size as u64,
total_size as u64,
);
}
tmp_zip.into_inner().unwrap().sync_all()?;
Ok(())
};
Expand Down
10 changes: 10 additions & 0 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{io, thread};

use lmdb;

use chrono::prelude::{DateTime, Utc};
use core::core;
use core::core::hash::Hash;
use core::pow::Difficulty;
Expand Down Expand Up @@ -266,6 +267,15 @@ impl ChainAdapter for DummyAdapter {
fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: SocketAddr) -> bool {
false
}

fn txhashset_download_update(
&self,
_start_time: DateTime<Utc>,
_downloaded_size: u64,
_total_size: u64,
) -> bool {
false
}
}

impl NetAdapter for DummyAdapter {
Expand Down
8 changes: 8 additions & 0 deletions p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ pub trait ChainAdapter: Sync + Send {
/// state data.
fn txhashset_receive_ready(&self) -> bool;

/// Update txhashset downloading progress
fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool;

/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
Expand Down
28 changes: 26 additions & 2 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::thread;
use std::time::Instant;

use chain::{self, ChainAdapter, Options, Tip};
use chrono::prelude::{DateTime, Utc};
use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
use core::core::hash::{Hash, Hashed};
use core::core::transaction::Transaction;
Expand Down Expand Up @@ -327,7 +328,29 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}

fn txhashset_receive_ready(&self) -> bool {
self.sync_state.status() == SyncStatus::TxHashsetDownload
match self.sync_state.status() {
SyncStatus::TxHashsetDownload { .. } => true,
_ => false,
}
}

fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
match self.sync_state.status() {
SyncStatus::TxHashsetDownload { .. } => {
self.sync_state
.update_txhashset_download(SyncStatus::TxHashsetDownload {
start_time,
downloaded_size,
total_size,
})
}
_ => false,
}
}

/// Writes a reading view on a txhashset state that's been provided to us.
Expand All @@ -336,7 +359,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
/// rewound to the provided indexes.
fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: SocketAddr) -> bool {
// check status again after download, in case 2 txhashsets made it somehow
if self.sync_state.status() != SyncStatus::TxHashsetDownload {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else {
return true;
}

Expand Down
18 changes: 17 additions & 1 deletion servers/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::{Arc, RwLock};

use api;
use chain;
use chrono::prelude::{DateTime, Utc};
use core::global::ChainTypes;
use core::{core, pow};
use p2p;
Expand Down Expand Up @@ -255,7 +256,11 @@ pub enum SyncStatus {
highest_height: u64,
},
/// Downloading the various txhashsets
TxHashsetDownload,
TxHashsetDownload {
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
},
/// Setting up before validation
TxHashsetSetup,
/// Validating the full state
Expand Down Expand Up @@ -316,6 +321,17 @@ impl SyncState {
*status = new_status;
}

/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, new_status: SyncStatus) -> bool {
if let SyncStatus::TxHashsetDownload { .. } = new_status {
let mut status = self.current.write().unwrap();
*status = new_status;
true
} else {
false
}
}

/// Communicate sync error
pub fn set_sync_error(&self, error: Error) {
*self.sync_error.write().unwrap() = Some(error);
Expand Down
36 changes: 22 additions & 14 deletions servers/src/grin/sync/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ impl StateSync {

// check peer connection status of this sync
if let Some(ref peer) = self.fast_sync_peer {
if !peer.is_connected() && SyncStatus::TxHashsetDownload == self.sync_state.status() {
sync_need_restart = true;
info!(
LOGGER,
"fast_sync: peer connection lost: {:?}. restart", peer.info.addr,
);
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
LOGGER,
"fast_sync: peer connection lost: {:?}. restart", peer.info.addr,
);
}
}
}

Expand All @@ -106,13 +108,15 @@ impl StateSync {
if header_head.height == highest_height {
let (go, download_timeout) = self.fast_sync_due();

if download_timeout && SyncStatus::TxHashsetDownload == self.sync_state.status() {
error!(
LOGGER,
"fast_sync: TxHashsetDownload status timeout in 10 minutes!"
);
self.sync_state
.set_sync_error(Error::P2P(p2p::Error::Timeout));
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!(
LOGGER,
"fast_sync: TxHashsetDownload status timeout in 10 minutes!"
);
self.sync_state
.set_sync_error(Error::P2P(p2p::Error::Timeout));
}
}

if go {
Expand All @@ -136,7 +140,11 @@ impl StateSync {
}
}

self.sync_state.update(SyncStatus::TxHashsetDownload);
self.sync_state.update(SyncStatus::TxHashsetDownload {
start_time: Utc::now(),
downloaded_size: 0,
total_size: 0,
});
}
}
true
Expand Down
34 changes: 32 additions & 2 deletions src/bin/tui/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Basic status view definition
use chrono::prelude::Utc;
use cursive::direction::Orientation;
use cursive::traits::Identifiable;
use cursive::view::View;
Expand All @@ -26,6 +27,8 @@ use tui::types::TUIStatusListener;
use servers::common::types::SyncStatus;
use servers::ServerStats;

const NANO_TO_MILLIS: f64 = 1.0 / 1_000_000.0;

pub struct TUIStatusView;

impl TUIStatusListener for TUIStatusView {
Expand Down Expand Up @@ -101,8 +104,35 @@ impl TUIStatusListener for TUIStatusView {
};
format!("Downloading headers: {}%, step 1/4", percent)
}
SyncStatus::TxHashsetDownload => {
"Downloading chain state for fast sync, step 2/4".to_string()
SyncStatus::TxHashsetDownload {
start_time,
downloaded_size,
total_size,
} => {
if total_size > 0 {
let percent = if total_size > 0 {
downloaded_size * 100 / total_size
} else {
0
};
let start = start_time.timestamp_nanos();
let fin = Utc::now().timestamp_nanos();
let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS;

format!("Downloading {}(MB) chain state for fast sync: {}% at {:.1?}(kB/s), step 2/4",
total_size / 1_000_000,
percent,
if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 },
)
} else {
let start = start_time.timestamp_millis();
let fin = Utc::now().timestamp_millis();
let dur_secs = (fin - start) / 1000;

format!("Downloading chain state for fast sync. Waiting remote peer to start: {}s, step 2/4",
dur_secs,
)
}
}
SyncStatus::TxHashsetSetup => {
"Preparing chain state for validation, step 3/4".to_string()
Expand Down

0 comments on commit 5c0eb11

Please sign in to comment.