Allow setting consumer explicitly for testing (#204)
* Keep mapping of topic->consumer via consumer_for

* Code review fix + changelog entry

* Update minitest as well
dorner authored May 27, 2024
1 parent 523a994 commit b0626a3
Showing 3 changed files with 54 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Karafka Test gem changelog

+## UNRELEASED

+- [Enhancement] Memoize `consumer_for` so consumers can be set up for multiple topics and `let(:consumer)` is no longer a requirement. (dorner)

## 2.4.3 (2024-05-06)
- [Fix] Fix: raw_key is not being assigned for rspec (CaioPenhalver)
- [Fix] Fix: raw_key is not being assigned for minitest
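To make the changelog entry above concrete, here is a minimal RSpec sketch of the flow it enables. It is illustrative only: the `orders` and `payments` topics and their consumer classes are hypothetical, the suite is assumed to include `Karafka::Testing::RSpec::Helpers`, and it relies on the `karafka.consumer_for` / `karafka.produce` helpers whose internals change in the diffs below.

# Hypothetical spec; assumes `config.include Karafka::Testing::RSpec::Helpers`
# and that the routing defines `orders` and `payments` topics.
RSpec.describe 'multi-topic consumption' do
  # No `let(:consumer)` is required anymore; each `consumer_for` call is
  # memoized per topic in the internal topic -> consumer mapping.
  let(:orders_consumer)   { karafka.consumer_for(:orders) }
  let(:payments_consumer) { karafka.consumer_for(:payments) }

  it 'delivers each message to the consumer built for its topic' do
    # Reference the consumers first so the mapping is populated before producing
    orders_consumer
    payments_consumer

    karafka.produce({ 'id' => 1 }.to_json, topic: 'orders')
    expect(orders_consumer.messages.last.payload).to eq('id' => 1)
    orders_consumer.consume

    karafka.produce({ 'ref' => 'abc' }.to_json, topic: 'payments')
    expect(payments_consumer.messages.last.payload).to eq('ref' => 'abc')
    payments_consumer.consume
  end
end

Because consumers are now memoized per topic, each produced message is routed to the consumer registered for its topic instead of requiring a single shared `let(:consumer)`.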
35 changes: 25 additions & 10 deletions lib/karafka/testing/minitest/helpers.rb
@@ -37,6 +37,8 @@ def included(base)
@_karafka_consumer_messages.clear
@_karafka_producer_client.reset

+@_karafka_consumer_mappings = {}

Karafka.producer.stubs(:client).returns(@_karafka_producer_client)
end

@@ -87,18 +89,23 @@ def _karafka_consumer_for(requested_topic, requested_consumer_group = nil)
# @example Send a json message to consumer and simulate, that it is partition 6
# @karafka.produce({ 'hello' => 'world' }.to_json, 'partition' => 6)
def _karafka_add_message_to_consumer_if_needed(message)
+consumer_obj = if defined?(@consumer)
+                 @consumer
+               else
+                 @_karafka_consumer_mappings&.dig(message[:topic])
+               end
# Consumer needs to be defined in order to pass messages to it
-return unless defined?(@consumer)
+return unless defined?(consumer_obj)
# We're interested in adding message to consumer only when it is a Karafka consumer
# Users may want to test other things (models producing messages for example) and in
# their case consumer will not be a consumer
-return unless @consumer.is_a?(Karafka::BaseConsumer)
+return unless consumer_obj.is_a?(Karafka::BaseConsumer)
# We target to the consumer only messages that were produced to it, since specs may also
# produce other messages targeting other topics
-return unless message[:topic] == @consumer.topic.name
+return unless message[:topic] == consumer_obj.topic.name

# Build message metadata and copy any metadata that would come from the message
-metadata = _karafka_message_metadata_defaults
+metadata = _karafka_message_metadata_defaults(consumer_obj)

metadata.keys.each do |key|
message_key = METADATA_DISPATCH_MAPPINGS.fetch(key, key)
@@ -116,13 +123,13 @@ def _karafka_add_message_to_consumer_if_needed(message)
# Update batch metadata
batch_metadata = Karafka::Messages::Builders::BatchMetadata.call(
@_karafka_consumer_messages,
-@consumer.topic,
+consumer_obj.topic,
0,
Time.now
)

# Update consumer messages batch
-@consumer.messages = Karafka::Messages::Messages.new(
+consumer_obj.messages = Karafka::Messages::Messages.new(
@_karafka_consumer_messages,
batch_metadata
)
@@ -132,9 +139,16 @@ def _karafka_add_message_to_consumer_if_needed(message)
# @param payload [String] payload we want to dispatch
# @param metadata [Hash] any metadata we want to dispatch alongside the payload
def _karafka_produce(payload, metadata = {})
+topic = if metadata[:topic]
+          metadata[:topic]
+        elsif defined?(@consumer)
+          @consumer.topic.name
+        else
+          @_karafka_consumer_mappings&.keys&.last
+        end
Karafka.producer.produce_sync(
{
-topic: @consumer.topic.name,
+topic: topic,
payload: payload
}.merge(metadata)
)
@@ -148,16 +162,16 @@ def _karafka_produced_messages
private

# @return [Hash] message default options
-def _karafka_message_metadata_defaults
+def _karafka_message_metadata_defaults(consumer_obj)
{
-deserializers: @consumer.topic.deserializers,
+deserializers: consumer_obj.topic.deserializers,
timestamp: Time.now,
raw_headers: {},
raw_key: nil,
offset: @_karafka_consumer_messages.size,
partition: 0,
received_at: Time.now,
-topic: @consumer.topic.name
+topic: consumer_obj.topic.name
}
end

@@ -179,6 +193,7 @@ def _karafka_build_consumer_for(topic)
@consumer.coordinator.seek_offset = 0
# Indicate usage as for tests no direct enqueuing happens
@consumer.instance_variable_set('@used', true)
+@_karafka_consumer_mappings[topic.name] = @consumer
@consumer
end
end
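For the Minitest side, a hedged sketch of how the new topic -> consumer mapping could be exercised is shown below. The `orders` and `payments` topics are hypothetical, and the `@karafka.consumer_for` / `@karafka.produce` proxy calls are assumed to wrap the private `_karafka_consumer_for` / `_karafka_produce` helpers changed above (only `@karafka.produce` appears verbatim in this file's own doc comment).

# Hypothetical test; assumes the test class includes
# Karafka::Testing::Minitest::Helpers and that the routing defines
# `orders` and `payments` topics.
class MultiTopicConsumptionTest < Minitest::Test
  include Karafka::Testing::Minitest::Helpers

  def test_messages_reach_the_consumer_registered_for_their_topic
    # Building a consumer registers it in the topic -> consumer mapping,
    # so no shared @consumer assignment is needed in the test itself.
    orders_consumer = @karafka.consumer_for(:orders)
    @karafka.produce({ 'id' => 1 }.to_json, topic: 'orders')
    assert_equal({ 'id' => 1 }, orders_consumer.messages.last.payload)
    orders_consumer.consume

    payments_consumer = @karafka.consumer_for(:payments)
    @karafka.produce({ 'ref' => 'abc' }.to_json, topic: 'payments')
    assert_equal({ 'ref' => 'abc' }, payments_consumer.messages.last.payload)
    payments_consumer.consume
  end
end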
35 changes: 25 additions & 10 deletions lib/karafka/testing/rspec/helpers.rb
@@ -44,6 +44,7 @@ def included(base)

_karafka_consumer_messages.clear
_karafka_producer_client.reset
+@_karafka_consumer_mappings = {}

if Object.const_defined?('Mocha', false)
Karafka.producer.stubs(:client).returns(_karafka_producer_client)
@@ -94,18 +95,23 @@ def _karafka_consumer_for(requested_topic, requested_consumer_group = nil)
# karafka.produce({ 'hello' => 'world' }.to_json, 'partition' => 6)
# end
def _karafka_add_message_to_consumer_if_needed(message)
+consumer_obj = if defined?(consumer)
+                 consumer
+               else
+                 @_karafka_consumer_mappings&.dig(message[:topic])
+               end
# Consumer needs to be defined in order to pass messages to it
-return unless defined?(consumer)
+return unless consumer_obj
# We're interested in adding message to consumer only when it is a Karafka consumer
# Users may want to test other things (models producing messages for example) and in
# their case consumer will not be a consumer
-return unless consumer.is_a?(Karafka::BaseConsumer)
+return unless consumer_obj.is_a?(Karafka::BaseConsumer)
# We target to the consumer only messages that were produced to it, since specs may also
# produce other messages targeting other topics
-return unless message[:topic] == consumer.topic.name
+return unless message[:topic] == consumer_obj.topic.name

# Build message metadata and copy any metadata that would come from the message
-metadata = _karafka_message_metadata_defaults
+metadata = _karafka_message_metadata_defaults(consumer_obj)

metadata.keys.each do |key|
message_key = METADATA_DISPATCH_MAPPINGS.fetch(key, key)
@@ -124,13 +130,13 @@ def _karafka_add_message_to_consumer_if_needed(message)
# Update batch metadata
batch_metadata = Karafka::Messages::Builders::BatchMetadata.call(
_karafka_consumer_messages,
-consumer.topic,
+consumer_obj.topic,
0,
Time.now
)

# Update consumer messages batch
-consumer.messages = Karafka::Messages::Messages.new(
+consumer_obj.messages = Karafka::Messages::Messages.new(
_karafka_consumer_messages,
batch_metadata
)
@@ -140,9 +146,16 @@ def _karafka_add_message_to_consumer_if_needed(message)
# @param payload [String] payload we want to dispatch
# @param metadata [Hash] any metadata we want to dispatch alongside the payload
def _karafka_produce(payload, metadata = {})
+topic = if metadata[:topic]
+          metadata[:topic]
+        elsif defined?(consumer)
+          consumer.topic.name
+        else
+          @_karafka_consumer_mappings&.keys&.last
+        end
Karafka.producer.produce_sync(
{
-topic: consumer.topic.name,
+topic: topic,
payload: payload
}.merge(metadata)
)
@@ -156,16 +169,16 @@ def _karafka_produced_messages
private

# @return [Hash] message default options
-def _karafka_message_metadata_defaults
+def _karafka_message_metadata_defaults(consumer_obj)
{
-deserializers: consumer.topic.deserializers,
+deserializers: consumer_obj.topic.deserializers,
timestamp: Time.now,
raw_headers: {},
raw_key: nil,
offset: _karafka_consumer_messages.size,
partition: 0,
received_at: Time.now,
-topic: consumer.topic.name
+topic: consumer_obj.topic.name
}
end

@@ -188,6 +201,8 @@ def _karafka_build_consumer_for(topic)
consumer.coordinator.seek_offset = 0
# Indicate usage as for tests no direct enqueuing happens
consumer.instance_variable_set('@used', true)

+@_karafka_consumer_mappings[topic.name] = consumer
consumer
end
end
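The `_karafka_produce` change above also determines how the target topic is resolved when `karafka.produce` is called without one: an explicit `topic:` in the metadata wins, then a spec-defined `consumer` (e.g. `let(:consumer)`), then the topic of the most recently built consumer. Below is a minimal sketch of that last fallback, assuming a hypothetical `notifications` topic and an included `Karafka::Testing::RSpec::Helpers` setup.

# Hypothetical spec; assumes a `notifications` topic in the routing.
RSpec.describe 'topic resolution in karafka.produce' do
  let(:notifications_consumer) { karafka.consumer_for(:notifications) }

  it 'falls back to the most recently built consumer when no topic is given' do
    notifications_consumer # registers the topic -> consumer mapping

    # No `topic:` metadata and no `let(:consumer)`, so the message is
    # dispatched to the topic of the last consumer built via `consumer_for`.
    karafka.produce({ 'event' => 'signup' }.to_json)

    notifications_consumer.consume

    expect(notifications_consumer.messages.last.payload).to eq('event' => 'signup')
  end
end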
