Skip to content

Commit

Permalink
chore: Split transforms into streaming and non-streaming. (vectordotd…
Browse files Browse the repository at this point in the history
…ev#4684)

* wip

Signed-off-by: Ana Hobden <[email protected]>

* A couple mystery errors

Signed-off-by: Ana Hobden <[email protected]>

* fmt

Signed-off-by: Ana Hobden <[email protected]>

* No more mystery errors!

Signed-off-by: Ana Hobden <[email protected]>

* It builds yay

Signed-off-by: Ana Hobden <[email protected]>

* Wip

Signed-off-by: Ana Hobden <[email protected]>

* Wrap up

Signed-off-by: Ana Hobden <[email protected]>

* Fixups

Signed-off-by: Ana Hobden <[email protected]>

* Fixups

Signed-off-by: Ana Hobden <[email protected]>

* Fixup Rfc

Signed-off-by: Ana Hobden <[email protected]>

* Reflect https://twitter.com/khuey_/status/1319477390385426433?s=20

Signed-off-by: Ana Hobden <[email protected]>

* checkpoint some fixups

Signed-off-by: Ana Hobden <[email protected]>

* It builds!

Signed-off-by: Ana Hobden <[email protected]>

* Fixups

Signed-off-by: Ana Hobden <[email protected]>

* fixup k8s

Signed-off-by: Ana Hobden <[email protected]>

* Maybe??

Signed-off-by: Ana Hobden <[email protected]>

* Fixup unit tests

Signed-off-by: Ana Hobden <[email protected]>

* fmt

Signed-off-by: Ana Hobden <[email protected]>

* Fixup @lukesteensen's RFC list style :)

Signed-off-by: Ana Hobden <[email protected]>

* fixup

Signed-off-by: Ana Hobden <[email protected]>

* fmt

Signed-off-by: Ana Hobden <[email protected]>
  • Loading branch information
Hoverbear authored Nov 3, 2020
1 parent 497c76d commit 9954ef8
Show file tree
Hide file tree
Showing 107 changed files with 1,460 additions and 1,101 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ uom = { version = "0.29.0", optional = true }
rust_decimal = "1.8.1"
mongodb = { version = "1.1.1", optional = true }
anyhow = { version = "1.0.28" }
dyn-clone = "1.0.3"

# For WASM
vector-wasm = { path = "lib/vector-wasm", optional = true }
Expand Down
60 changes: 31 additions & 29 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vector::transforms::{
coercer::CoercerConfig,
json_parser::{JsonParser, JsonParserConfig},
remap::{Remap, RemapConfig},
Transform,
FunctionTransform, Transform,
};
use vector::{
config::{self, log_schema, TransformConfig},
Expand Down Expand Up @@ -442,14 +442,16 @@ fn benchmark_regex(c: &mut Criterion) {
.take(num_lines)
.collect::<Vec<String>>();

(parser, src_lines)
let output = Vec::with_capacity(1);
(parser, src_lines, output)
},
|(mut parser, src_lines)| {
let out_lines = src_lines.iter()
.filter_map(|line| parser.transform(Event::from(&line[..])))
.fold(0, |accum, _| accum + 1);

assert_eq!(out_lines, num_lines);
|(mut parser, src_lines, mut output)| {
src_lines
.into_iter()
.for_each(|line| {
parser.as_function().transform(&mut output, Event::from(&line[..]))
});
assert_eq!(output.len(), num_lines);
},
);
})
Expand Down Expand Up @@ -683,21 +685,20 @@ fn bench_elasticsearch_index(c: &mut Criterion) {

fn benchmark_remap(c: &mut Criterion) {
let mut rt = runtime();
let add_fields_runner = |mut tform: Box<dyn Transform>| {
let add_fields_runner = |mut tform: Box<dyn FunctionTransform>| {
let event = {
let mut event = Event::from("augment me");
event.as_mut_log().insert("copy_from", "buz".to_owned());
event
};

move || {
let result = tform.transform(event.clone()).unwrap();
assert_eq!(result.as_log().get("foo").unwrap().to_string_lossy(), "bar");
assert_eq!(result.as_log().get("bar").unwrap().to_string_lossy(), "baz");
assert_eq!(
result.as_log().get("copy").unwrap().to_string_lossy(),
"buz"
);
let mut result = Vec::with_capacity(1);
tform.transform(&mut result, event.clone());
let output_1 = result[0].as_log();
assert_eq!(output_1.get("foo").unwrap().to_string_lossy(), "bar");
assert_eq!(output_1.get("bar").unwrap().to_string_lossy(), "baz");
assert_eq!(output_1.get("copy").unwrap().to_string_lossy(), "buz");
}
};

Expand All @@ -724,7 +725,7 @@ fn benchmark_remap(c: &mut Criterion) {
b.iter(add_fields_runner(Box::new(tform)))
});

let json_parser_runner = |mut tform: Box<dyn Transform>| {
let json_parser_runner = |mut tform: Box<dyn FunctionTransform>| {
let event = {
let mut event = Event::from("parse me");
event
Expand All @@ -734,13 +735,15 @@ fn benchmark_remap(c: &mut Criterion) {
};

move || {
let result = tform.transform(event.clone()).unwrap();
let mut result = Vec::with_capacity(1);
tform.transform(&mut result, event.clone());
let output_1 = result[0].as_log();
assert_eq!(
result.as_log().get("foo").unwrap().to_string_lossy(),
output_1.get("foo").unwrap().to_string_lossy(),
r#"{"key": "value"}"#
);
assert_eq!(
result.as_log().get("bar").unwrap().to_string_lossy(),
output_1.get("bar").unwrap().to_string_lossy(),
r#"{"key":"value"}"#
);
}
Expand All @@ -767,7 +770,7 @@ fn benchmark_remap(c: &mut Criterion) {
b.iter(json_parser_runner(Box::new(tform)))
});

let coerce_runner = |mut tform: Box<dyn Transform>| {
let coerce_runner = |mut tform: Transform| {
let mut event = Event::from("coerce me");
for &(key, value) in &[
("number", "1234"),
Expand All @@ -783,14 +786,13 @@ fn benchmark_remap(c: &mut Criterion) {
.with_timezone(&Utc);

move || {
let result = tform.transform(event.clone()).unwrap();
assert_eq!(
result.as_log().get("number").unwrap(),
&Value::Integer(1234)
);
assert_eq!(result.as_log().get("bool").unwrap(), &Value::Boolean(true));
let mut result = Vec::with_capacity(1);
tform.as_function().transform(&mut result, event.clone());
let output_1 = result[0].as_log();
assert_eq!(output_1.get("number").unwrap(), &Value::Integer(1234));
assert_eq!(output_1.get("bool").unwrap(), &Value::Boolean(true));
assert_eq!(
result.as_log().get("timestamp").unwrap(),
output_1.get("timestamp").unwrap(),
&Value::Timestamp(timestamp),
);
}
Expand All @@ -807,7 +809,7 @@ fn benchmark_remap(c: &mut Criterion) {
})
.unwrap();

b.iter(coerce_runner(Box::new(tform)))
b.iter(coerce_runner(Transform::function(tform)))
});

c.bench_function("remap: coerce with coercer", |b| {
Expand Down
6 changes: 4 additions & 2 deletions benches/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vector::{
event::{Event, LogEvent},
transforms::{
json_parser::{JsonParser, JsonParserConfig},
Transform,
FunctionTransform,
},
};

Expand Down Expand Up @@ -90,7 +90,9 @@ fn create_event(json: Value) -> LogEvent {
event.as_mut_log().insert(log_schema().message_key(), s);

let mut parser = JsonParser::from(JsonParserConfig::default());
parser.transform(event).unwrap().into_log()
let mut output = Vec::with_capacity(1);
parser.transform(&mut output, event);
output.into_iter().next().unwrap().into_log()
}

criterion_group!(event, benchmark_event);
76 changes: 47 additions & 29 deletions benches/lua.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use bytes::Bytes;
use criterion::{criterion_group, Benchmark, Criterion};
use futures::{compat::Stream01CompatExt, StreamExt};
use indexmap::IndexMap;
use transforms::lua::v2::LuaConfig;
use vector::{
config::TransformConfig,
test_util::runtime,
transforms::{self, Transform},
transforms::{
self, util::runtime_transform::RuntimeTransform, FunctionTransform, TaskTransform,
},
Event,
};

Expand All @@ -31,8 +34,9 @@ fn add_fields(c: &mut Criterion) {
|mut transform| {
for _ in 0..num_events {
let event = Event::new_empty_log();
let event = transform.transform(event).unwrap();
assert_eq!(event.as_log()[key], value_bytes_native);
let mut output = vec![];
transform.transform(&mut output, event);
assert_eq!(output[0].as_log()[key], value_bytes_native);
}
},
)
Expand All @@ -41,13 +45,14 @@ fn add_fields(c: &mut Criterion) {
b.iter_with_setup(
|| {
let source = format!("event['{}'] = '{}'", key, value);
transforms::lua::v1::Lua::new(&source, vec![]).unwrap()
transforms::lua::v1::Lua::new(source, vec![]).unwrap()
},
|mut transform| {
for _ in 0..num_events {
let event = Event::new_empty_log();
let event = transform.transform(event).unwrap();
assert_eq!(event.as_log()[key], value_bytes_v1);
let mut output = Vec::with_capacity(1);
transform.transform(&mut output, event);
assert_eq!(output[0].as_log()[key], value_bytes_v1);
}
},
)
Expand All @@ -71,8 +76,9 @@ fn add_fields(c: &mut Criterion) {
|mut transform| {
for _ in 0..num_events {
let event = Event::new_empty_log();
let event = transform.transform(event).unwrap();
assert_eq!(event.as_log()[key], value_bytes_v2);
let mut output = Vec::with_capacity(1);
transform.transform(&mut output, event);
assert_eq!(output[0].as_log()[key], value_bytes_v2);
}
},
)
Expand Down Expand Up @@ -100,15 +106,20 @@ fn field_filter(c: &mut Criterion) {
.unwrap()
})
},
|mut transform| {
let num = (0..num_events)
.map(|i| {
let mut event = Event::new_empty_log();
event.as_mut_log().insert("the_field", (i % 10).to_string());
event
})
.filter_map(|r| transform.transform(r))
.count();
|transform| {
let inputs = (0..num_events).map(|i| {
let mut event = Event::new_empty_log();
event.as_mut_log().insert("the_field", (i % 10).to_string());
event
});
let in_stream = futures01::stream::iter_ok(inputs);
let out_stream = transform
.into_task()
.transform(Box::new(in_stream))
.compat()
.collect::<Vec<_>>();
let blocked = futures::executor::block_on(out_stream);
let num = blocked.len();
assert_eq!(num, num_events / 10);
},
)
Expand All @@ -121,7 +132,7 @@ fn field_filter(c: &mut Criterion) {
event = nil
end
"#;
transforms::lua::v1::Lua::new(&source, vec![]).unwrap()
transforms::lua::v1::Lua::new(source.to_string(), vec![]).unwrap()
},
|mut transform| {
let num = (0..num_events)
Expand All @@ -130,8 +141,11 @@ fn field_filter(c: &mut Criterion) {
event.as_mut_log().insert("the_field", (i % 10).to_string());
event
})
.filter_map(|r| transform.transform(r))
.count();
.fold(Vec::new(), |mut acc, r| {
transform.transform(&mut acc, r);
acc
})
.len();
assert_eq!(num, num_events / 10);
},
)
Expand All @@ -148,15 +162,19 @@ fn field_filter(c: &mut Criterion) {
"#;
transforms::lua::v2::Lua::new(&toml::from_str(config).unwrap()).unwrap()
},
|mut transform| {
let num = (0..num_events)
.map(|i| {
let mut event = Event::new_empty_log();
event.as_mut_log().insert("the_field", (i % 10).to_string());
event
})
.filter_map(|r| transform.transform(r))
.count();
|transform| {
let inputs = (0..num_events).map(|i| {
let mut event = Event::new_empty_log();
event.as_mut_log().insert("the_field", (i % 10).to_string());
event
});
let in_stream = futures01::stream::iter_ok(inputs);
let out_stream =
TaskTransform::transform(Box::new(transform), Box::new(in_stream))
.compat()
.collect::<Vec<_>>();
let blocked = futures::executor::block_on(out_stream);
let num = blocked.len();
assert_eq!(num, num_events / 10);
},
)
Expand Down
2 changes: 1 addition & 1 deletion benches/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn drop(criterion: &mut Criterion) {
"#,
)
.unwrap(),
)
)r
.unwrap(),
),
),
Expand Down
1 change: 1 addition & 0 deletions lib/remap-lang/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ pest = "2"
pest_derive = "2"
regex = "1"
thiserror = "1"
dyn-clone = "1.0.3"
6 changes: 4 additions & 2 deletions lib/remap-lang/src/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ pub enum Error {
Variable(#[from] variable::Error),
}

pub trait Expression: Send + Sync + std::fmt::Debug {
pub trait Expression: Send + Sync + std::fmt::Debug + dyn_clone::DynClone {
fn execute(&self, state: &mut State, object: &mut dyn Object) -> Result<Option<Value>>;
}

dyn_clone::clone_trait_object!(Expression);

macro_rules! expression_dispatch {
($($expr:tt),+ $(,)?) => (
/// The list of implemented expressions.
Expand All @@ -63,7 +65,7 @@ macro_rules! expression_dispatch {
///
/// Any expression that stores other expressions internally will still
/// have to box this enum, to avoid infinite recursion.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum Expr {
$($expr($expr)),+
}
Expand Down
2 changes: 1 addition & 1 deletion lib/remap-lang/src/expression/arithmetic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{Expr, Expression, Object, Result, State, Value};
use crate::Operator;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Arithmetic {
lhs: Box<Expr>,
rhs: Box<Expr>,
Expand Down
4 changes: 2 additions & 2 deletions lib/remap-lang/src/expression/assignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ pub enum Error {
PathInsertion(String),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum Target {
Path(Vec<Vec<String>>),
Variable(String),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct Assignment {
target: Target,
value: Box<Expr>,
Expand Down
2 changes: 1 addition & 1 deletion lib/remap-lang/src/expression/block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{Expr, Expression, Object, Result, State, Value};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct Block {
expressions: Vec<Expr>,
}
Expand Down
Loading

0 comments on commit 9954ef8

Please sign in to comment.