Skip to content

Commit

Permalink
fix(kafka source): Reorder message consume loop to avoid memory growth (
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg authored May 9, 2024
1 parent 783ed1f commit 8301101
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
2 changes: 2 additions & 0 deletions changelog.d/kafka-source-ack-leak.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The kafka source main loop has been biased to handle acknowledgements before new
messages to avoid a memory leak under high load.
47 changes: 27 additions & 20 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,10 @@ impl ConsumerStateInner<Consuming> {
}
}

/// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle acknowledgements for the messages consumed
/// Returns a channel sender that can be used to signal that the consumer should stop and drain pending acknowledgements,
/// and an AbortHandle that can be used to forcefully end the task.
/// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle
/// acknowledgements for the messages consumed. Returns a channel sender that can be used to
/// signal that the consumer should stop and drain pending acknowledgements, and an AbortHandle
/// that can be used to forcefully end the task.
fn consume_partition(
&self,
join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
Expand Down Expand Up @@ -603,10 +604,33 @@ impl ConsumerStateInner<Consuming> {

loop {
tokio::select!(
// Make sure to handle the acknowledgement stream before new messages to prevent
// unbounded memory growth caused by those acks being handled slower than
// incoming messages when the load is high.
biased;

// is_some() checks prevent polling end_signal after it completes
_ = &mut end_signal, if finalizer.is_some() => {
finalizer.take();
},

ack = ack_stream.next() => match ack {
Some((status, entry)) => {
if status == BatchStatus::Delivered {
if let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
emit!(KafkaOffsetUpdateError { error });
}
}
}
None if finalizer.is_none() => {
debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
break
}
None => {
debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
}
},

message = messages.next(), if finalizer.is_some() => match message {
None => unreachable!("MessageStream never calls Ready(None)"),
Some(Err(error)) => match error {
Expand All @@ -627,23 +651,6 @@ impl ConsumerStateInner<Consuming> {
parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
}
},

ack = ack_stream.next() => match ack {
Some((status, entry)) => {
if status == BatchStatus::Delivered {
if let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
emit!(KafkaOffsetUpdateError { error });
}
}
}
None if finalizer.is_none() => {
debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
break
}
None => {
debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
}
}
)
}
(tp, status)
Expand Down

0 comments on commit 8301101

Please sign in to comment.