Skip to content

Commit

Permalink
document Dag, and rework its visibility
Browse files Browse the repository at this point in the history
  • Loading branch information
intarga committed Oct 12, 2023
1 parent a674d7e commit 03077b1
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 20 deletions.
109 changes: 94 additions & 15 deletions src/dag.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,89 @@
use std::collections::{BTreeSet, HashMap};
use std::hash::Hash;

/// Node in a DAG
#[derive(Debug)]
pub struct Node<T> {
pub elem: T,
pub(crate) struct Node<Elem> {
/// Element of the node, in ROVE's case the name of a QC test
pub elem: Elem,
/// QC tests this test depends on
pub children: BTreeSet<NodeId>,
/// QC tests that depend on this test
pub parents: BTreeSet<NodeId>,
}

pub type NodeId = usize;

/// Unique identifier for each node in a DAG
///
/// These are essentially indices of the nodes vector in the DAG
pub(crate) type NodeId = usize;

/// [Directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph)
/// representation
///
/// DAGs are used to define dependencies and pipelines between QC tests in ROVE.
/// Each node in the DAG represents a QC test, and edges between nodes encode
/// dependencies, where the parent node is dependent on the child node.
///
/// The generic parameter `Elem` represents the data held by a node in the graph.
/// For most use cases we expect `&'static str` to work here. Strings
/// containing test names seem a reasonable way to represent QC tests, and these
/// strings can be reasonably expected to be known at compile time, hence
/// `'static`.
///
/// The following code sample shows how to construct a DAG:
///
/// ```
/// use rove::Dag;
///
/// let dag = {
/// // create empty dag
/// let mut dag: Dag<&'static str> = Dag::new();
///
/// // add free-standing node
/// let test6 = dag.add_node("test6");
///
/// // add a node with a dependency on the previously defined node
/// let test4 = dag.add_node_with_children("test4", vec![test6]);
/// let test5 = dag.add_node_with_children("test5", vec![test6]);
///
/// let test2 = dag.add_node_with_children("test2", vec![test4]);
/// let test3 = dag.add_node_with_children("test3", vec![test5]);
///
/// let _test1 = dag.add_node_with_children("test1", vec![test2, test3]);
///
/// dag
/// };
///
/// // Resulting dag should look like:
/// //
/// //   6
/// //   ^
/// //  / \
/// // 4   5
/// // ^   ^
/// // |   |
/// // 2   3
/// // ^   ^
/// //  \ /
/// //   1
/// ```
#[derive(Debug)]
pub struct Dag<T: Ord + Hash + Clone> {
pub roots: BTreeSet<NodeId>,
pub leaves: BTreeSet<NodeId>,
pub nodes: Vec<Node<T>>,
pub index_lookup: HashMap<T, NodeId>,
pub struct Dag<Elem: Ord + Hash + Clone> {
/// A vector of all nodes in the graph
pub(crate) nodes: Vec<Node<Elem>>,
/// A set of IDs of the nodes that have no parents
pub(crate) roots: BTreeSet<NodeId>,
/// A set of IDs of the nodes that have no children
pub(crate) leaves: BTreeSet<NodeId>,
/// A hashmap of elements (test names in the case of ROVE) to NodeIds
///
/// This is useful for finding a node in the graph that represents a
/// certain test, without having to walk the whole nodes vector
pub(crate) index_lookup: HashMap<Elem, NodeId>,
}

impl<T: Ord + Hash + Clone> Node<T> {
pub fn new(elem: T) -> Self {
impl<Elem: Ord + Hash + Clone> Node<Elem> {
fn new(elem: Elem) -> Self {
Node {
elem,
children: BTreeSet::new(),
Expand All @@ -28,7 +92,8 @@ impl<T: Ord + Hash + Clone> Node<T> {
}
}

impl<T: Ord + Hash + Clone> Dag<T> {
impl<Elem: Ord + Hash + Clone> Dag<Elem> {
/// Create a new (empty) DAG
pub fn new() -> Self {
Dag {
roots: BTreeSet::new(),
Expand All @@ -38,7 +103,8 @@ impl<T: Ord + Hash + Clone> Dag<T> {
}
}

pub fn add_node(&mut self, elem: T) -> NodeId {
/// Add a free-standing node to a DAG
pub fn add_node(&mut self, elem: Elem) -> NodeId {
let index = self.nodes.len();
self.nodes.push(Node::new(elem.clone()));

Expand All @@ -50,6 +116,8 @@ impl<T: Ord + Hash + Clone> Dag<T> {
index
}

/// Add an edge to the DAG. This defines a dependency, where the parent is
/// dependent on the child.
pub fn add_edge(&mut self, parent: NodeId, child: NodeId) {
// TODO: we can do better than unwrapping here
self.nodes.get_mut(parent).unwrap().children.insert(child);
Expand All @@ -59,7 +127,8 @@ impl<T: Ord + Hash + Clone> Dag<T> {
self.leaves.remove(&parent);
}

pub fn add_node_with_children(&mut self, elem: T, children: Vec<NodeId>) -> NodeId {
/// Add a node to the DAG, along with edges representing its dependencies (children)
pub fn add_node_with_children(&mut self, elem: Elem, children: Vec<NodeId>) -> NodeId {
let new_node = self.add_node(elem);

for child in children.into_iter() {
Expand All @@ -69,6 +138,7 @@ impl<T: Ord + Hash + Clone> Dag<T> {
new_node
}

/// Removes an edge from the DAG
fn remove_edge(&mut self, parent: NodeId, child: NodeId) {
// TODO: we can do better than unwrapping here
self.nodes.get_mut(parent).unwrap().children.remove(&child);
Expand Down Expand Up @@ -99,6 +169,7 @@ impl<T: Ord + Hash + Clone> Dag<T> {
edge_count
}

/// Counts the number of edges in the DAG
#[cfg(test)]
pub fn count_edges(&self) -> u32 {
let mut edge_count = 0;
Expand Down Expand Up @@ -132,6 +203,10 @@ impl<T: Ord + Hash + Clone> Dag<T> {
}
}

/// Performs a [transitive reduction](https://en.wikipedia.org/wiki/Transitive_reduction)
/// on the DAG
///
/// This essentially removes any redundant dependencies in the graph
pub fn transitive_reduce(&mut self) {
for root in self.roots.clone().iter() {
self.transitive_reduce_iter(*root)
Expand All @@ -156,6 +231,10 @@ impl<T: Ord + Hash + Clone> Dag<T> {
false
}

/// Check for cycles in the DAG
///
/// This can be used to validate a DAG, as a DAG **must not** contain cycles.
/// Returns true if a cycle is detected, false otherwise.
pub fn cycle_check(&self) -> bool {
let mut ancestors: Vec<NodeId> = Vec::new();

Expand All @@ -169,7 +248,7 @@ impl<T: Ord + Hash + Clone> Dag<T> {
}
}

impl<T: Ord + Hash + Clone> Default for Dag<T> {
impl<Elem: Ord + Hash + Clone> Default for Dag<Elem> {
fn default() -> Self {
Self::new()
}
Expand Down
9 changes: 6 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@
//! }
//! ```
pub mod dag;
#[warn(missing_docs)]
mod dag;
pub mod data_switch;
mod harness;
mod scheduler;
mod server;

pub use dag::Dag;

/// Receiver type for QC runs
///
/// Holds information about test dependencies and data sources
Expand All @@ -103,7 +106,7 @@ pub use scheduler::Scheduler;
///
/// Takes a [socket address](std::net::SocketAddr) to listen on, a
/// [data switch](data_switch::DataSwitch) to provide access to data sources,
/// and a [dag](dagmar::Dag) to encode dependencies between tests
/// and a [dag](dag::Dag) to encode dependencies between tests
pub use server::start_server;

#[doc(hidden)]
Expand Down Expand Up @@ -133,10 +136,10 @@ pub(crate) mod pb {
#[doc(hidden)]
pub mod dev_utils {
use crate::{
dag::Dag,
data_switch::{
self, DataConnector, GeoPoint, SeriesCache, SpatialCache, Timerange, Timestamp,
},
Dag,
};
use async_trait::async_trait;
use chronoutil::RelativeDuration;
Expand Down
3 changes: 1 addition & 2 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use core::future::Future;
use pb::{rove_client::RoveClient, Flag, ValidateSeriesRequest, ValidateSpatialRequest};
use rove::{
dag::Dag,
data_switch::{DataConnector, DataSwitch},
dev_utils::{construct_fake_dag, construct_hardcoded_dag, TestDataSource},
start_server_unix_listener,
start_server_unix_listener, Dag,
};
use std::{collections::HashMap, sync::Arc};
use tempfile::NamedTempFile;
Expand Down

0 comments on commit 03077b1

Please sign in to comment.