Skip to content

Commit

Permalink
perf(file source): send batches of lines (vectordotdev#4719)
Browse files (browse the repository at this point in the history)
Signed-off-by: Luke Steensen <[email protected]>
  • Loading branch information
lukesteensen authored Oct 23, 2020
1 parent f683f93 commit 4c015a0
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 45 deletions.
9 changes: 5 additions & 4 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ where
self,
mut chans: C,
mut shutdown: impl Future + Unpin,
) -> Result<Shutdown, <C as Sink<(Bytes, String)>>::Error>
) -> Result<Shutdown, <C as Sink<Vec<(Bytes, String)>>>::Error>
where
C: Sink<(Bytes, String)> + Unpin,
<C as Sink<(Bytes, String)>>::Error: std::error::Error,
C: Sink<Vec<(Bytes, String)>> + Unpin,
<C as Sink<Vec<(Bytes, String)>>>::Error: std::error::Error,
{
let mut fingerprint_buffer = Vec::new();

Expand Down Expand Up @@ -284,7 +284,8 @@ where
});

let start = time::Instant::now();
let mut stream = stream::iter(lines.drain(..).map(Ok));
let to_send = std::mem::take(&mut lines);
let mut stream = stream::once(futures::future::ok(to_send));
let result = block_on(chans.send_all(&mut stream));
match result {
Ok(()) => {}
Expand Down
6 changes: 5 additions & 1 deletion src/line_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ impl<K, C> Logic<K, C> {
}
}

impl<T, K, C> LineAgg<T, K, C> {
impl<T, K, C> LineAgg<T, K, C>
where
T: Stream<Item = (K, Bytes, C)> + Unpin,
K: Hash + Eq + Clone,
{
/// Create a new `LineAgg` using the specified `inner` stream and
/// preconfigured `logic`.
pub fn new(inner: T, logic: Logic<K, C>) -> Self {
Expand Down
55 changes: 29 additions & 26 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use file_source::{
FileServer, Fingerprinter,
};
use futures::{
compat::{Compat, Compat01As03, Compat01As03Sink, Future01CompatExt},
compat::{Compat, Future01CompatExt},
future::{FutureExt, TryFutureExt},
stream::StreamExt,
stream::{Stream, StreamExt},
};
use futures01::{future, Future, Sink, Stream};
use futures01::{Future, Sink};
use regex::bytes::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -225,25 +225,15 @@ pub fn file_source(
let multiline_config = config.multiline.clone();
let message_start_indicator = config.message_start_indicator.clone();
let multi_line_timeout = config.multi_line_timeout;
Box::new(future::lazy(move || {

Box::new(futures01::future::lazy(move || {
info!(message = "Starting file server.", ?include, ?exclude);

// sizing here is just a guess
let (tx, rx) = futures01::sync::mpsc::channel(100);

// This closure is overcomplicated because of the compatibility layer.
let wrap_with_line_agg = |rx, config| {
let rx = StreamExt::filter_map(Compat01As03::new(rx), |val| {
futures::future::ready(val.ok())
});
let logic = line_agg::Logic::new(config);
Box::new(Compat::new(
LineAgg::new(rx.map(|(line, src)| (src, line, ())), logic)
.map(|(src, line, _context)| (line, src))
.map(Ok),
))
};
let messages: Box<dyn Stream<Item = (Bytes, String), Error = ()> + Send> =
let (tx, rx) = futures::channel::mpsc::channel::<Vec<(Bytes, String)>>(2);
let rx = rx.map(futures::stream::iter).flatten();

let messages: Box<dyn Stream<Item = (Bytes, String)> + Send + std::marker::Unpin> =
if let Some(ref multiline_config) = multiline_config {
wrap_with_line_agg(
rx,
Expand All @@ -265,13 +255,15 @@ pub fn file_source(
// logs in the queue.
let span = current_span();
let span2 = span.clone();
let messages01 = Compat::new(StreamExt::map(
messages,
move |(msg, file): (Bytes, String)| {
let _enter = span2.enter();
Ok::<_, ()>(create_event(msg, file, &host_key, &hostname, &file_key))
},
));
tokio::spawn(
messages
.map(move |(msg, file): (Bytes, String)| {
let _enter = span2.enter();
create_event(msg, file, &host_key, &hostname, &file_key)
})
.forward(out.sink_map_err(|e| error!(%e)))
futures01::Stream::forward(messages01, out.sink_map_err(|e| error!(%e)))
.map(|_| ())
.compat()
.instrument(span),
Expand All @@ -280,7 +272,7 @@ pub fn file_source(
let span = info_span!("file_server");
spawn_blocking(move || {
let _enter = span.enter();
let result = file_server.run(Compat01As03Sink::new(tx), shutdown);
let result = file_server.run(tx, shutdown);
// Panic if we encounter any error originating from the file server.
// Because we are inside a `spawn_blocking` call, the panic will be caught
// and surfaced as the `JoinHandle` error, just as with regular threads.
Expand All @@ -292,6 +284,17 @@ pub fn file_source(
}))
}

/// Wraps a stream of `(line, source)` pairs with a [`LineAgg`] built from
/// `config`, returning a boxed stream that yields pairs in the same
/// `(line, source)` order.
///
/// `LineAgg` works on `(key, line, context)` triples, so each pair is
/// reordered on the way in (the source string becomes the key and the
/// context is the unit value) and reordered back on the way out, with the
/// context discarded.
fn wrap_with_line_agg(
    rx: impl Stream<Item = (Bytes, String)> + Send + std::marker::Unpin + 'static,
    config: line_agg::Config,
) -> Box<dyn Stream<Item = (Bytes, String)> + Send + std::marker::Unpin + 'static> {
    let agg_logic = line_agg::Logic::new(config);

    // Reshape `(line, source)` into the `(key, line, context)` triple that
    // `LineAgg` consumes; there is no per-line context to carry here.
    let keyed_lines = rx.map(|(bytes, file)| (file, bytes, ()));
    let aggregated = LineAgg::new(keyed_lines, agg_logic);

    // Drop the unit context and restore the `(line, source)` ordering that
    // downstream consumers expect.
    let restored = aggregated.map(|(file, bytes, _ctx)| (bytes, file));
    Box::new(restored)
}

fn create_event(
line: Bytes,
file: String,
Expand Down
28 changes: 16 additions & 12 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,22 +268,26 @@ impl Source {
};

let (file_source_tx, file_source_rx) =
futures::channel::mpsc::channel::<(Bytes, String)>(100);
futures::channel::mpsc::channel::<Vec<(Bytes, String)>>(2);

let mut parser = parser::build();
let mut partial_events_merger = partial_events_merger::build(auto_partial_merge);

let events = file_source_rx.map(move |(bytes, file)| {
emit!(KubernetesLogsEventReceived {
file: &file,
byte_size: bytes.len(),
});
let mut event = create_event(bytes, &file);
if annotator.annotate(&mut event, &file).is_none() {
emit!(KubernetesLogsEventAnnotationFailed { event: &event });
}
event
});
let events =
file_source_rx
.map(futures::stream::iter)
.flatten()
.map(move |(bytes, file)| {
emit!(KubernetesLogsEventReceived {
file: &file,
byte_size: bytes.len(),
});
let mut event = create_event(bytes, &file);
if annotator.annotate(&mut event, &file).is_none() {
emit!(KubernetesLogsEventAnnotationFailed { event: &event });
}
event
});
let events = events
.filter_map(move |event| futures::future::ready(parser.transform(event)))
.filter_map(move |event| {
Expand Down
4 changes: 2 additions & 2 deletions src/sources/kubernetes_logs/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub async fn run_file_server<PP, E, C, S>(
where
PP: PathsProvider + Send + 'static,
E: FileSourceInternalEvents,
C: Sink<(Bytes, String)> + Unpin + Send + 'static,
<C as Sink<(Bytes, String)>>::Error: Error + Send,
C: Sink<Vec<(Bytes, String)>> + Unpin + Send + 'static,
<C as Sink<Vec<(Bytes, String)>>>::Error: Error + Send,
S: Future + Unpin + Send + 'static,
{
let span = info_span!("file_server");
Expand Down

0 comments on commit 4c015a0

Please sign in to comment.