Skip to content

Commit

Permalink
support fanout combinators in group by
Browse files Browse the repository at this point in the history
  • Loading branch information
drunkirishcoder committed May 10, 2020
1 parent d17746c commit 08c5290
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 15 deletions.
45 changes: 30 additions & 15 deletions core/src/batch/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use super::{Batch, Disposition};
use crate::packets::Packet;
use std::cell::Cell;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;
use std::rc::Rc;

Expand Down Expand Up @@ -83,6 +83,7 @@ where
bridge: Bridge<B::Item>,
groups: HashMap<D, Box<dyn Batch<Item = B::Item>>>,
catchall: Box<dyn Batch<Item = B::Item>>,
fanouts: VecDeque<Disposition<B::Item>>,
}

impl<B: Batch, D, S> GroupBy<B, D, S>
Expand Down Expand Up @@ -121,6 +122,7 @@ where
bridge,
groups,
catchall,
fanouts: VecDeque::new(),
}
}
}
Expand All @@ -139,21 +141,34 @@ where

#[inline]
fn next(&mut self) -> Option<Disposition<Self::Item>> {
self.batch.next().map(|disp| {
disp.map(|pkt| {
// get the discriminator key
let key = (self.selector)(&pkt);

// feed this packet through the bridge
self.bridge.set(pkt);

// run the packet through
match self.groups.get_mut(&key) {
Some(group) => group.next().unwrap(),
None => self.catchall.next().unwrap(),
}
if let Some(disp) = self.fanouts.pop_front() {
Some(disp)
} else {
self.batch.next().map(|disp| {
disp.map(|pkt| {
// gets the discriminator key
let key = (self.selector)(&pkt);

// feeds this packet through the bridge
self.bridge.set(pkt);

// runs the packet through. the sub-batch could be a fanout
// that produces multiple packets from one input. they are
// temporarily stored in a queue and returned in the subsequent
// iterations.
let batch = match self.groups.get_mut(&key) {
Some(group) => group,
None => &mut self.catchall,
};

while let Some(next) = batch.next() {
self.fanouts.push_back(next)
}

self.fanouts.pop_front().unwrap()
})
})
})
}
}
}

Expand Down
26 changes: 26 additions & 0 deletions core/src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,32 @@ mod tests {
}
}

#[capsule::test]
fn group_by_fanout() {
let mut batch = new_batch(&[&IPV4_TCP_PACKET])
.map(|p| p.parse::<Ethernet>()?.parse::<Ipv4>())
.group_by(
|p| p.protocol(),
|groups| {
compose!( groups {
ProtocolNumbers::Tcp => |group| {
group.replace(|_| {
Mbuf::from_bytes(&IPV4_UDP_PACKET)?
.parse::<Ethernet>()?
.parse::<Ipv4>()
})
}
})
},
);

// replace inside group_by will produce a new UDP packet
// and marks the original TCP packet as dropped.
assert!(batch.next().unwrap().is_act());
assert!(batch.next().unwrap().is_drop());
assert!(batch.next().is_none());
}

#[capsule::test]
fn replace_batch() {
let mut batch =
Expand Down

0 comments on commit 08c5290

Please sign in to comment.