Skip to content

Commit

Permalink
chore(config): Pass the extra context to sources and transforms too (v…
Browse files Browse the repository at this point in the history
…ectordotdev#19779)

* chore(config): Pass the extra context to sources and transforms too

* Add comments on new members

* Fix compile error in socket source
  • Loading branch information
bruceg authored Feb 2, 2024
1 parent ac80d1e commit a215d59
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ pub struct SinkContext {
pub schema: schema::Options,
pub app_name: String,
pub app_name_slug: String,

/// Extra context data provided by the running app and shared across all components. This can be
/// used to pass shared settings or other data from outside the components.
pub extra_context: ExtraContext,
}

Expand Down
8 changes: 7 additions & 1 deletion src/config/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use vector_lib::{
};

use super::{schema, ComponentKey, ProxyConfig, Resource};
use crate::{shutdown::ShutdownSignal, SourceSender};
use crate::{extra_context::ExtraContext, shutdown::ShutdownSignal, SourceSender};

pub type BoxedSource = Box<dyn SourceConfig>;

Expand Down Expand Up @@ -131,6 +131,10 @@ pub struct SourceContext {
/// Given a source can expose multiple [`SourceOutput`] channels, the ID is tied to the identifier of
/// that `SourceOutput`.
pub schema_definitions: HashMap<Option<String>, schema::Definition>,

/// Extra context data provided by the running app and shared across all components. This can be
/// used to pass shared settings or other data from outside the components.
pub extra_context: ExtraContext,
}

impl SourceContext {
Expand All @@ -151,6 +155,7 @@ impl SourceContext {
acknowledgements: false,
schema_definitions: HashMap::default(),
schema: Default::default(),
extra_context: Default::default(),
},
shutdown,
)
Expand All @@ -170,6 +175,7 @@ impl SourceContext {
acknowledgements: false,
schema_definitions: schema_definitions.unwrap_or_default(),
schema: Default::default(),
extra_context: Default::default(),
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use vector_lib::{
use super::schema::Options as SchemaOptions;
use super::OutputId;
use super::{id::Inputs, ComponentKey};
use crate::extra_context::ExtraContext;

pub type BoxedTransform = Box<dyn TransformConfig>;

Expand Down Expand Up @@ -96,7 +97,6 @@ where
}
}

#[derive(Debug)]
pub struct TransformContext {
// This is optional because currently there are a lot of places we use `TransformContext` that
// may not have the relevant data available (e.g. tests). In the future it'd be nice to make it
Expand All @@ -121,6 +121,10 @@ pub struct TransformContext {
pub merged_schema_definition: schema::Definition,

pub schema: SchemaOptions,

/// Extra context data provided by the running app and shared across all components. This can be
/// used to pass shared settings or other data from outside the components.
pub extra_context: ExtraContext,
}

impl Default for TransformContext {
Expand All @@ -132,6 +136,7 @@ impl Default for TransformContext {
schema_definitions: HashMap::from([(None, HashMap::new())]),
merged_schema_definition: schema::Definition::any(),
schema: SchemaOptions::default(),
extra_context: Default::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ mod test {
acknowledgements: false,
schema: Default::default(),
schema_definitions: HashMap::default(),
extra_context: Default::default(),
})
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ impl<'a> Builder<'a> {
acknowledgements: source.sink_acknowledgements,
schema_definitions,
schema: self.config.schema,
extra_context: self.extra_context.clone(),
};
let source = source.inner.build(context).await;
let server = match source {
Expand Down Expand Up @@ -464,6 +465,7 @@ impl<'a> Builder<'a> {
schema_definitions,
merged_schema_definition: merged_definition.clone(),
schema: self.config.schema,
extra_context: self.extra_context.clone(),
};

let node = TransformNode::from_parts(
Expand Down

0 comments on commit a215d59

Please sign in to comment.