Skip to content

Commit

Permalink
chore(sinks): Remove obsolete Acker (vectordotdev#13457)
Browse files Browse the repository at this point in the history
* chore(sinks): Remove obsolete `Acker`

* Simplify the custom `FuturesUnordered` wrapper for the sink driver

* Handle errors in v1 buffer migration with `ManuallyDrop`

* Fix up handling of finalization in socket sinks
  • Loading branch information
bruceg authored Jul 13, 2022
1 parent 176060c commit 6ed436c
Show file tree
Hide file tree
Showing 142 changed files with 879 additions and 1,831 deletions.
5 changes: 0 additions & 5 deletions benches/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use vector::{
},
test_util::{random_lines, runtime},
};
use vector_buffers::Acker;

fn benchmark_batch(c: &mut Criterion) {
let event_len: usize = 100;
Expand Down Expand Up @@ -42,7 +41,6 @@ fn benchmark_batch(c: &mut Criterion) {
b.iter_batched(
|| {
let rt = runtime();
let (acker, _) = Acker::basic();
let mut batch = BatchSettings::default();
batch.size.bytes = *batch_size;
batch.size.events = num_events;
Expand All @@ -51,7 +49,6 @@ fn benchmark_batch(c: &mut Criterion) {
tower::service_fn(|_| future::ok::<_, Infallible>(())),
PartitionedBuffer::new(batch.size, *compression),
Duration::from_secs(1),
acker,
)
.sink_map_err(|error| panic!("{}", error));

Expand All @@ -76,7 +73,6 @@ fn benchmark_batch(c: &mut Criterion) {
b.iter_batched(
|| {
let rt = runtime();
let (acker, _) = Acker::basic();
let mut batch = BatchSettings::default();
batch.size.bytes = *batch_size;
batch.size.events = num_events;
Expand All @@ -85,7 +81,6 @@ fn benchmark_batch(c: &mut Criterion) {
tower::service_fn(|_| future::ok::<_, Infallible>(())),
Buffer::new(batch.size, *compression),
Duration::from_secs(1),
acker,
)
.sink_map_err(|error| panic!("{}", error));

Expand Down
10 changes: 8 additions & 2 deletions lib/vector-buffers/benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vector_buffers::{
BufferType, EventCount,
};
use vector_common::byte_size_of::ByteSizeOf;
use vector_common::finalization::{AddBatchNotifier, BatchNotifier};
use vector_common::finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable};

#[derive(Clone, Copy, Debug)]
pub struct Message<const N: usize> {
Expand Down Expand Up @@ -49,6 +49,12 @@ impl<const N: usize> EventCount for Message<N> {
}
}

impl<const N: usize> Finalizable for Message<N> {
fn take_finalizers(&mut self) -> EventFinalizers {
Default::default() // This benchmark doesn't need finalization
}
}

#[derive(Debug)]
pub struct EncodeError;

Expand Down Expand Up @@ -123,7 +129,7 @@ pub async fn setup<const N: usize>(
variant
.add_to_builder(&mut builder, data_dir, id)
.expect("should not fail to add variant to builder");
let (tx, rx, _acker) = builder
let (tx, rx) = builder
.build(String::from("benches"), Span::none())
.await
.expect("should not fail to build topology");
Expand Down
26 changes: 17 additions & 9 deletions lib/vector-buffers/examples/buffer_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@ use vector_buffers::{
builder::TopologyBuilder,
channel::{BufferReceiver, BufferSender},
},
Acker, BufferType, Bufferable, EventCount, WhenFull,
BufferType, Bufferable, EventCount, WhenFull,
};
use vector_common::byte_size_of::ByteSizeOf;
use vector_common::finalization::{
AddBatchNotifier, BatchNotifier, EventFinalizer, EventFinalizers,
AddBatchNotifier, BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, Finalizable,
};

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VariableMessage {
id: u64,
payload: Vec<u8>,
finalizer: EventFinalizers,
finalizers: EventFinalizers,
}

impl VariableMessage {
pub fn new(id: u64, payload: Vec<u8>) -> Self {
VariableMessage {
id,
payload,
finalizer: Default::default(),
finalizers: Default::default(),
}
}

Expand All @@ -51,7 +51,7 @@ impl VariableMessage {

impl AddBatchNotifier for VariableMessage {
fn add_batch_notifier(&mut self, batch: BatchNotifier) {
self.finalizer.add(EventFinalizer::new(batch));
self.finalizers.add(EventFinalizer::new(batch));
}
}

Expand All @@ -67,6 +67,12 @@ impl EventCount for VariableMessage {
}
}

impl Finalizable for VariableMessage {
fn take_finalizers(&mut self) -> EventFinalizers {
std::mem::take(&mut self.finalizers)
}
}

impl FixedEncodable for VariableMessage {
type EncodeError = EncodeError;
type DecodeError = DecodeError;
Expand Down Expand Up @@ -236,9 +242,9 @@ fn generate_record_cache(min: usize, max: usize) -> Vec<VariableMessage> {
records
}

async fn generate_buffer<T>(buffer_type: &str) -> (BufferSender<T>, BufferReceiver<T>, Acker)
async fn generate_buffer<T>(buffer_type: &str) -> (BufferSender<T>, BufferReceiver<T>)
where
T: Bufferable + Clone,
T: Bufferable + Clone + Finalizable,
{
let data_dir = PathBuf::from("/tmp/vector");
let id = format!("{}-buffer-perf-testing", buffer_type);
Expand Down Expand Up @@ -332,7 +338,7 @@ async fn main() {
);

let buffer_start = Instant::now();
let (mut writer, mut reader, acker) = generate_buffer(config.buffer_type.as_str()).await;
let (mut writer, mut reader) = generate_buffer(config.buffer_type.as_str()).await;
let buffer_delta = buffer_start.elapsed();

info!(
Expand Down Expand Up @@ -400,7 +406,9 @@ async fn main() {
let read_start = Instant::now();

match reader.next().await {
Some(_) => acker.ack(1),
Some(mut record) => record
.take_finalizers()
.update_status(EventStatus::Delivered),
None => {
info!("[buffer-perf] reader hit end of buffer, closing...");
break;
Expand Down
141 changes: 0 additions & 141 deletions lib/vector-buffers/src/acknowledgements.rs

This file was deleted.

13 changes: 6 additions & 7 deletions lib/vector-buffers/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use std::{
use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
use snafu::{ResultExt, Snafu};
use tracing::Span;
use vector_common::finalization::Finalizable;

use crate::{
topology::{
builder::{TopologyBuilder, TopologyError},
channel::{BufferReceiver, BufferSender},
},
variants::{DiskV1Buffer, DiskV2Buffer, MemoryBuffer},
Acker, Bufferable, WhenFull,
Bufferable, WhenFull,
};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -250,7 +251,7 @@ impl BufferType {
id: String,
) -> Result<(), BufferBuildError>
where
T: Bufferable + Clone,
T: Bufferable + Clone + Finalizable,
{
match *self {
BufferType::Memory {
Expand Down Expand Up @@ -322,9 +323,7 @@ impl BufferConfig {
/// Builds the buffer components represented by this configuration.
///
/// The caller gets back a `Sink` and `Stream` implementation that represent a way to push items
/// into the buffer, as well as pop items out of the buffer, respectively. The `Acker` is
/// provided to callers in order to update the buffer when popped items have been processed and
/// can be dropped or deleted, depending on the underlying buffer implementation.
/// into the buffer, as well as pop items out of the buffer, respectively.
///
/// # Errors
///
Expand All @@ -339,9 +338,9 @@ impl BufferConfig {
data_dir: Option<PathBuf>,
buffer_id: String,
span: Span,
) -> Result<(BufferSender<T>, BufferReceiver<T>, Acker), BufferBuildError>
) -> Result<(BufferSender<T>, BufferReceiver<T>), BufferBuildError>
where
T: Bufferable + Clone,
T: Bufferable + Clone + Finalizable,
{
let mut builder = TopologyBuilder::default();

Expand Down
3 changes: 0 additions & 3 deletions lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
#[macro_use]
extern crate tracing;

mod acknowledgements;
pub use acknowledgements::{Ackable, Acker};

mod buffer_usage_data;

pub mod config;
Expand Down
9 changes: 9 additions & 0 deletions lib/vector-buffers/src/test/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use once_cell::sync::Lazy;
use temp_dir::TempDir;
use tracing_fluent_assertions::{AssertionRegistry, AssertionsLayer};
use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt, Layer, Registry};
use vector_common::finalization::{EventStatus, Finalizable};

#[macro_export]
macro_rules! assert_file_does_not_exist_async {
Expand Down Expand Up @@ -91,3 +92,11 @@ pub fn install_tracing_helpers() -> AssertionRegistry {

ASSERTION_REGISTRY.clone()
}

pub(crate) async fn acknowledge(mut event: impl Finalizable) {
event
.take_finalizers()
.update_status(EventStatus::Delivered);
// Finalizers are implicitly dropped here, sending the status update.
tokio::task::yield_now().await;
}
Loading

0 comments on commit 6ed436c

Please sign in to comment.