Skip to content

Commit

Permalink
enhancement(topology): Update transforms to handle multiple definitio…
Browse files Browse the repository at this point in the history
…ns (vectordotdev#16793)

* Update transforms to handle multiple definitions

Signed-off-by: Stephen Wakely <[email protected]>

* Remap transform needs to copy metadata kind and relevant meanings to output definition

Signed-off-by: Stephen Wakely <[email protected]>

* Spelling

Signed-off-by: Stephen Wakely <[email protected]>

* Replace Output::default with functions appropriate for the given scenario

Signed-off-by: Stephen Wakely <[email protected]>

* Clippy

Signed-off-by: Stephen Wakely <[email protected]>

* Updated comments

Signed-off-by: Stephen Wakely <[email protected]>

* Further restrain sources to a single definition

Signed-off-by: Stephen Wakely <[email protected]>

* Remove redundant comments

Signed-off-by: Stephen Wakely <[email protected]>

* Remove metrics definition from datadog agent

Signed-off-by: Stephen Wakely <[email protected]>

* Move vrl compiling outside the definition loop

Signed-off-by: Stephen Wakely <[email protected]>

* Dont panic on empty definitions

Signed-off-by: Stephen Wakely <[email protected]>

* WIP transform outputs need to be a map of port to a list of definitions

Signed-off-by: Stephen Wakely <[email protected]>

* Temporarily fix remap errors

Signed-off-by: Stephen Wakely <[email protected]>

* No inputs should have a Definition::any

Signed-off-by: Stephen Wakely <[email protected]>

* Add Output Id to input definition

Signed-off-by: Stephen Wakely <[email protected]>

* Pass reference definitions instead of owned

Signed-off-by: Stephen Wakely <[email protected]>

* Sources should output SourceOutput type instead

Signed-off-by: Stephen Wakely <[email protected]>

* Source definitions returns an owned object

Signed-off-by: Stephen Wakely <[email protected]>

* Fixed feedback

Signed-off-by: Stephen Wakely <[email protected]>

* Clippy

Signed-off-by: Stephen Wakely <[email protected]>

* Update new_ functions to enforce the datatype

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Kyle

Signed-off-by: Stephen Wakely <[email protected]>

* Warnings and rust-doc errors

Signed-off-by: Stephen Wakely <[email protected]>

* Responding to feedback

Signed-off-by: Stephen Wakely <[email protected]>

* Vector schema should have any metadata

Signed-off-by: Stephen Wakely <[email protected]>

* Clippy

Signed-off-by: Stephen Wakely <[email protected]>

* Remove into_schema_definition

Signed-off-by: Stephen Wakely <[email protected]>

* Add todo for dummy output id

Signed-off-by: Stephen Wakely <[email protected]>

* Fixed test

Signed-off-by: Stephen Wakely <[email protected]>

* Added text to unreachables

Signed-off-by: Stephen Wakely <[email protected]>

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Apr 3, 2023
1 parent 00c0316 commit e19f4fc
Show file tree
Hide file tree
Showing 87 changed files with 2,071 additions and 1,564 deletions.
20 changes: 13 additions & 7 deletions benches/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use indexmap::IndexMap;
use vector::{
config::{DataType, Output},
config::{DataType, TransformOutput},
event::{Event, LogEvent, Value},
transforms::{
remap::{Remap, RemapConfig},
Expand All @@ -27,8 +27,10 @@ fn benchmark_remap(c: &mut Criterion) {
let mut group = c.benchmark_group("remap");

let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down Expand Up @@ -77,8 +79,10 @@ fn benchmark_remap(c: &mut Criterion) {
});

let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down Expand Up @@ -129,8 +133,10 @@ fn benchmark_remap(c: &mut Criterion) {

let coerce_runner =
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down
6 changes: 3 additions & 3 deletions benches/transform/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vector::transforms::{
TransformOutputsBuf,
};
use vector_core::{
config::{DataType, Output},
config::{DataType, TransformOutput},
event::{Event, EventContainer, EventMetadata, LogEvent},
transform::{SyncTransform, TransformContext},
};
Expand Down Expand Up @@ -54,10 +54,10 @@ fn route(c: &mut Criterion) {
"bba", "bbca", "dba", "bea", "fba", "gba", "hba", "iba", "jba", "bka", "bal", "bma", "bna",
"boa", "bpa", "bqa", "bra", "bsa", "bta", "bua", "bva", "bwa", "xba", "aby", "zba",
] {
outputs.push(Output {
outputs.push(TransformOutput {
port: Some(String::from(name)),
ty: DataType::Log,
log_schema_definition: None,
log_schema_definitions: Vec::new(),
});
}
let output_buffer: TransformOutputsBuf = TransformOutputsBuf::new_with_capacity(outputs, 10);
Expand Down
220 changes: 201 additions & 19 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,42 +100,119 @@ impl Input {
}

#[derive(Debug, Clone, PartialEq)]
pub struct Output {
pub struct SourceOutput {
pub port: Option<String>,
pub ty: DataType,

// NOTE: schema definitions are only implemented/supported for log-type events. There is no
// inherent blocker to support other types as well, but it'll require additional work to add
// the relevant schemas, and store them separately in this type.
pub schema_definition: Option<schema::Definition>,
}

impl SourceOutput {
/// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
/// Designed for use in log sources.
///
/// The `None` variant of a schema definition has two distinct meanings for a source component
/// versus a transform component:
///
/// For *sources*, a `None` schema is identical to a `Some(Definition::source_default())`.
/// # Panics
///
/// For a *transform*, a schema [`schema::Definition`] is required if `Datatype` is Log.
pub log_schema_definition: Option<schema::Definition>,
}
/// Panics if `ty` does not contain [`DataType::Log`].
#[must_use]
pub fn new_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
assert!(ty.contains(DataType::Log));

impl Output {
/// Create a default `Output` of the given data type.
///
/// A default output is one without a port identifier (i.e. not a named output) and the default
/// output consumers will receive if they declare the component itself as an input.
pub fn default(ty: DataType) -> Self {
Self {
port: None,
ty,
log_schema_definition: None,
schema_definition: Some(schema_definition),
}
}

/// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
/// Designed for use in metrics sources.
///
/// Sets the datatype to be [`DataType::Metric`].
#[must_use]
pub fn new_metrics() -> Self {
Self {
port: None,
ty: DataType::Metric,
schema_definition: None,
}
}

/// Set the schema definition for this `Output`.
/// Create a `SourceOutput` of the given data type that contains no output `Definition`s.
/// Designed for use in trace sources.
///
/// Sets the datatype to be [`DataType::Trace`].
#[must_use]
pub fn with_schema_definition(mut self, schema_definition: schema::Definition) -> Self {
self.log_schema_definition = Some(schema_definition);
pub fn new_traces() -> Self {
Self {
port: None,
ty: DataType::Trace,
schema_definition: None,
}
}

/// Return the schema [`schema::Definition`] from this output.
///
/// Takes a `schema_enabled` flag to determine if the full definition including the fields
/// and associated types should be returned, or if a simple definition should be returned.
/// A simple definition is just the default for the namespace. For the Vector namespace the
/// meanings are included.
/// Schema enabled is set in the users configuration.
#[must_use]
pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
self.schema_definition.as_ref().map(|definition| {
if schema_enabled {
definition.clone()
} else {
let mut new_definition =
schema::Definition::default_for_namespace(definition.log_namespaces());

if definition.log_namespaces().contains(&LogNamespace::Vector) {
new_definition.add_meanings(definition.meanings());
}

new_definition
}
})
}
}

impl SourceOutput {
/// Set the port name for this `SourceOutput`.
#[must_use]
pub fn with_port(mut self, name: impl Into<String>) -> Self {
self.port = Some(name.into());
self
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
pub port: Option<String>,
pub ty: DataType,

/// For *transforms* if `Datatype` is [`DataType::Log`], if schema is
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
pub log_schema_definitions: Vec<schema::Definition>,
}

impl TransformOutput {
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
/// Designed for use in transforms.
#[must_use]
pub fn new(ty: DataType, schema_definitions: Vec<schema::Definition>) -> Self {
Self {
port: None,
ty,
log_schema_definitions: schema_definitions,
}
}

/// Set the port name for this `Output`.
#[must_use]
Expand Down Expand Up @@ -427,10 +504,12 @@ impl LogNamespace {

#[cfg(test)]
mod test {
use crate::config::{init_log_schema, LogNamespace, LogSchema};
use super::*;
use crate::event::LogEvent;
use chrono::Utc;
use lookup::event_path;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use value::Kind;
use vector_common::btreemap;

#[test]
fn test_insert_standard_vector_source_metadata() {
Expand All @@ -446,4 +525,107 @@ mod test {

assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
}

#[test]
fn test_source_definitions_legacy() {
let definition = schema::Definition::empty_legacy_namespace()
.with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
let output = SourceOutput::new_logs(DataType::Log, definition);

let valid_event = LogEvent::from(Value::from(btreemap! {
"zork" => "norknoog",
"nork" => 32
}))
.into();

let invalid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}))
.into();

// Get a definition with schema enabled.
let new_definition = output.schema_definition(true).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
new_definition.meaning_path("zork")
);

// Events should have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_invalid_for_event(&invalid_event);

// There should be the default legacy definition without schemas enabled.
assert_eq!(
Some(schema::Definition::default_legacy_namespace()),
output.schema_definition(false)
);
}

#[test]
fn test_source_definitons_vector() {
let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
.with_metadata_field(
&owned_value_path!("vector", "zork"),
Kind::integer(),
Some("zork"),
)
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

let output = SourceOutput::new_logs(DataType::Log, definition);

let mut valid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}));

valid_event
.metadata_mut()
.value_mut()
.insert(path!("vector").concat("zork"), 32);

let valid_event = valid_event.into();

let mut invalid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}));

invalid_event
.metadata_mut()
.value_mut()
.insert(path!("vector").concat("zork"), "noog");

let invalid_event = invalid_event.into();

// Get a definition with schema enabled.
let new_definition = output.schema_definition(true).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::metadata(owned_value_path!(
"vector", "zork"
))),
new_definition.meaning_path("zork")
);

// Events should have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_invalid_for_event(&invalid_event);

// Get a definition without schema enabled.
let new_definition = output.schema_definition(false).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::metadata(owned_value_path!(
"vector", "zork"
))),
new_definition.meaning_path("zork")
);

// Events should not have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_valid_for_event(&invalid_event);
}
}
Loading

0 comments on commit e19f4fc

Please sign in to comment.