Skip to content

Commit

Permalink
improve propagate, allow names
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Aug 23, 2019
1 parent c78b860 commit 718971d
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 42 deletions.
4 changes: 2 additions & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,DefaultKeyTrace<_,_,_>>(|_key, input, output, updates| {
.reduce_core::<_,DefaultKeyTrace<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand All @@ -169,4 +169,4 @@ where G::Timestamp: Lattice+Ord {
variable.set(&result);
result.leave()
})
}
}
91 changes: 73 additions & 18 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,25 @@ use ::operators::*;
use ::lattice::Lattice;

/// Propagates labels forward, retaining the minimum label.
///
/// This algorithm naively propagates all labels at once, much like standard label propagation.
/// To more carefully control the label propagation, consider `propagate_core` which supports a
/// method to limit the introduction of labels.
pub fn propagate<G, N, L>(edges: &Collection<G, (N,N)>, nodes: &Collection<G,(N,L)>) -> Collection<G,(N,L)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
L: ExchangeData,
{
nodes.filter(|_| false)
.iterate(|inner| {
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());

inner.join_map(&edges, |_k,l,d| (d.clone(),l.clone()))
.concat(&nodes)
.reduce(|_, s, t| t.push((s[0].0.clone(), 1)))

})
propagate_core(&edges.arrange_by_key(), nodes, |_label| 0)
}

/// Propagates labels forward, retaining the minimum label.
///
/// This algorithm naively propagates all labels at once, much like standard label propagation.
/// To more carefully control the label propagation, consider `propagate_core` which supports a
/// method to limit the introduction of labels.
pub fn propagate_at<G, N, L, F>(edges: &Collection<G, (N,N)>, nodes: &Collection<G,(N,L)>, logic: F) -> Collection<G,(N,L)>
where
G: Scope,
Expand All @@ -37,14 +36,70 @@ where
L: ExchangeData,
F: Fn(&L)->u64+'static,
{
nodes.filter(|_| false)
.iterate(|inner| {
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter_at(&inner.scope(), move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
propagate_core(&edges.arrange_by_key(), nodes, logic)
}

use trace::TraceReader;
use operators::arrange::arrangement::Arranged;
use operators::arrange::arrangement::ArrangeByKey;

/// Propagates labels forward, retaining the minimum label.
///
/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
/// a method `logic` to specify the rounds in which we introduce various labels. The output
/// of `logic` should be a number in the interval [0,64].
pub fn propagate_core<G, N, L, Tr, F>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L)>, logic: F) -> Collection<G,(N,L)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
L: ExchangeData,
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
F: Fn(&L)->u64+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
// dataflow graph and reduced memory footprint we instead have a wordier version below. The core differences
// between the two are that 1. the former filters its input and pretends to perform non-monotonic computation,
// whereas the latter creates an initially empty monotonic iteration variable, and 2. the latter rotates the
// iterative computation so that the arrangement produced by `reduce` can be re-used.

// nodes.filter(|_| false)
// .iterate(|inner| {
// let edges = edges.enter(&inner.scope());
// let nodes = nodes.enter_at(&inner.scope(), move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
// inner.join_map(&edges, |_k,l,d| (d.clone(),l.clone()))
// .concat(&nodes)
// .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
// })

nodes.scope().iterative::<usize,_,_>(|scope| {

use crate::operators::reduce::ReduceCore;
use crate::operators::iterate::SemigroupVariable;
use crate::trace::implementations::ord::OrdValSpine as DefaultValTrace;

use timely::order::Product;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));

let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), 1)));

let propagate: Collection<_, (N, L)> =
labels
.join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone())));

inner.join_map(&edges, |_k,l,d| (d.clone(),l.clone()))
.concat(&nodes)
.reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
proposals.set(&propagate);

})
labels
.as_collection(|k,v| (k.clone(), v.clone()))
.leave()
})
}
54 changes: 32 additions & 22 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestamp: L
/// }
/// ```
fn reduce<L, V2: Data, R2: Abelian>(&self, logic: L) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
self.reduce_named("Reduce", logic)
}

/// As `reduce` with the ability to name the operator.
fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static;
}

Expand All @@ -72,11 +78,10 @@ impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>
V: ExchangeData,
R: ExchangeData+Semigroup,
{
fn reduce<L, V2: Data, R2: Abelian>(&self, logic: L) -> Collection<G, (K, V2), R2>
fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
self.arrange_by_key()
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(logic)
.as_collection(|k,v| (k.clone(), v.clone()))
.reduce_named(name, logic)
}
}

Expand All @@ -87,9 +92,9 @@ where
T1::Batch: BatchReader<K, V, G::Timestamp, R>,
T1::Cursor: Cursor<K, V, G::Timestamp, R>,
{
fn reduce<L, V2: Data, R2: Abelian>(&self, logic: L) -> Collection<G, (K, V2), R2>
fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(logic)
self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(name, logic)
.as_collection(|k,v| (k.clone(), v.clone()))
}
}
Expand Down Expand Up @@ -120,7 +125,13 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
/// });
/// }
/// ```
fn threshold<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2>;
fn threshold<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
self.threshold_named("Threshold", thresh)
}

/// A `threshold` with the ability to name the operator.
fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;

/// Reduces the collection to one occurrence of each distinct element.
///
/// # Examples
Expand All @@ -142,16 +153,15 @@ pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattic
/// }
/// ```
fn distinct(&self) -> Collection<G, K, isize> {
self.threshold(|_,c| if c.is_zero() { 0 } else { 1 })
self.threshold_named("Distinct", |_,c| if c.is_zero() { 0 } else { 1 })
}
}

impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
where G::Timestamp: Lattice+Ord {
fn threshold<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
self.arrange_by_self()
.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(move |k,s,t| t.push(((), thresh(k, &s[0].1))))
.as_collection(|k,_| k.clone())
.threshold_named(name, thresh)
}
}

Expand All @@ -162,8 +172,8 @@ where
T1::Batch: BatchReader<K, (), G::Timestamp, R1>,
T1::Cursor: Cursor<K, (), G::Timestamp, R1>,
{
fn threshold<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(move |k,s,t| t.push(((), thresh(k, &s[0].1))))
fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
.as_collection(|k,_| k.clone())
}
}
Expand Down Expand Up @@ -199,8 +209,7 @@ where
{
fn count(&self) -> Collection<G, (K, R), isize> {
self.arrange_by_self()
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(|_k,s,t| t.push((s[0].1.clone(), 1)))
.as_collection(|k,c| (k.clone(), c.clone()))
.count()
}
}

Expand All @@ -212,7 +221,7 @@ where
T1::Cursor: Cursor<K, (), G::Timestamp, R>,
{
fn count(&self) -> Collection<G, (K, R), isize> {
self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(|_k,s,t| t.push((s[0].1.clone(), 1)))
self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), 1)))
.as_collection(|k,c| (k.clone(), c.clone()))
}
}
Expand Down Expand Up @@ -243,13 +252,14 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
/// scope.new_collection_from(1 .. 10u32).1
/// .map(|x| (x, x))
/// .reduce_abelian::<_,OrdValSpine<_,_,_,_>>(
/// "Example",
/// move |_key, src, dst| dst.push((*src[0].0, 1))
/// )
/// .trace;
/// });
/// }
/// ```
fn reduce_abelian<L, T2>(&self, mut logic: L) -> Arranged<G, TraceAgent<T2>>
fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Data,
Expand All @@ -258,7 +268,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
{
self.reduce_core::<_,T2>(move |key, input, output, change| {
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
Expand All @@ -272,7 +282,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
/// Unlike `reduce_arranged`, this method may be called with an empty `input`,
/// and it may not be safe to index into the first element.
/// At least one of the two collections will be non-empty.
fn reduce_core<L, T2>(&self, logic: L) -> Arranged<G, TraceAgent<T2>>
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Data,
Expand All @@ -291,7 +301,7 @@ where
V: ExchangeData,
R: ExchangeData+Semigroup,
{
fn reduce_core<L, T2>(&self, logic: L) -> Arranged<G, TraceAgent<T2>>
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2::Val: Data,
T2::R: Semigroup,
Expand All @@ -301,7 +311,7 @@ where
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
{
self.arrange_by_key()
.reduce_core(logic)
.reduce_core(name, logic)
}
}

Expand All @@ -312,7 +322,7 @@ where
T1::Batch: BatchReader<K, V, G::Timestamp, R>,
T1::Cursor: Cursor<K, V, G::Timestamp, R>,
{
fn reduce_core<L, T2>(&self, mut logic: L) -> Arranged<G, TraceAgent<T2>>
fn reduce_core<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Data,
Expand All @@ -327,7 +337,7 @@ where
let stream = {

let result_trace = &mut result_trace;
self.stream.unary_frontier(Pipeline, "Reduce", move |_capability, operator_info| {
self.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

let logger = {
let scope = self.stream.scope();
Expand Down

0 comments on commit 718971d

Please sign in to comment.