Skip to content

Commit

Permalink
update compact.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Aug 25, 2019
1 parent 30330ed commit 8fdace7
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions examples/compact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::Input;
use differential_dataflow::operators::Threshold;

fn main() {

let large: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let small: usize = std::env::args().nth(2).unwrap().parse().unwrap();
let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
let total: usize = std::env::args().nth(4).unwrap().parse().unwrap();

// define a new timely dataflow computation.
timely::execute_from_args(std::env::args().skip(3), move |worker| {

let timer = ::std::time::Instant::now();

let mut probe = timely::dataflow::operators::probe::Handle::new();

// create a dataflow managing an ever-changing edge collection.
let mut handle = worker.dataflow(|scope| {
let (handle, input) = scope.new_collection();
input.distinct().probe_with(&mut probe);
handle
});

println!("{:?}:\tloading edges", timer.elapsed());

let mut next = batch;
let mut value = worker.index();
while value < total {
if value >= next {
handle.advance_to(next);
handle.flush();
next += batch;
while probe.less_than(handle.time()) { worker.step(); }
// println!("{:?}\tround {} loaded", timer.elapsed(), next);
}
handle.advance_to(value);
handle.insert(value % large);
handle.insert(value % small);
value += worker.peers();
}

handle.advance_to(total);
handle.flush();
while probe.less_than(handle.time()) { worker.step(); }

println!("{:?}\tdata loaded", timer.elapsed());

let mut next = batch;
let mut value = worker.index();
while value < total {
if value >= next {
handle.advance_to(total + next);
handle.flush();
next += batch;
while probe.less_than(handle.time()) { worker.step(); }
// println!("{:?}\tround {} unloaded", timer.elapsed(), next);
}
handle.advance_to(total + value);
handle.remove(value % large);
handle.remove(value % small);
value += worker.peers();
}

handle.advance_to(total + total);
handle.flush();
while probe.less_than(handle.time()) { worker.step(); }

println!("{:?}\tdata unloaded", timer.elapsed());

while worker.step() { }

}).unwrap();
}

0 comments on commit 8fdace7

Please sign in to comment.