Skip to content

Commit

Permalink
skip intermediate callback array (karafka#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Jan 11, 2024
1 parent 26dba3c commit 1a37b23
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld)
- [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld)
- [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld)
- [Enhancement] Skip intermediate array creation on delivery report callback execution (one per message).
- [Fix] Fix return type on `#rd_kafka_poll` (mensfeld)
- [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld)
- [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)
Expand Down
25 changes: 23 additions & 2 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class Producer
attr_reader :delivery_callback_arity

# @private
# @param native_kafka [NativeKafka]
# @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use
# the "consistent_random" default
def initialize(native_kafka, partitioner_name)
@native_kafka = native_kafka
@partitioner_name = partitioner_name || "consistent_random"
Expand Down Expand Up @@ -258,13 +261,27 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
delivery_handle
end

# Calls (if registered) the delivery callback
#
# @param delivery_report [Producer::DeliveryReport]
# @param delivery_handle [Producer::DeliveryHandle]
def call_delivery_callback(delivery_report, delivery_handle)
return unless @delivery_callback

args = [delivery_report, delivery_handle].take(@delivery_callback_arity)
@delivery_callback.call(*args)
case @delivery_callback_arity
when 0
@delivery_callback.call
when 1
@delivery_callback.call(delivery_report)
else
@delivery_callback.call(delivery_report, delivery_handle)
end
end

# Figures out the arity of a given block/method
#
# @param callback [#call, Proc]
# @return [Integer] arity of the provided block/method
def arity(callback)
return callback.arity if callback.respond_to?(:arity)

Expand All @@ -273,6 +290,10 @@ def arity(callback)

private

# Ensures, no operations can happen on a closed producer
#
# @param method [Symbol] name of the method that invoked producer
# @raise [Rdkafka::ClosedProducerError]
def closed_producer_check(method)
raise Rdkafka::ClosedProducerError.new(method) if closed?
end
Expand Down

0 comments on commit 1a37b23

Please sign in to comment.