Skip to content

Commit

Permalink
Regex type coercion (vectordotdev#542)
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg authored and LucioFranco committed Jun 27, 2019
1 parent 20b2efc commit cead166
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 20 deletions.
6 changes: 2 additions & 4 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,7 @@ fn benchmark_transforms(c: &mut Criterion) {
transforms::regex_parser::RegexParserConfig {
regex: r"status=(?P<status>\d+)".to_string(),
field: None,
drop_field: false,
drop_failed: false,
..Default::default()
},
);
config.add_transform(
Expand Down Expand Up @@ -402,8 +401,7 @@ fn benchmark_complex(c: &mut Criterion) {
transforms::regex_parser::RegexParserConfig {
regex: r"status=(?P<status>\d+)".to_string(),
field: None,
drop_field: false,
drop_failed: false,
..Default::default()
},
);
config.add_transform(
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ pub mod sources;
pub mod test_util;
pub mod topology;
pub mod transforms;
pub mod types;

pub use event::Event;
116 changes: 100 additions & 16 deletions src/transforms/regex_parser.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use super::Transform;
use crate::event::{self, Event};
use crate::event::{self, Event, ValueKind};
use crate::types::Conversion;
use regex::bytes::{CaptureLocations, Regex};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::str;
use string_cache::DefaultAtom as Atom;
use tokio_trace::field;

#[derive(Deserialize, Serialize, Debug, Default)]
#[serde(default, deny_unknown_fields)]
Expand All @@ -12,6 +15,7 @@ pub struct RegexParserConfig {
pub field: Option<Atom>,
pub drop_field: bool,
pub drop_failed: bool,
pub types: HashMap<Atom, String>,
}

#[typetag::serde(name = "regex_parser")]
Expand All @@ -23,6 +27,13 @@ impl crate::topology::config::TransformConfig for RegexParserConfig {
&event::MESSAGE
};

let types = self
.types
.iter()
.map(|(field, typename)| typename.parse::<Conversion>().map(|ct| (field.clone(), ct)))
.collect::<Result<HashMap<Atom, Conversion>, _>>()
.map_err(|err| format!("Invalid conversion type: {}", err))?;

Regex::new(&self.regex)
.map_err(|err| err.to_string())
.map::<Box<dyn Transform>, _>(|r| {
Expand All @@ -31,6 +42,7 @@ impl crate::topology::config::TransformConfig for RegexParserConfig {
field.clone(),
self.drop_field,
self.drop_failed,
types,
))
})
}
Expand All @@ -41,23 +53,57 @@ pub struct RegexParser {
field: Atom,
drop_field: bool,
drop_failed: bool,
capture_names: Vec<(usize, Atom)>,
capture_names: Vec<(usize, Atom, Conversion)>,
capture_locs: CaptureLocations,
}

impl RegexParser {
pub fn new(regex: Regex, field: Atom, mut drop_field: bool, drop_failed: bool) -> Self {
pub fn new(
regex: Regex,
field: Atom,
mut drop_field: bool,
drop_failed: bool,
types: HashMap<Atom, Conversion>,
) -> Self {
// Build a buffer of the regex capture locations to avoid
// repeated allocations.
let capture_locs = regex.capture_locations();
let capture_names: Vec<(usize, Atom)> = regex

// Check if any named type references a nonexistent capture
let capture_names: HashSet<Atom> = regex
.capture_names()
.filter_map(|s| s.map(|s| s.into()))
.collect();
for (name, _) in &types {
if !capture_names.contains(name) {
warn!(
message = "Field was specified in the types but not captured by the pattern.",
field = &name[..]
);
}
}

// Calculate the location (index into the capture locations) of
// each named capture, and the required type coercion.
let capture_names: Vec<(usize, Atom, Conversion)> = regex
.capture_names()
.enumerate()
.filter_map(|(idx, cn)| cn.map(|cn| (idx, cn.into())))
.filter_map(|(idx, cn)| {
cn.map(|cn| {
let cn: Atom = cn.into();
let conv = types.get(&cn).unwrap_or(&Conversion::Bytes);
(idx, cn, conv.clone())
})
})
.collect();
for (_, name) in &capture_names {

// Pre-calculate if the source field name should be dropped.
for (_, name, _) in &capture_names {
if *name == field {
drop_field = false;
}
}

Self {
regex,
field,
Expand All @@ -75,11 +121,19 @@ impl Transform for RegexParser {

if let Some(value) = &value {
if let Some(_) = self.regex.captures_read(&mut self.capture_locs, &value) {
for (idx, name) in &self.capture_names {
for (idx, name, conversion) in &self.capture_names {
if let Some((start, end)) = self.capture_locs.get(*idx) {
event
.as_mut_log()
.insert_explicit(name.clone(), value[start..end].into());
let capture: ValueKind = value[start..end].into();
match conversion.convert(capture) {
Ok(value) => event.as_mut_log().insert_explicit(name.clone(), value),
Err(err) => {
debug!(
message = "Could not convert types.",
name = &name[..],
error = &field::display(err)
);
}
}
}
}
if self.drop_field {
Expand Down Expand Up @@ -107,7 +161,7 @@ impl Transform for RegexParser {
#[cfg(test)]
mod tests {
use super::RegexParserConfig;
use crate::event::LogEvent;
use crate::event::{LogEvent, ValueKind};
use crate::{topology::config::TransformConfig, Event};

fn do_transform(
Expand All @@ -116,13 +170,15 @@ mod tests {
field: Option<&str>,
drop_field: bool,
drop_failed: bool,
types: &[(&str, &str)],
) -> Option<LogEvent> {
let event = Event::from(event);
let mut parser = RegexParserConfig {
regex: regex.into(),
field: field.map(|field| field.into()),
drop_field,
drop_failed,
types: types.iter().map(|&(k, v)| (k.into(), v.into())).collect(),
}
.build()
.unwrap();
Expand All @@ -138,6 +194,7 @@ mod tests {
None,
false,
false,
&[],
)
.unwrap();

Expand All @@ -148,7 +205,15 @@ mod tests {

#[test]
fn regex_parser_doesnt_do_anything_if_no_match() {
let log = do_transform("asdf1234", r"status=(?P<status>\d+)", None, false, false).unwrap();
let log = do_transform(
"asdf1234",
r"status=(?P<status>\d+)",
None,
false,
false,
&[],
)
.unwrap();

assert_eq!(log.get(&"status".into()), None);
assert!(log.get(&"message".into()).is_some());
Expand All @@ -162,6 +227,7 @@ mod tests {
Some("message"),
true,
false,
&[],
)
.unwrap();

Expand All @@ -178,6 +244,7 @@ mod tests {
Some("message"),
true,
false,
&[],
)
.unwrap();

Expand All @@ -193,6 +260,7 @@ mod tests {
Some("message"),
true,
false,
&[],
)
.unwrap();

Expand All @@ -201,25 +269,41 @@ mod tests {

#[test]
fn regex_parser_does_not_drop_event_if_match() {
let log = do_transform("asdf1234", r"asdf", None, false, true);
let log = do_transform("asdf1234", r"asdf", None, false, true, &[]);
assert!(log.is_some());
}

#[test]
fn regex_parser_does_drop_event_if_no_match() {
let log = do_transform("asdf1234", r"something", None, false, true);
let log = do_transform("asdf1234", r"something", None, false, true, &[]);
assert!(log.is_none());
}

#[test]
fn regex_parser_handles_valid_optional_capture() {
let log = do_transform("1234", r"(?P<status>\d+)?", None, false, false).unwrap();
let log = do_transform("1234", r"(?P<status>\d+)?", None, false, false, &[]).unwrap();
assert_eq!(log[&"status".into()], "1234".into());
}

#[test]
fn regex_parser_handles_missing_optional_capture() {
let log = do_transform("none", r"(?P<status>\d+)?", None, false, false).unwrap();
let log = do_transform("none", r"(?P<status>\d+)?", None, false, false, &[]).unwrap();
assert!(log.get(&"status".into()).is_none());
}

#[test]
fn regex_parser_coerces_fields_to_types() {
let log = do_transform(
"1234 6789.01 false",
r"(?P<status>\d+) (?P<time>[\d.]+) (?P<check>\S+)",
None,
false,
false,
&[("status", "int"), ("time", "float"), ("check", "boolean")],
)
.expect("Failed to parse log");
assert_eq!(log[&"check".into()], ValueKind::Boolean(false));
assert_eq!(log[&"status".into()], ValueKind::Integer(1234));
assert_eq!(log[&"time".into()], ValueKind::Float(6789.01));
}
}
Loading

0 comments on commit cead166

Please sign in to comment.