Skip to content

Commit

Permalink
chore(dedupe): add two test cases (vectordotdev#21306)
Browse files Browse the repository at this point in the history
  • Loading branch information
pront authored Sep 17, 2024
1 parent 0291b64 commit da1d02d
Showing 1 changed file with 100 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/transforms/dedupe/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,103 @@ impl TaskTransform<Event> for Dedupe {
Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
}
}

#[cfg(test)]
mod tests {
use crate::event::LogEvent;
use crate::test_util::components::assert_transform_compliance;
use crate::transforms::dedupe::common::{default_cache_config, FieldMatchConfig};
use crate::transforms::dedupe::config::DedupeConfig;
use crate::transforms::test::create_topology;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;
use vrl::value::Value;

pub fn assert_eq_values(left: LogEvent, right: LogEvent) {
let inner_left = left.into_parts().0;
let inner_right = right.into_parts().0;
assert_eq!(inner_left, inner_right);
}

#[tokio::test]
async fn default_match() {
let config = DedupeConfig {
cache: default_cache_config(),
fields: None,
};

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);

let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

let event1 = LogEvent::from(btreemap! {
"message" => "foo",
"host" => "bar",
"timestamp" => "t1",
});
tx.send(event1.clone().into()).await.unwrap();
let output = out.recv().await.unwrap().into_log();
assert_eq_values(event1.clone(), output);

let event2 = event1.clone();
tx.send(event2.into()).await.unwrap();

let mut event3 = event1.clone();
event3.insert("message", Value::from("another"));
tx.send(event3.clone().into()).await.unwrap();

let output = out.recv().await.unwrap().into_log();
assert_eq_values(event3, output);

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await
}

#[tokio::test]
async fn custom_match() {
let config = DedupeConfig {
cache: default_cache_config(),
fields: Some(FieldMatchConfig::MatchFields(vec![
ConfigTargetPath::from("a"),
ConfigTargetPath::from("b"),
])),
};

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);

let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

let event1 = LogEvent::from(btreemap! {
"message" => "foo",
"a" => 1,
"b" => 2,
});
tx.send(event1.clone().into()).await.unwrap();
let output = out.recv().await.unwrap().into_log();
assert_eq_values(event1.clone(), output);

let event2 = event1.clone();
tx.send(event2.into()).await.unwrap();

let event3 = LogEvent::from(btreemap! {
"message" => "bar",
"a" => 3,
"b" => 2,
});
tx.send(event3.clone().into()).await.unwrap();
let output = out.recv().await.unwrap().into_log();
assert_eq_values(event3, output);

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await
}
}

0 comments on commit da1d02d

Please sign in to comment.