Skip to content

Commit

Permalink
chore(lua transform): Don't finish when timers finish (vectordotdev#2809
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ktff authored Jun 13, 2020
1 parent 6159953 commit 9efa014
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
32 changes: 32 additions & 0 deletions src/transforms/lua/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,10 @@ mod tests {
metric::{Metric, MetricKind, MetricValue},
Event, Value,
},
test_util::runtime,
transforms::Transform,
};
use futures01::{stream, Stream};

fn from_config(config: &str) -> crate::Result<Lua> {
Lua::new(&toml::from_str(config).unwrap())
Expand Down Expand Up @@ -733,4 +735,34 @@ mod tests {

assert_eq!(event, expected);
}

#[test]
fn lua_multiple_events() {
crate::test_util::trace_init();
let transform = from_config(
r#"
hooks.process = """function (event, emit)
event["log"]["hello"] = "goodbye"
emit(event)
end
"""
"#,
)
.unwrap();

let n = 10;

let events = (0..n)
.into_iter()
.map(|i| Event::from(format!("program me {}", i)));

let stream =
Transform::transform_stream(Box::new(transform), Box::new(stream::iter_ok(events)));

let mut rt = runtime();

let results = rt.block_on(stream.collect()).unwrap();

assert_eq!(results.len(), n);
}
}
12 changes: 8 additions & 4 deletions src/transforms/util/runtime_transform.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{event::Event, stream::StreamExt, transforms::Transform};
use futures01::{stream, Future, Stream as FutureStream};
use futures01::{stream, Async, Future, Stream as FutureStream};
use std::time::Duration;
use tokio01::timer::Interval;

Expand Down Expand Up @@ -95,9 +95,13 @@ where
// A stream of `Message::Timer(..)` events generated by timers.
let timer_msgs = make_timer_msgs_stream(timers);

init_msg
.chain(first_event)
.chain(rest_events_and_shutdown_msg.weak_select(timer_msgs))
init_msg.chain(first_event).chain(
// We need to finish when `rest_events_and_shutdown_msg` finishes so
// not to hang on timers, but not finish when `timer_msgs` finishes
// as there may not be any timer.
rest_events_and_shutdown_msg
.weak_select(timer_msgs.chain(stream::poll_fn(|| Ok(Async::NotReady)))),
)
})
.map_err(|_| ())
.into_stream()
Expand Down

0 comments on commit 9efa014

Please sign in to comment.