Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Sep 13, 2018
1 parent 49e70cc commit 816858b
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 47 deletions.
11 changes: 1 addition & 10 deletions examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,12 @@ use rand::{Rng, SeedableRng, StdRng};

use differential_dataflow::input::Input;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::arrange::{Arrange, ArrangeByKey};
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::group::Group;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::Iterate;
use differential_dataflow::operators::Consolidate;

use differential_dataflow::trace::Trace;
use differential_dataflow::trace::implementations::ord::OrdValSpineAbom;

// use differential_dataflow::trace::implementations::ord::OrdValSpine;
// use differential_dataflow::trace::{Cursor, Trace};
// use differential_dataflow::trace::Batch;
// use differential_dataflow::hashable::OrdWrapper;
// use differential_dataflow::trace::TraceReader;

fn main() {

let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
Expand Down
118 changes: 118 additions & 0 deletions examples/bfs_threadless.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
extern crate rand;
extern crate timely;
// extern crate timely_communication;
extern crate differential_dataflow;

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;

type Node = u32;
type Edge = (Node, Node);

fn main() {

    // Command-line configuration: graph size, update batch size, number of
    // measured rounds, and whether to print the computed distances.
    let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
    let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
    let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap();
    let rounds: u32 = std::env::args().nth(4).unwrap().parse().unwrap();
    let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";

    // Construct a worker directly from a `Thread` allocator rather than via
    // `timely::execute` — presumably this is what makes the example
    // "threadless": a single in-process worker with no communication threads.
    let allocator = timely::communication::allocator::Thread;
    // let logging_config: timely::logging::LoggerConfig = Default::default();
    // let timely_logging = logging_config.timely_logging;
    // let now = ::std::time::Instant::now();
    let mut worker = timely::dataflow::scopes::Root::new(allocator);

    // Wall-clock timer for all progress reports below.
    let timer = ::std::time::Instant::now();

    // define BFS dataflow; return handles to roots and edges inputs
    let mut probe = Handle::new();
    let (mut roots, mut graph) = worker.dataflow(|scope| {

        let (root_input, roots) = scope.new_collection();
        let (edge_input, graph) = scope.new_collection();

        let mut result = bfs(&graph, &roots);

        // Without "inspect", drop every record so the run measures dataflow
        // execution cost rather than output printing.
        if !inspect {
            result = result.filter(|_| false);
        }

        // Count nodes by distance (consolidated), print, and attach the probe
        // used below to detect when the computation has caught up.
        result.map(|(_,l)| l)
              .consolidate()
              .inspect(|x| println!("\t{:?}", x))
              .probe_with(&mut probe);

        (root_input, edge_input)
    });

    // Two RNGs with the same seed: rng2 replays exactly the sequence rng1
    // produced, so removals always delete previously inserted edges.
    let seed: &[_] = &[1, 2, 3, 4];
    let mut rng1: StdRng = SeedableRng::from_seed(seed);     // rng for edge additions
    let mut rng2: StdRng = SeedableRng::from_seed(seed);     // rng for edge deletions

    // A single BFS root (node 0); the roots input is then closed for good.
    roots.insert(0);
    roots.close();

    println!("performing BFS on {} nodes, {} edges:", nodes, edges);

    // Load the initial random edge set at the initial timestamp.
    // (worker.index() is always 0 here with a single Thread allocator, but
    // the guard mirrors the multi-worker examples.)
    if worker.index() == 0 {
        for _ in 0 .. edges {
            graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
        }
    }

    println!("{:?}\tloaded", timer.elapsed());

    // Advance the input to time 1 and step the worker until the probe shows
    // the dataflow has fully processed everything before that time.
    graph.advance_to(1);
    graph.flush();
    worker.step_while(|| probe.less_than(graph.time()));

    println!("{:?}\tstable", timer.elapsed());

    // Incremental phase: each round applies `batch` paired edge
    // insertions/removals, each update at its own distinct timestamp.
    for round in 0 .. rounds {
        for element in 0 .. batch {
            if worker.index() == 0 {
                graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
                graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
            }
            // Timestamps continue from 2 (time 1 was the initial load).
            graph.advance_to(2 + round * batch + element);
        }
        graph.flush();

        // Per-batch latency: time until the probe catches up with the input.
        // NOTE(review): `&graph.time()` here vs `graph.time()` above — both
        // presumably coerce to the same reference type; confirm and unify.
        let timer2 = ::std::time::Instant::now();
        worker.step_while(|| probe.less_than(&graph.time()));

        if worker.index() == 0 {
            let elapsed = timer2.elapsed();
            // Report elapsed-since-start, round number, and batch latency in
            // nanoseconds (secs * 1e9 + subsec nanos).
            println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
        }
    }
    println!("finished; elapsed: {:?}", timer.elapsed());

}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {

    // Each root reaches itself in zero steps.
    let start = roots.map(|root| (root, 0));

    // Iteratively propagate distances along edges until they stabilize,
    // keeping only the smallest distance proposed for each node.
    start.iterate(|dists| {

        // Bring the static edge set and the initial distances into the
        // iterative scope.
        let edges = edges.enter(&dists.scope());
        let start = start.enter(&dists.scope());

        // For each (node, dist) and each edge (node, next), propose
        // (next, dist + 1); re-introduce the roots; retain per node the
        // minimum proposal (the first element of the sorted input).
        dists.join_map(&edges, |_node, dist, next| (*next, dist + 1))
             .concat(&start)
             .group(|_node, input, output| output.push((*input[0].0, 1)))
    })
}
18 changes: 9 additions & 9 deletions examples/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {

// define a new computational scope, in which to run BFS
timely::execute_from_args(std::env::args().skip(6), move |worker| {

let timer = ::std::time::Instant::now();

// define BFS dataflow; return handles to roots and edges inputs
Expand Down Expand Up @@ -80,7 +80,7 @@ fn main() {
graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
}
graph.advance_to(3 + round * batch + element);
graph.advance_to(3 + round * batch + element);
}
graph.flush();

Expand All @@ -100,15 +100,15 @@ fn main() {
fn bidijkstra<G: Scope>(edges: &Collection<G, Edge>, goals: &Collection<G, (Node, Node)>) -> Collection<G, ((Node, Node), u32)>
where G::Timestamp: Lattice+Ord {

edges.scope().scoped(|inner| {
edges.scope().scoped::<u64,_,_>(|inner| {

// Our plan is to start evolving distances from both sources and destinations.
// Our plan is to start evolving distances from both sources and destinations.
// The evolution from a source or destination should continue as long as there
// is a corresponding destination or source that has not yet been reached.

// forward and reverse (node, (root, dist))
let forward = Variable::from(goals.map(|(x,_)| (x,(x,0))).enter(inner));
let reverse = Variable::from(goals.map(|(_,y)| (y,(y,0))).enter(inner));
let forward = Variable::new_from(goals.map(|(x,_)| (x,(x,0))).enter(inner), u64::max_value(), 1);
let reverse = Variable::new_from(goals.map(|(_,y)| (y,(y,0))).enter(inner), u64::max_value(), 1);

let goals = goals.enter(inner);
let edges = edges.enter(inner);
Expand All @@ -118,7 +118,7 @@ where G::Timestamp: Lattice+Ord {
// done(src, dst) := forward(src, med), reverse(dst, med), goal(src, dst).
//
// This is a cyclic join, which should scare us a bunch.
let reached =
let reached =
forward
.join_map(&reverse, |_, &(src,d1), &(dst,d2)| ((src, dst), d1 + d2))
.group(|_key, s, t| t.push((*s[0].0, 1)));
Expand All @@ -133,7 +133,7 @@ where G::Timestamp: Lattice+Ord {

// Let's expand out forward queries that are active.
let forward_active = active.map(|(x,_y)| x).distinct();
let forward_next =
let forward_next =
forward
.map(|(med, (src, dist))| (src, (med, dist)))
.semijoin(&forward_active)
Expand All @@ -148,7 +148,7 @@ where G::Timestamp: Lattice+Ord {

// Let's expand out reverse queries that are active.
let reverse_active = active.map(|(_x,y)| y).distinct();
let reverse_next =
let reverse_next =
reverse
.map(|(med, (rev, dist))| (rev, (med, dist)))
.semijoin(&reverse_active)
Expand Down
4 changes: 2 additions & 2 deletions examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Arrange<G, K, V, R> = Arranged<G, K, V, R, TraceValHandle<K, V, <G as Scope
/// An edge variable provides arranged representations of its contents, even before they are
/// completely defined, in support of recursively defined productions.
pub struct EdgeVariable<'a, G: Scope> where G::Timestamp : Lattice {
variable: Variable<'a, G, Edge, isize>,
variable: Variable<'a, G, Edge, u64, isize>,
current: Collection<Child<'a, G, u64>, Edge, isize>,
forward: Option<Arrange<Child<'a, G, u64>, Node, Node, isize>>,
reverse: Option<Arrange<Child<'a, G, u64>, Node, Node, isize>>,
Expand All @@ -97,7 +97,7 @@ pub struct EdgeVariable<'a, G: Scope> where G::Timestamp : Lattice {
impl<'a, G: Scope> EdgeVariable<'a, G> where G::Timestamp : Lattice {
/// Creates a new variable initialized with `source`.
pub fn from(source: &Collection<Child<'a, G, u64>, Edge>) -> Self {
let variable = Variable::from(source.filter(|_| false));
let variable = Variable::new_from(source.filter(|_| false), u64::max_value(), 1);
EdgeVariable {
variable: variable,
current: source.clone(),
Expand Down
8 changes: 1 addition & 7 deletions examples/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,15 @@ use std::mem;
use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::capture::EventWriter;

use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;

use differential_dataflow::trace::Trace;
// use differential_dataflow::operators::join::JoinArranged;
use differential_dataflow::operators::group::GroupArranged;
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
// use differential_dataflow::hashable::UnsignedWrapper;

// use differential_dataflow::trace::implementations::ord::OrdValSpine;// as HashSpine;
use differential_dataflow::trace::implementations::ord::OrdKeySpine;// as OrdKeyHashSpine;
use differential_dataflow::trace::implementations::ord::OrdKeySpine;


type Node = u32;
Expand Down
1 change: 1 addition & 0 deletions mdbook/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
book
39 changes: 25 additions & 14 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<G: Scope, D: Ord+Data+Debug, R: Diff> Iterate<G, D, R> for Collection<G, D,
// wrapped by `variable`, but it also results in substantially more
// diffs produced; `result` is post-consolidation, and means fewer
// records are yielded out of the loop.
let variable = Variable::from_args(u64::max_value(), 1, self.enter(subgraph));
let variable = Variable::new_from(self.enter(subgraph), u64::max_value(), 1);
let result = logic(&variable);
variable.set(&result);
result.leave()
Expand Down Expand Up @@ -129,24 +129,24 @@ impl<G: Scope, D: Ord+Data+Debug, R: Diff> Iterate<G, D, R> for Collection<G, D,
/// })
/// }
/// ```
pub struct Variable<'a, G: Scope, D: Data, R: Diff, T: Timestamp+Lattice>
pub struct Variable<'a, G: Scope, D: Data, T: Timestamp+Lattice, R: Diff>
where G::Timestamp: Lattice {
collection: Collection<Child<'a, G, T>, D, R>,
feedback: Handle<G::Timestamp, T,(D, Product<G::Timestamp, T>, R)>,
source: Collection<Child<'a, G, T>, D, R>,
step: <T as Timestamp>::Summary,
}

impl<'a, G: Scope, D: Data, R: Diff, T: Timestamp+Lattice> Variable<'a, G, D, R, T> where G::Timestamp: Lattice {
impl<'a, G: Scope, D: Data, R: Diff, T: Timestamp+Lattice> Variable<'a, G, D, T, R> where G::Timestamp: Lattice {
/// Creates a new initially empty `Variable`.
pub fn new(max_steps: T, step: <T as Timestamp>::Summary, scope: &mut Child<'a, G, T>) -> Self {
pub fn new(scope: &mut Child<'a, G, T>, max_steps: T, step: <T as Timestamp>::Summary) -> Self {
use collection::AsCollection;
let empty = ::timely::dataflow::operators::generic::operator::empty(scope).as_collection();
Self::from_args(max_steps, step, empty)
Self::new_from(empty, max_steps, step)
}

/// Creates a new `Variable` from a supplied `source` stream.
pub fn from_args(max_steps: T, step: <T as Timestamp>::Summary, source: Collection<Child<'a, G, T>, D, R>) -> Self {
pub fn new_from(source: Collection<Child<'a, G, T>, D, R>, max_steps: T, step: <T as Timestamp>::Summary) -> Self {
let (feedback, updates) = source.inner.scope().loop_variable(max_steps, step.clone());
let collection = Collection::new(updates).concat(&source);
Variable { collection: collection, feedback: feedback, source: source, step: step }
Expand All @@ -166,17 +166,28 @@ impl<'a, G: Scope, D: Data, R: Diff, T: Timestamp+Lattice> Variable<'a, G, D, R,
}
}

impl<'a, G: Scope, D: Data, R: Diff, T: Timestamp+Lattice> Deref for Variable<'a, G, D, R, T> where G::Timestamp: Lattice {
impl<'a, G: Scope, D: Data, R: Diff, T: Timestamp+Lattice> Deref for Variable<'a, G, D, T, R> where G::Timestamp: Lattice {
type Target = Collection<Child<'a, G, T>, D, R>;
fn deref(&self) -> &Self::Target {
&self.collection
}
}

impl<'a, G: Scope, D: Data, R: Diff> Variable<'a, G, D, R, u64> where G::Timestamp: Lattice {
/// Compatibility implementation of `from_args(u64::max_value(), 1, source)`.
#[deprecated]
pub fn from(source: Collection<Child<'a, G, u64>, D, R>) -> Self {
Self::from_args(u64::max_value(), 1, source)
}
}
// impl<'a, G: Scope, D: Data, R: Diff> Variable<'a, G, D, R, u64> where G::Timestamp: Lattice {
// /// Allocates a new variable from a source collection.
// pub fn from(source: Collection<Child<'a, G, u64>, D, R>) -> Self {
// Self::new_from(u64::max_value(), 1, source)
// }
// }
// impl<'a, G: Scope, D: Data, R: Diff> Variable<'a, G, D, R, u32> where G::Timestamp: Lattice {
// /// Allocates a new variable from a source collection.
// pub fn from(source: Collection<Child<'a, G, u32>, D, R>) -> Self {
// Self::new_from(u32::max_value(), 1, source)
// }
// }
// impl<'a, G: Scope, D: Data, R: Diff> Variable<'a, G, D, R, usize> where G::Timestamp: Lattice {
// /// Allocates a new variable from a source collection.
// pub fn from(source: Collection<Child<'a, G, usize>, D, R>) -> Self {
// Self::new_from(usize::max_value(), 1, source)
// }
// }
8 changes: 3 additions & 5 deletions tests/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ use timely::dataflow::operators::capture::Extract;
use timely::progress::timestamp::RootTimestamp;
use timely::progress::nested::product::Product;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::operators::arrange::{ArrangeByKey, Arrange};
use differential_dataflow::operators::group::{Group, GroupArranged};
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::trace::{Trace, TraceReader};
// use differential_dataflow::hashable::{OrdWrapper, UnsignedWrapper};
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::group::Group;
use differential_dataflow::trace::TraceReader;
use itertools::Itertools;

type Result = std::sync::mpsc::Receiver<timely::dataflow::operators::capture::Event<timely::progress::nested::product::Product<timely::progress::timestamp::RootTimestamp, usize>, ((u64, i64), timely::progress::nested::product::Product<timely::progress::timestamp::RootTimestamp, usize>, i64)>>;
Expand Down

0 comments on commit 816858b

Please sign in to comment.