Skip to content

Commit

Permalink
Moved syncing log out of the client (openethereum#1670)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar authored and gavofyork committed Jul 20, 2016
1 parent 0cba70f commit b007770
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 103 deletions.
3 changes: 2 additions & 1 deletion ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ pub trait ChainNotify : Send + Sync {
_invalid: Vec<H256>,
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_sealed: Vec<H256>) {
_sealed: Vec<H256>,
_duration: u64) {
// does nothing by default
}

Expand Down
94 changes: 32 additions & 62 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use std::sync::{Arc, Weak};
use std::path::{Path, PathBuf};
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant, Duration};
use std::time::{Instant};
use time::precise_time_ns;

// util
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock, Colour};
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock};
use util::journaldb::JournalDB;
use util::rlp::{RlpStream, Rlp, UntrustedRlp};
use util::numbers::*;
Expand Down Expand Up @@ -135,10 +135,8 @@ pub struct Client {
sleep_state: Mutex<SleepState>,
liveness: AtomicBool,
io_channel: IoChannel<ClientIoMessage>,
notify: RwLock<Option<Weak<ChainNotify>>>,
notify: RwLock<Vec<Weak<ChainNotify>>>,
queue_transactions: AtomicUsize,
skipped: AtomicUsize,
last_import: Mutex<Instant>,
last_hashes: RwLock<VecDeque<H256>>,
}

Expand Down Expand Up @@ -229,24 +227,24 @@ impl Client {
trie_factory: TrieFactory::new(config.trie_spec),
miner: miner,
io_channel: message_channel,
notify: RwLock::new(None),
notify: RwLock::new(Vec::new()),
queue_transactions: AtomicUsize::new(0),
skipped: AtomicUsize::new(0),
last_import: Mutex::new(Instant::now()),
last_hashes: RwLock::new(VecDeque::new()),
};
Ok(Arc::new(client))
}

/// Sets the actor to be notified on certain events
pub fn set_notify(&self, target: &Arc<ChainNotify>) {
let mut write_lock = self.notify.write();
*write_lock = Some(Arc::downgrade(target));
/// Adds an actor to be notified on certain events
pub fn add_notify(&self, target: &Arc<ChainNotify>) {
self.notify.write().push(Arc::downgrade(target));
}

fn notify(&self) -> Option<Arc<ChainNotify>> {
let read_lock = self.notify.read();
read_lock.as_ref().and_then(|weak| weak.upgrade())
fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
for np in self.notify.read().iter() {
if let Some(n) = np.upgrade() {
f(&*n);
}
}
}

/// Flush the block import queue.
Expand Down Expand Up @@ -357,28 +355,24 @@ impl Client {
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, original_best, imported) = {
let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);

let original_best = self.chain_info().best_block_hash;

for block in blocks {
let header = &block.header;
let start = precise_time_ns();

if invalid_blocks.contains(&header.parent_hash) {
invalid_blocks.insert(header.hash());
continue;
}
let tx_count = block.transactions.len();
let size = block.bytes.len();

let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
invalid_blocks.insert(header.hash());
Expand All @@ -392,30 +386,6 @@ impl Client {
import_results.push(route);

self.report.write().accrue_block(&block);

let duration_ns = precise_time_ns() - start;

let mut last_import = self.last_import.lock();
if Instant::now() > *last_import + Duration::from_secs(1) {
let queue_info = self.queue_info();
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
if !importing {
let skipped = self.skipped.load(AtomicOrdering::Relaxed);
info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}",
Colour::White.bold().paint(format!("#{}", header.number())),
Colour::White.bold().paint(format!("{}", header.hash())),
Colour::Yellow.bold().paint(format!("{}", tx_count)),
Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)),
Colour::Purple.bold().paint(format!("{:.2}", duration_ns as f32 / 1000000f32)),
Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)),
if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() }
);
*last_import = Instant::now();
}
self.skipped.store(0, AtomicOrdering::Relaxed);
} else {
self.skipped.fetch_add(1, AtomicOrdering::Relaxed);
}
}

let imported = imported_blocks.len();
Expand All @@ -429,7 +399,8 @@ impl Client {
self.block_queue.mark_as_good(&imported_blocks);
}
}
(imported_blocks, import_results, invalid_blocks, original_best, imported)
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns)
};

{
Expand All @@ -440,15 +411,16 @@ impl Client {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

if let Some(notify) = self.notify() {
self.notify(|notify| {
notify.new_blocks(
imported_blocks,
invalid_blocks,
enacted,
retracted,
imported_blocks.clone(),
invalid_blocks.clone(),
enacted.clone(),
retracted.clone(),
Vec::new(),
duration,
);
}
});
}
}

Expand Down Expand Up @@ -640,9 +612,7 @@ impl Client {
fn wake_up(&self) {
if !self.liveness.load(AtomicOrdering::Relaxed) {
self.liveness.store(true, AtomicOrdering::Relaxed);
if let Some(notify) = self.notify() {
notify.start();
}
self.notify(|n| n.start());
trace!(target: "mode", "wake_up: Waking.");
}
}
Expand All @@ -652,9 +622,7 @@ impl Client {
// only sleep if the import queue is mostly empty.
if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON {
self.liveness.store(false, AtomicOrdering::Relaxed);
if let Some(notify) = self.notify() {
notify.stop();
}
self.notify(|n| n.stop());
trace!(target: "mode", "sleep: Sleeping.");
} else {
trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing.");
Expand Down Expand Up @@ -1029,6 +997,7 @@ impl MiningBlockChainClient for Client {
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_sealed_block");
let start = precise_time_ns();

let original_best = self.chain_info().best_block_hash;

Expand All @@ -1043,15 +1012,16 @@ impl MiningBlockChainClient for Client {
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);

if let Some(notify) = self.notify() {
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
vec![],
enacted,
retracted,
enacted.clone(),
retracted.clone(),
vec![h.clone()],
precise_time_ns() - start,
);
}
});
}

if self.chain_info().best_block_hash != original_best {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ impl ClientService {
}

/// Set the actor to be notified on certain chain events
pub fn set_notify(&self, notify: &Arc<ChainNotify>) {
self.client.set_notify(notify);
pub fn add_notify(&self, notify: &Arc<ChainNotify>) {
self.client.add_notify(notify);
}
}

Expand Down
91 changes: 64 additions & 27 deletions parity/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ extern crate ansi_term;
use self::ansi_term::Colour::{White, Yellow, Green, Cyan, Blue};
use self::ansi_term::Style;

use std::sync::{Arc};
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::time::{Instant, Duration};
use std::ops::{Deref, DerefMut};
use isatty::{stdout_isatty};
use ethsync::{SyncStatus, NetworkConfiguration};
use util::{Uint, RwLock};
use ethsync::{SyncProvider, ManageNetwork};
use util::{Uint, RwLock, Mutex, H256, Colour};
use ethcore::client::*;
use ethcore::views::BlockView;
use number_prefix::{binary_prefix, Standalone, Prefixed};

pub struct Informant {
Expand All @@ -32,18 +35,11 @@ pub struct Informant {
report: RwLock<Option<ClientReport>>,
last_tick: RwLock<Instant>,
with_color: bool,
}

impl Default for Informant {
fn default() -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
last_tick: RwLock::new(Instant::now()),
with_color: true,
}
}
client: Arc<Client>,
sync: Option<Arc<SyncProvider>>,
net: Option<Arc<ManageNetwork>>,
last_import: Mutex<Instant>,
skipped: AtomicUsize,
}

trait MillisecondDuration {
Expand All @@ -58,13 +54,18 @@ impl MillisecondDuration for Duration {

impl Informant {
/// Make a new instance potentially `with_color` output.
pub fn new(with_color: bool) -> Self {
pub fn new(client: Arc<Client>, sync: Option<Arc<SyncProvider>>, net: Option<Arc<ManageNetwork>>, with_color: bool) -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
last_tick: RwLock::new(Instant::now()),
with_color: with_color,
client: client,
sync: sync,
net: net,
last_import: Mutex::new(Instant::now()),
skipped: AtomicUsize::new(0),
}
}

Expand All @@ -77,25 +78,28 @@ impl Informant {


#[cfg_attr(feature="dev", allow(match_bool))]
pub fn tick(&self, client: &Client, maybe_status: Option<(SyncStatus, NetworkConfiguration)>) {
pub fn tick(&self) {
let elapsed = self.last_tick.read().elapsed();
if elapsed < Duration::from_secs(5) {
return;
}

let chain_info = client.chain_info();
let queue_info = client.queue_info();
let cache_info = client.blockchain_cache_info();
let chain_info = self.client.chain_info();
let queue_info = self.client.queue_info();
let cache_info = self.client.blockchain_cache_info();
let network_config = self.net.as_ref().map(|n| n.network_config());
let sync_status = self.sync.as_ref().map(|s| s.status());

let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
|| self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
if !importing && elapsed < Duration::from_secs(30) {
return;
}

*self.last_tick.write() = Instant::now();

let mut write_report = self.report.write();
let report = client.report();
let report = self.client.report();

let paint = |c: Style, t: String| match self.with_color && stdout_isatty() {
true => format!("{}", c.paint(t)),
Expand All @@ -120,8 +124,8 @@ impl Informant {
),
false => String::new(),
},
match maybe_status {
Some((ref sync_info, ref net_config)) => format!("{}{}/{}/{} peers",
match (&sync_status, &network_config) {
(&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{}/{} peers",
match importing {
true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))),
false => String::new(),
Expand All @@ -130,14 +134,14 @@ impl Informant {
paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)),
paint(Cyan.bold(), format!("{:2}", net_config.ideal_peers))
),
None => String::new(),
_ => String::new(),
},
format!("{} db {} chain {} queue{}",
format!("{} db {} chain {} queue{}",
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(report.state_db_mem))),
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(cache_info.total()))),
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(queue_info.mem_used))),
match maybe_status {
Some((ref sync_info, _)) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))),
match sync_status {
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))),
_ => String::new(),
}
)
Expand All @@ -149,3 +153,36 @@ impl Informant {
}
}

impl ChainNotify for Informant {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, duration: u64) {
let mut last_import = self.last_import.lock();
if Instant::now() > *last_import + Duration::from_secs(1) {
let queue_info = self.client.queue_info();
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
|| self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
if !importing {
if let Some(block) = enacted.last().and_then(|h| self.client.block(BlockID::Hash(h.clone()))) {
let view = BlockView::new(&block);
let header = view.header();
let tx_count = view.transactions_count();
let size = block.len();
let skipped = self.skipped.load(AtomicOrdering::Relaxed);
info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}",
Colour::White.bold().paint(format!("#{}", header.number())),
Colour::White.bold().paint(format!("{}", header.hash())),
Colour::Yellow.bold().paint(format!("{}", tx_count)),
Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)),
Colour::Purple.bold().paint(format!("{:.2}", duration as f32 / 1000000f32)),
Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)),
if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() }
);
*last_import = Instant::now();
}
}
self.skipped.store(0, AtomicOrdering::Relaxed);
} else {
self.skipped.fetch_add(enacted.len(), AtomicOrdering::Relaxed);
}
}
}

Loading

0 comments on commit b007770

Please sign in to comment.