diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index fd57fcb1..6a68913b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
       fail-fast: false
       matrix:
         ruby:
-          - '3.4.0-preview1'
+          - '3.4.0-preview2'
           - '3.3'
           - '3.2'
           - '3.1'
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0f1933e..2212acc1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,100 +1,150 @@
 # Rdkafka Changelog

-## 0.19.1 (Unreleased)
-- [Enhancement] Expose `rd_kafka_global_init` to mitigate macos forking issues.
-
-## 0.19.0 (2024-10-01)
+## 0.18.0 (Unreleased)
 - **[Breaking]** Drop Ruby 3.0 support
-- [Enhancement] Update `librdkafka` to `2.5.3`
 - [Enhancement] Use default oauth callback if none is passed (bachmanity1)
+- [Enhancement] Expose `rd_kafka_global_init` to mitigate macOS forking issues.
+
+## 0.17.6 (2024-09-03)
 - [Fix] Fix incorrectly behaving CI on failures.
+- [Fix] Fix invalid references to librdkafka patches.
+
+## 0.17.5 (2024-09-03)
 - [Patch] Patch with "Add forward declaration to fix compilation without ssl" fix

-## 0.18.0 (2024-09-02)
-- [Enhancement] Update `librdkafka` to `2.5.0`
+## 0.17.4 (2024-09-02)
+- [Enhancement] Bump librdkafka to 2.5.3
 - [Enhancement] Do not release GVL on `rd_kafka_name` (ferrous26)
-- [Patch] Patch cooperative-sticky assignments in librdkafka.
-- [Fix] Mitigate a case where FFI would not restart the background events callback dispatcher in forks
 - [Fix] Fix unused variable reference in producer (lucasmvnascimento)

-## 0.17.0 (2024-08-03)
-- [Feature] Add `#seek_by` to be able to seek for a message by topic, partition and offset (zinahia)
-- [Enhancement] Update `librdkafka` to `2.4.0`
+## 0.17.3 (2024-08-09)
+- [Fix] Mitigate a case where FFI would not restart the background events callback dispatcher in forks.
+
+## 0.17.2 (2024-08-07)
+- [Enhancement] Support returning `#details` for errors that do have topic/partition related extra info.
+
+## 0.17.1 (2024-08-01)
 - [Enhancement] Support ability to release patches to librdkafka.
-- [Change] Remove old producer timeout API warnings.
-- [Fix] Switch to local release of librdkafka to mitigate its unavailability.
+- [Patch] Patch cooperative-sticky assignments in librdkafka.
+
+## 0.17.0 (2024-07-21)
+- [Enhancement] Bump librdkafka to 2.5.0

 ## 0.16.1 (2024-07-10)
+- [Feature] Add `#seek_by` to be able to seek for a message by topic, partition and offset (zinahia)
+- [Change] Remove old producer timeout API warnings.
 - [Fix] Switch to local release of librdkafka to mitigate its unavailability.

-## 0.16.0 (2024-06-13)
-- **[Breaking]** Retire support for Ruby 2.7.
+## 0.16.0 (2024-06-17)
 - **[Breaking]** Messages without headers returned by `#poll` contain frozen empty hash.
 - **[Breaking]** `HashWithSymbolKeysTreatedLikeStrings` has been removed so headers are regular hashes with string keys.
-- **[Feature]** Support incremental config describe + alter API.
-- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
+- [Enhancement] Bump librdkafka to 2.4.0
+- [Enhancement] Save two objects on message produced and lower CPU usage on message produced with small improvements.
+- [Fix] Remove support for Ruby 2.7. Supporting it was a bug since the rest of the karafka ecosystem no longer supports it.
+
+## 0.15.2 (2024-07-10)
+- [Fix] Switch to local release of librdkafka to mitigate its unavailability.
+
+## 0.15.1 (2024-05-09)
 - **[Feature]** Provide ability to use topic config on a producer for custom behaviors per dispatch.
 - [Enhancement] Use topic config reference cache for message production to prevent topic object allocation with each message.
 - [Enhancement] Provide `Rdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
+
+## 0.15.0 (2024-04-26)
+- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
+- **[Feature]** Support incremental config describe + alter API (mensfeld)
 - [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
 - [Enhancement] Allow for usage of the second regex engine of librdkafka by setting `RDKAFKA_DISABLE_REGEX_EXT` during build (mensfeld)
 - [Enhancement] name polling Thread as `rdkafka.native_kafka#` (nijikon)
-- [Enhancement] Save two objects on message produced and lower CPU usage on message produced with small improvements.
 - [Change] Allow for native kafka thread operations deferring and manual start for consumer, producer and admin.
 - [Change] The `wait_timeout` argument in `AbstractHandle.wait` method is deprecated and will be removed in future versions without replacement. We don't rely on its value anymore (nijikon)
-- [Fix] Background logger stops working after forking causing memory leaks (mensfeld)
 - [Fix] Fix bogus case/when syntax. Levels 1, 2, and 6 previously defaulted to UNKNOWN (jjowdy)

-## 0.15.2 (2024-07-10)
+## 0.14.11 (2024-07-10)
 - [Fix] Switch to local release of librdkafka to mitigate its unavailability.

-## 0.15.1 (2024-01-30)
+## 0.14.10 (2024-02-08)
+- [Fix] Background logger stops working after forking causing memory leaks (mensfeld).
+
+## 0.14.9 (2024-01-29)
+- [Fix] Partition cache caches invalid `nil` result for `PARTITIONS_COUNT_TTL`.
+- [Enhancement] Report `-1` instead of `nil` in case of `partition_count` failure.
+
+## 0.14.8 (2024-01-24)
 - [Enhancement] Provide support for Nix OS (alexandriainfantino)
+- [Enhancement] Skip intermediate array creation on delivery report callback execution (one per message) (mensfeld)
+
+## 0.14.7 (2023-12-29)
+- [Fix] Recognize that Karafka uses a custom partition object (fixed in 2.3.0) and ensure it is handled correctly.
+
+## 0.14.6 (2023-12-29)
+- **[Feature]** Support storing metadata alongside offsets via `rd_kafka_offsets_store` in `#store_offset` (mensfeld)
+- [Enhancement] Increase the `#committed` default timeout from 1_200ms to 2_000ms. This will compensate for network glitches and remote cluster operations and will align with the metadata query timeout.
+
+## 0.14.5 (2023-12-20)
+- [Enhancement] Provide `label` producer handler and report reference for improved traceability.
+
+## 0.14.4 (2023-12-19)
+- [Enhancement] Add ability to store offsets in a transaction (mensfeld)
+
+## 0.14.3 (2023-12-17)
 - [Enhancement] Replace `rd_kafka_offset_store` with `rd_kafka_offsets_store` (mensfeld)
+- [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)
+- [Change] Rename `matching_acl_pattern_type` to `matching_acl_resource_pattern_type` to align the whole API (mensfeld)
+
+## 0.14.2 (2023-12-11)
 - [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld)
-- [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld)
-- [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld)
-- [Enhancement] Skip intermediate array creation on delivery report callback execution (one per message) (mensfeld).
-- [Enhancement] Report `-1` instead of `nil` in case `partition_count` failure (mensfeld).
 - [Fix] Fix return type on `#rd_kafka_poll` (mensfeld)
 - [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld)
-- [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)
-- [Fix] Partition cache caches invalid nil result for `PARTITIONS_COUNT_TTL` (mensfeld)
-- [Change] Rename `matching_acl_pattern_type` to `matching_acl_resource_pattern_type` to align the whole API (mensfeld)

-## 0.15.0 (2023-12-03)
+## 0.14.1 (2023-12-02)
 - **[Feature]** Add `Admin#metadata` (mensfeld)
 - **[Feature]** Add `Admin#create_partitions` (mensfeld)
 - **[Feature]** Add `Admin#delete_group` utility (piotaixr)
 - **[Feature]** Add Create and Delete ACL Feature To Admin Functions (vgnanasekaran)
-- **[Feature]** Support `#assignment_lost?` on a consumer to check for involuntary assignment revocation (mensfeld)
-- [Enhancement] Expose alternative way of managing consumer events via a separate queue (mensfeld)
-- [Enhancement] **Bump** librdkafka to 2.3.0 (mensfeld)
-- [Enhancement] Increase the `#lag` and `#query_watermark_offsets` default timeouts from 100ms to 1000ms. This will compensate for network glitches and remote clusters operations (mensfeld)
-- [Change] Use `SecureRandom.uuid` instead of `random` for test consumer groups (mensfeld)
+- **[Enhancement]** Improve error reporting on `unknown_topic_or_part` and include the missing topic (mensfeld)
+- **[Enhancement]** Improve error reporting on consumer polling errors (mensfeld)
+
+## 0.14.0 (2023-11-17)
+- [Enhancement] Bump librdkafka to 2.3.0
+- [Enhancement] Increase the `#lag` and `#query_watermark_offsets` default timeouts from 100ms to 1000ms. This will compensate for network glitches and remote cluster operations.

-## 0.14.1 (2024-07-10)
+## 0.13.10 (2024-07-10)
 - [Fix] Switch to local release of librdkafka to mitigate its unavailability.

-## 0.14.0 (2023-11-21)
-- [Enhancement] Add `raise_response_error` flag to the `Rdkafka::AbstractHandle`.
-- [Enhancement] Allow for setting `statistics_callback` as nil to reset predefined settings configured by a different gem (mensfeld)
+## 0.13.9 (2023-11-07)
+- [Enhancement] Expose alternative way of managing consumer events via a separate queue.
+- [Enhancement] Allow for setting `statistics_callback` as nil to reset predefined settings configured by a different gem.
+
+## 0.13.8 (2023-10-31)
 - [Enhancement] Get consumer position (thijsc & mensfeld)
-- [Enhancement] Provide `#purge` to remove any outstanding requests from the producer (mensfeld)
-- [Enhancement] Update `librdkafka` to `2.2.0` (mensfeld)
-- [Enhancement] Introduce producer partitions count metadata cache (mensfeld)
-- [Enhancement] Increase metadata timeout request from `250 ms` to `2000 ms` default to allow for remote cluster operations via `rdkafka-ruby` (mensfeld)
-- [Enhancement] Introduce `#name` for producers and consumers (mensfeld)
-- [Enhancement] Include backtrace in non-raised binded errors (mensfeld)
-- [Fix] Reference to Opaque is not released when Admin, Consumer or Producer is closed (mensfeld)
-- [Fix] Trigger `#poll` on native kafka creation to handle oauthbearer cb (mensfeld)
-- [Fix] `#flush` does not handle the timeouts errors by making it return `true` if all flushed or `false` if failed.
We do **not** raise an exception here to keep it backwards compatible (mensfeld)
-- [Change] Remove support for Ruby 2.6 due to it being EOL and WeakMap incompatibilities (mensfeld)
-- [Change] Update Kafka Docker with Confluent KRaft (mensfeld)
-- [Change] Update librdkafka repo reference from edenhill to confluentinc (mensfeld)
-
-## 0.13.1 (2024-07-10)
-- [Fix] Switch to local release of librdkafka to mitigate its unavailability.
+
+## 0.13.7 (2023-10-31)
+- [Change] Drop support for Ruby 2.6 due to incompatibilities in usage of `ObjectSpace::WeakMap`
+- [Fix] Fix dangling Opaque references.
+
+## 0.13.6 (2023-10-17)
+- **[Feature]** Support transactions API in the producer
+- [Enhancement] Add `raise_response_error` flag to the `Rdkafka::AbstractHandle`.
+- [Enhancement] Provide `#purge` to remove any outstanding requests from the producer.
+- [Enhancement] Fix `#flush` not handling timeout errors by making it return `true` if everything was flushed or `false` if it failed. We do **not** raise an exception here to keep it backwards compatible.
+
+## 0.13.5
+- Fix DeliveryReport `create_result#error` being nil despite an error being associated with it
+
+## 0.13.4
+- Always call initial poll on librdkafka to make sure oauth bearer cb is handled pre-operations.
+
+## 0.13.3
+- Bump librdkafka to 2.2.0
+
+## 0.13.2
+- Ensure operations counter decrement is fully thread-safe
+- Bump librdkafka to 2.1.1
+
+## 0.13.1
+- Add offsets_for_times method on consumer (timflapper)

 ## 0.13.0 (2023-07-24)
 - Support cooperative sticky partition assignment in the rebalance callback (methodmissing)
@@ -111,10 +161,26 @@
 - Improve specs stability (mensfeld)
 - Make metadata request timeout configurable (mensfeld)
 - call_on_partitions_assigned and call_on_partitions_revoked only get a tpl passed in (thijsc)
+- Support `#assignment_lost?` on a consumer to check for involuntary assignment revocation (mensfeld)
+- Expose `#name` on the consumer and producer (mensfeld)
+- Introduce producer partitions count metadata cache (mensfeld)
+- Retry metadata fetches on certain errors with a backoff (mensfeld)
+- Do not lock access to the underlying native kafka client and rely on Karafka granular locking (mensfeld)

-## 0.12.1 (2024-07-11)
+## 0.12.4 (2024-07-10)
 - [Fix] Switch to local release of librdkafka to mitigate its unavailability.

+## 0.12.3
+- Include backtrace in non-raised bound errors.
+- Include topic name in the delivery reports
+
+## 0.12.2
+- Increase the metadata default timeout from 250ms to 2 seconds. This should allow for working with remote clusters.
+ +## 0.12.1 +- Bumps librdkafka to 2.0.2 (lmaia) +- Add support for adding more partitions via Admin API + ## 0.12.0 (2022-06-17) - Bumps librdkafka to 1.9.0 - Fix crash on empty partition key (mensfeld) diff --git a/README.md b/README.md index 4616166c..0b07ba7f 100644 --- a/README.md +++ b/README.md @@ -163,10 +163,10 @@ bundle exec rake produce_messages | rdkafka-ruby | librdkafka | patches | |-|-|-| -| 0.19.0 (2024-10-01) | 2.5.3 (2024-09-02) | yes | -| 0.18.0 (2024-09-02) | 2.5.0 (2024-06-10) | yes | -| 0.17.0 (2024-08-03) | 2.4.0 (2024-05-07) | no | -| 0.16.0 (2024-06-13) | 2.3.0 (2023-10-25) | no | +| 0.18.0 (Unreleased) | 2.5.3 (2024-09-02) | yes | +| 0.17.4 (2024-09-02) | 2.5.3 (2024-09-02) | yes | +| 0.17.0 (2024-08-01) | 2.5.0 (2024-07-10) | yes | +| 0.16.0 (2024-06-13) | 2.4.0 (2024-05-07) | no | | 0.15.0 (2023-12-03) | 2.3.0 (2023-10-25) | no | | 0.14.0 (2023-11-21) | 2.2.0 (2023-07-12) | no | | 0.13.0 (2023-07-24) | 2.0.2 (2023-01-20) | no | diff --git a/rdkafka.gemspec b/karafka-rdkafka.gemspec similarity index 81% rename from rdkafka.gemspec rename to karafka-rdkafka.gemspec index 40997fdc..3efd9ed8 100644 --- a/rdkafka.gemspec +++ b/karafka-rdkafka.gemspec @@ -12,7 +12,7 @@ Gem::Specification.new do |gem| gem.files = `git ls-files`.split($\) gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) } gem.test_files = gem.files.grep(%r{^(test|spec|features)/}) - gem.name = 'rdkafka' + gem.name = 'karafka-rdkafka' gem.require_paths = ['lib'] gem.version = Rdkafka::VERSION gem.required_ruby_version = '>= 3.1' @@ -37,10 +37,10 @@ Gem::Specification.new do |gem| gem.metadata = { 'funding_uri' => 'https://karafka.io/#become-pro', 'homepage_uri' => 'https://karafka.io', - 'changelog_uri' => 'https://github.com/karafka/rdkafka-ruby/blob/main/CHANGELOG.md', - 'bug_tracker_uri' => 'https://github.com/karafka/rdkafka-ruby/issues', - 'source_code_uri' => 'https://github.com/karafka/rdkafka-ruby', - 'documentation_uri' => 'https://github.com/karafka/rdkafka-ruby/blob/main/README.md', + 'changelog_uri' => 'https://github.com/karafka/karafka-rdkafka/blob/master/CHANGELOG.md', + 'bug_tracker_uri' => 'https://github.com/karafka/karafka-rdkafka/issues', + 'source_code_uri' => 'https://github.com/karafka/karafka-rdkafka', + 'documentation_uri' => 'https://karafka.io/docs', 'rubygems_mfa_required' => 'true' } end diff --git a/lib/rdkafka/abstract_handle.rb b/lib/rdkafka/abstract_handle.rb index a7a01668..d834886b 100644 --- a/lib/rdkafka/abstract_handle.rb +++ b/lib/rdkafka/abstract_handle.rb @@ -106,7 +106,7 @@ def create_result # Allow subclasses to override def raise_error - raise RdkafkaError.new(self[:response]) + RdkafkaError.validate!(self[:response]) end # Error that is raised when waiting for the handle to complete diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb index bd18a4ae..f278f24a 100644 --- a/lib/rdkafka/admin.rb +++ b/lib/rdkafka/admin.rb @@ -664,7 +664,6 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip describe_acl_handle end - # Describe configs # # @param resources [Array] Array where elements are hashes with two keys: diff --git a/lib/rdkafka/admin/config_binding_result.rb b/lib/rdkafka/admin/config_binding_result.rb index 4080c9e5..ad22311e 100644 --- a/lib/rdkafka/admin/config_binding_result.rb +++ b/lib/rdkafka/admin/config_binding_result.rb @@ -22,7 +22,7 @@ def initialize(config_ptr) synonyms_ptr = synonym_ptr.read_array_of_pointer(pointer_to_size_t.read_int) (1..pointer_to_size_t.read_int).map do |ar| - 
self.class.new synonyms_ptr[ar - 1] + @synonyms << self.class.new(synonyms_ptr[ar - 1]) end end end diff --git a/lib/rdkafka/admin/create_partitions_handle.rb b/lib/rdkafka/admin/create_partitions_handle.rb index c38c632b..b9740c72 100644 --- a/lib/rdkafka/admin/create_partitions_handle.rb +++ b/lib/rdkafka/admin/create_partitions_handle.rb @@ -17,9 +17,12 @@ def create_result end def raise_error - raise RdkafkaError.new( - self[:response], - broker_message: CreateTopicReport.new(self[:error_string], self[:result_name]).error_string + RdkafkaError.validate!( + self[:response], + broker_message: CreatePartitionsReport.new( + self[:error_string], + self[:result_name] + ).error_string ) end end diff --git a/lib/rdkafka/admin/create_topic_handle.rb b/lib/rdkafka/admin/create_topic_handle.rb index 460c6c00..700798db 100644 --- a/lib/rdkafka/admin/create_topic_handle.rb +++ b/lib/rdkafka/admin/create_topic_handle.rb @@ -19,9 +19,12 @@ def create_result end def raise_error - raise RdkafkaError.new( - self[:response], - broker_message: CreateTopicReport.new(self[:error_string], self[:result_name]).error_string + RdkafkaError.validate!( + self[:response], + broker_message: CreateTopicReport.new( + self[:error_string], + self[:result_name] + ).error_string ) end end diff --git a/lib/rdkafka/admin/delete_topic_handle.rb b/lib/rdkafka/admin/delete_topic_handle.rb index f0d1f1ec..3917641b 100644 --- a/lib/rdkafka/admin/delete_topic_handle.rb +++ b/lib/rdkafka/admin/delete_topic_handle.rb @@ -19,9 +19,12 @@ def create_result end def raise_error - raise RdkafkaError.new( - self[:response], - broker_message: DeleteTopicReport.new(self[:error_string], self[:result_name]).error_string + RdkafkaError.validate!( + self[:response], + broker_message: DeleteTopicReport.new( + self[:error_string], + self[:result_name] + ).error_string ) end end diff --git a/lib/rdkafka/admin/describe_configs_report.rb b/lib/rdkafka/admin/describe_configs_report.rb index 77eeb559..ddbfc599 100644 --- a/lib/rdkafka/admin/describe_configs_report.rb +++ b/lib/rdkafka/admin/describe_configs_report.rb @@ -38,15 +38,9 @@ def initialize(config_entries:, entry_count:) private def validate!(config_resource_result_ptr) - code = Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr) - - return if code.zero? - - raise( - RdkafkaError.new( - code, - Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr) - ) + RdkafkaError.validate!( + Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr), + Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr) ) end end diff --git a/lib/rdkafka/admin/incremental_alter_configs_report.rb b/lib/rdkafka/admin/incremental_alter_configs_report.rb index 2b25837b..08a62986 100644 --- a/lib/rdkafka/admin/incremental_alter_configs_report.rb +++ b/lib/rdkafka/admin/incremental_alter_configs_report.rb @@ -38,15 +38,9 @@ def initialize(config_entries:, entry_count:) private def validate!(config_resource_result_ptr) - code = Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr) - - return if code.zero? 
-
-      raise(
-        RdkafkaError.new(
-          code,
-          Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
-        )
+      RdkafkaError.validate!(
+        Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr),
+        Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
       )
     end
   end
diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb
index 55481e6a..1df76ae3 100644
--- a/lib/rdkafka/bindings.rb
+++ b/lib/rdkafka/bindings.rb
@@ -2,6 +2,15 @@
 module Rdkafka
   # @private
+  #
+  # @note
+  #   There are two types of responses related to errors:
+  #   - rd_kafka_error_t - a C object that we need to remap into an error or null when no error
+  #   - rd_kafka_resp_err_t - response error code (numeric) that we can use directly
+  #
+  #   It is critical to ensure that we handle them correctly. The result type should be:
+  #   - rd_kafka_error_t - :pointer
+  #   - rd_kafka_resp_err_t - :int
   module Bindings
     extend FFI::Library
@@ -89,7 +98,7 @@ class TopicPartitionList < FFI::Struct
   end

   attach_function :rd_kafka_topic_partition_list_new, [:int32], :pointer
-  attach_function :rd_kafka_topic_partition_list_add, [:pointer, :string, :int32], :void
+  attach_function :rd_kafka_topic_partition_list_add, [:pointer, :string, :int32], :pointer
   attach_function :rd_kafka_topic_partition_list_set_offset, [:pointer, :string, :int32, :int64], :void
   attach_function :rd_kafka_topic_partition_list_destroy, [:pointer], :void
   attach_function :rd_kafka_topic_partition_list_copy, [:pointer], :pointer
@@ -145,6 +154,11 @@ class NativeErrorDesc < FFI::Struct
   attach_function :rd_kafka_err2name, [:int], :string
   attach_function :rd_kafka_err2str, [:int], :string
+  attach_function :rd_kafka_error_is_fatal, [:pointer], :int
+  attach_function :rd_kafka_error_is_retriable, [:pointer], :int
+  attach_function :rd_kafka_error_txn_requires_abort, [:pointer], :int
+  attach_function :rd_kafka_error_destroy, [:pointer], :void
+  attach_function :rd_kafka_error_code, [:pointer], :int
   attach_function :rd_kafka_get_err_descs, [:pointer, :pointer], :void

   # Configuration
@@ -215,7 +229,7 @@ class NativeErrorDesc < FFI::Struct
     :void, [:pointer, :int, :string, :pointer]
   ) do |_client_prr, err_code, reason, _opaque|
     if Rdkafka::Config.error_callback
-      error = Rdkafka::RdkafkaError.new(err_code, broker_message: reason)
+      error = Rdkafka::RdkafkaError.build(err_code, broker_message: reason)
       error.set_backtrace(caller)
       Rdkafka::Config.error_callback.call(error)
     end
@@ -352,6 +366,11 @@ class NativeErrorDesc < FFI::Struct
   attach_function :rd_kafka_purge, [:pointer, :int], :int, blocking: true
   callback :delivery_cb, [:pointer, :pointer, :pointer], :void
   attach_function :rd_kafka_conf_set_dr_msg_cb, [:pointer, :delivery_cb], :void
+  attach_function :rd_kafka_init_transactions, [:pointer, :int], :pointer, blocking: true
+  attach_function :rd_kafka_send_offsets_to_transaction, [:pointer, :pointer, :pointer, :int], :pointer, blocking: true
+  attach_function :rd_kafka_begin_transaction, [:pointer], :pointer, blocking: true
+  attach_function :rd_kafka_abort_transaction, [:pointer, :int], :pointer, blocking: true
+  attach_function :rd_kafka_commit_transaction, [:pointer, :int], :pointer, blocking: true

   # Partitioner
   PARTITIONERS = %w(random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random).each_with_object({}) do |name, hsh|
diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb
index 8c62c348..6b367f58 100644
--- a/lib/rdkafka/consumer.rb
+++ b/lib/rdkafka/consumer.rb
@@ -74,9 +74,8 @@ def subscribe(*topics)
       response
= @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_subscribe(inner, tpl) end - if response != 0 - raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'") - end + + Rdkafka::RdkafkaError.validate!(response, "Error subscribing to '#{topics.join(', ')}'") ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) unless tpl.nil? end @@ -91,9 +90,10 @@ def unsubscribe response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_unsubscribe(inner) end - if response != 0 - raise Rdkafka::RdkafkaError.new(response) - end + + Rdkafka::RdkafkaError.validate!(response) + + nil end # Pause producing or consumption for the provided list of partitions @@ -142,9 +142,10 @@ def resume(list) response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_resume_partitions(inner, tpl) end - if response != 0 - raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'") - end + + Rdkafka::RdkafkaError.validate!(response, "Error resume '#{list.to_h}'") + + nil ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) end @@ -162,9 +163,7 @@ def subscription Rdkafka::Bindings.rd_kafka_subscription(inner, ptr) end - if response != 0 - raise Rdkafka::RdkafkaError.new(response) - end + Rdkafka::RdkafkaError.validate!(response) native = ptr.read_pointer @@ -192,9 +191,8 @@ def assign(list) response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_assign(inner, tpl) end - if response != 0 - raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'") - end + + Rdkafka::RdkafkaError.validate!(response, "Error assigning '#{list.to_h}'") ensure Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) end @@ -211,9 +209,8 @@ def assignment response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_assignment(inner, ptr) end - if response != 0 - raise Rdkafka::RdkafkaError.new(response) - end + + Rdkafka::RdkafkaError.validate!(response) tpl = ptr.read_pointer @@ -246,7 +243,7 @@ def assignment_lost? # @param timeout_ms [Integer] The timeout for fetching this information. # @return [TopicPartitionList] # @raise [RdkafkaError] When getting the committed positions fails. - def committed(list=nil, timeout_ms=2000) + def committed(list=nil, timeout_ms=2_000) closed_consumer_check(__method__) if list.nil? 
@@ -261,9 +258,9 @@ def committed(list=nil, timeout_ms=2000)
         response = @native_kafka.with_inner do |inner|
           Rdkafka::Bindings.rd_kafka_committed(inner, tpl, timeout_ms)
         end
-        if response != 0
-          raise Rdkafka::RdkafkaError.new(response)
-        end
+
+        Rdkafka::RdkafkaError.validate!(response)
+
         TopicPartitionList.from_native_tpl(tpl)
       ensure
         Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
@@ -291,9 +288,7 @@ def position(list=nil)
           Rdkafka::Bindings.rd_kafka_position(inner, tpl)
         end

-        if response != 0
-          raise Rdkafka::RdkafkaError.new(response)
-        end
+        Rdkafka::RdkafkaError.validate!(response)

         TopicPartitionList.from_native_tpl(tpl)
       end
@@ -321,9 +316,8 @@ def query_watermark_offsets(topic, partition, timeout_ms=1000)
             timeout_ms,
           )
         end
-        if response != 0
-          raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
-        end
+
+        Rdkafka::RdkafkaError.validate!(response, "Error querying watermark offsets for partition #{partition} of #{topic}")

         return low.read_array_of_int64(1).first, high.read_array_of_int64(1).first
       ensure
@@ -350,7 +344,7 @@ def lag(topic_partition_list, watermark_timeout_ms=1000)
           topic_out = {}
           partitions.each do |p|
             next if p.offset.nil?
-            low, high = query_watermark_offsets(
+            _, high = query_watermark_offsets(
               topic,
               p.partition,
               watermark_timeout_ms
@@ -389,16 +383,34 @@ def member_id
     # When using this `enable.auto.offset.store` should be set to `false` in the config.
     #
     # @param message [Rdkafka::Consumer::Message] The message whose offset will be stored
+    # @param metadata [String, nil] commit metadata string or nil if none
     # @return [nil]
     # @raise [RdkafkaError] When storing the offset fails
-    def store_offset(message)
+    def store_offset(message, metadata = nil)
       closed_consumer_check(__method__)

       list = TopicPartitionList.new
-      list.add_topic_and_partitions_with_offsets(
-        message.topic,
-        message.partition => message.offset + 1
-      )
+
+      # For metadata-aware commits we build the partition reference directly to save on
+      # object allocations
+      if metadata
+        list.add_topic_and_partitions_with_offsets(
+          message.topic,
+          [
+            Consumer::Partition.new(
+              message.partition,
+              message.offset + 1,
+              0,
+              metadata
+            )
+          ]
+        )
+      else
+        list.add_topic_and_partitions_with_offsets(
+          message.topic,
+          message.partition => message.offset + 1
+        )
+      end

       tpl = list.to_native_tpl

@@ -409,9 +421,9 @@ def store_offset(message)
         )
       end

-      if response != 0
-        raise Rdkafka::RdkafkaError.new(response)
-      end
+      Rdkafka::RdkafkaError.validate!(response)
+
+      nil
     ensure
       Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
     end
@@ -453,9 +465,9 @@ def seek_by(topic, partition, offset)
         offset,
         0 # timeout
       )
-      if response != 0
-        raise Rdkafka::RdkafkaError.new(response)
-      end
+      Rdkafka::RdkafkaError.validate!(response)
+
+      nil
    ensure
      if native_topic && !native_topic.null?
        Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
@@ -486,9 +498,7 @@ def offsets_for_times(list, timeout_ms = 1000)
        )
      end

-      if response != 0
-        raise Rdkafka::RdkafkaError.new(response)
-      end
+      Rdkafka::RdkafkaError.validate!(response)

      TopicPartitionList.from_native_tpl(tpl)
    ensure
@@ -521,9 +531,10 @@ def commit(list=nil, async=false)
        response = @native_kafka.with_inner do |inner|
          Rdkafka::Bindings.rd_kafka_commit(inner, tpl, async)
        end
-        if response != 0
-          raise Rdkafka::RdkafkaError.new(response)
-        end
+
+        Rdkafka::RdkafkaError.validate!(response)
+
+        nil
      ensure
        Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
      end
@@ -540,18 +551,17 @@ def poll(timeout_ms)
      message_ptr = @native_kafka.with_inner do |inner|
        Rdkafka::Bindings.rd_kafka_consumer_poll(inner, timeout_ms)
      end
-      if message_ptr.null?
-        nil
-      else
-        # Create struct wrapper
-        native_message = Rdkafka::Bindings::Message.new(message_ptr)
-        # Raise error if needed
-        if native_message[:err] != 0
-          raise Rdkafka::RdkafkaError.new(native_message[:err])
-        end
-        # Create a message to pass out
-        Rdkafka::Consumer::Message.new(native_message)
-      end
+
+      return nil if message_ptr.null?
+
+      # Create struct wrapper
+      native_message = Rdkafka::Bindings::Message.new(message_ptr)
+
+      # Create a message to pass out
+      return Rdkafka::Consumer::Message.new(native_message) if native_message[:err].zero?
+
+      # Raise error if needed
+      Rdkafka::RdkafkaError.validate!(native_message)
    ensure
      # Clean up rdkafka message if there is one
      if message_ptr && !message_ptr.null?
diff --git a/lib/rdkafka/consumer/partition.rb b/lib/rdkafka/consumer/partition.rb
index 8e143486..25de331b 100644
--- a/lib/rdkafka/consumer/partition.rb
+++ b/lib/rdkafka/consumer/partition.rb
@@ -16,11 +16,16 @@ class Partition
     # @return [Integer]
     attr_reader :err

+    # Partition metadata in the context of a consumer
+    # @return [String, nil]
+    attr_reader :metadata
+
     # @private
-    def initialize(partition, offset, err = 0)
+    def initialize(partition, offset, err = 0, metadata = nil)
       @partition = partition
       @offset = offset
       @err = err
+      @metadata = metadata
     end

     # Human readable representation of this partition.
@@ -29,6 +34,7 @@ def to_s
       message = "<Partition #{partition}"
       message += " offset=#{offset}" if offset
       message += " err=#{err}" if err != 0
+      message += " metadata=#{metadata}" if metadata
       message += ">"
       message
     end
diff --git a/lib/rdkafka/consumer/topic_partition_list.rb b/lib/rdkafka/consumer/topic_partition_list.rb
--- a/lib/rdkafka/consumer/topic_partition_list.rb
+++ b/lib/rdkafka/consumer/topic_partition_list.rb
@@ ... @@
-    # @param partitions_with_offsets [Hash<Integer, Integer>] The topic's partitions and offsets
+    # @param partitions_with_offsets [Array] The topic's partitions with offsets
+    #   and metadata (if any)
     #
     # @return [nil]
     def add_topic_and_partitions_with_offsets(topic, partitions_with_offsets)
-      @data[topic.to_s] = partitions_with_offsets.map { |p, o| Partition.new(p, o) }
+      @data[topic.to_s] = partitions_with_offsets.map do |p, o|
+        p.is_a?(Partition) ? p : Partition.new(p, o)
+      end
     end

     # Return a `Hash` with the topics as keys and an array of partition information as the value if present.
@@ -114,7 +118,13 @@ def self.from_native_tpl(pointer)
         else
           elem[:offset]
         end
-        partition = Partition.new(elem[:partition], offset, elem[:err])
+
+        partition = Partition.new(
+          elem[:partition],
+          offset,
+          elem[:err],
+          elem[:metadata].null? ? nil : elem[:metadata].read_string(elem[:metadata_size])
+        )
         partitions.push(partition)
         data[elem[:topic]] = partitions
       end
@@ -136,12 +146,23 @@ def to_native_tpl
       @data.each do |topic, partitions|
         if partitions
           partitions.each do |p|
-            Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
+            ref = Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
               tpl,
               topic,
               p.partition
             )

+            # Remove the respond_to? check after karafka 2.3.0 is released
+            if p.respond_to?(:metadata) && p.metadata
+              part = Rdkafka::Bindings::TopicPartition.new(ref)
+              str_ptr = FFI::MemoryPointer.from_string(p.metadata)
+              # released here:
+              # https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_partition.c#L2682C18-L2682C18
+              str_ptr.autorelease = false
+              part[:metadata] = str_ptr
+              part[:metadata_size] = p.metadata.bytesize
+            end
+
             if p.offset
               offset = p.offset.is_a?(Time) ? p.offset.to_f * 1_000 : p.offset
diff --git a/lib/rdkafka/error.rb b/lib/rdkafka/error.rb
index afae22c7..0d4e8f41 100644
--- a/lib/rdkafka/error.rb
+++ b/lib/rdkafka/error.rb
@@ -6,6 +6,9 @@ class BaseError < RuntimeError; end

   # Error returned by the underlying rdkafka library.
   class RdkafkaError < BaseError
+    # Frozen empty hash used as the `details` default to avoid allocating one per error
+    EMPTY_HASH = {}.freeze
+
     # The underlying raw error response
     # @return [Integer]
     attr_reader :rdkafka_response
@@ -18,12 +21,90 @@ class RdkafkaError < BaseError
     # @return [String]
     attr_reader :broker_message

+    # Optional details hash specific to a given error or an empty hash if none or not supported
+    # @return [Hash]
+    attr_reader :details
+
+    class << self
+      def build_from_c(response_ptr, message_prefix = nil, broker_message: nil)
+        code = Rdkafka::Bindings.rd_kafka_error_code(response_ptr)
+
+        return false if code.zero?
+
+        message = broker_message || Rdkafka::Bindings.rd_kafka_err2str(code)
+        fatal = !Rdkafka::Bindings.rd_kafka_error_is_fatal(response_ptr).zero?
+        retryable = !Rdkafka::Bindings.rd_kafka_error_is_retriable(response_ptr).zero?
+        abortable = !Rdkafka::Bindings.rd_kafka_error_txn_requires_abort(response_ptr).zero?
+
+        Rdkafka::Bindings.rd_kafka_error_destroy(response_ptr)
+
+        new(
+          code,
+          message_prefix,
+          broker_message: message,
+          fatal: fatal,
+          retryable: retryable,
+          abortable: abortable
+        )
+      end
+
+      def build(response_ptr_or_code, message_prefix = nil, broker_message: nil)
+        case response_ptr_or_code
+        when Integer
+          return false if response_ptr_or_code.zero?
+
+          new(response_ptr_or_code, message_prefix, broker_message: broker_message)
+        when Bindings::Message
+          return false if response_ptr_or_code[:err].zero?
+
+          unless response_ptr_or_code[:payload].null?
+            message_prefix ||= response_ptr_or_code[:payload].read_string(response_ptr_or_code[:len])
+          end
+
+          details = if response_ptr_or_code[:rkt].null?
+                      EMPTY_HASH
+                    else
+                      {
+                        partition: response_ptr_or_code[:partition],
+                        offset: response_ptr_or_code[:offset],
+                        topic: Bindings.rd_kafka_topic_name(response_ptr_or_code[:rkt])
+                      }.freeze
+                    end
+          new(
+            response_ptr_or_code[:err],
+            message_prefix,
+            broker_message: broker_message,
+            details: details
+          )
+        else
+          build_from_c(response_ptr_or_code, message_prefix)
+        end
+      end
+
+      def validate!(response_ptr_or_code, message_prefix = nil, broker_message: nil)
+        error = build(response_ptr_or_code, message_prefix, broker_message: broker_message)
+        error ?
raise(error) : false + end + end + # @private - def initialize(response, message_prefix=nil, broker_message: nil) + def initialize( + response, + message_prefix=nil, + broker_message: nil, + fatal: false, + retryable: false, + abortable: false, + details: EMPTY_HASH + ) raise TypeError.new("Response has to be an integer") unless response.is_a? Integer @rdkafka_response = response @message_prefix = message_prefix @broker_message = broker_message + @fatal = fatal + @retryable = retryable + @abortable = abortable + @details = details end # This error's code, for example `:partition_eof`, `:msg_size_too_large`. @@ -58,6 +139,18 @@ def is_partition_eof? def ==(another_error) another_error.is_a?(self.class) && (self.to_s == another_error.to_s) end + + def fatal? + @fatal + end + + def retryable? + @retryable + end + + def abortable? + @abortable + end end # Error with topic partition list returned by the underlying rdkafka library. diff --git a/lib/rdkafka/metadata.rb b/lib/rdkafka/metadata.rb index 10cca3be..8cac1a15 100644 --- a/lib/rdkafka/metadata.rb +++ b/lib/rdkafka/metadata.rb @@ -29,8 +29,7 @@ def initialize(native_client, topic_name = nil, timeout_ms = 2_000) # Retrieve the Metadata result = Rdkafka::Bindings.rd_kafka_metadata(native_client, topic_flag, native_topic, ptr, timeout_ms) - # Error Handling - raise Rdkafka::RdkafkaError.new(result) unless result.zero? + Rdkafka::RdkafkaError.validate!(result) metadata_from_native(ptr.read_pointer) rescue ::Rdkafka::RdkafkaError => e @@ -58,11 +57,12 @@ def metadata_from_native(ptr) @topics = Array.new(metadata[:topics_count]) do |i| topic = TopicMetadata.new(metadata[:topics_metadata] + (i * TopicMetadata.size)) - raise Rdkafka::RdkafkaError.new(topic[:rd_kafka_resp_err]) unless topic[:rd_kafka_resp_err].zero? + + RdkafkaError.validate!(topic[:rd_kafka_resp_err]) partitions = Array.new(topic[:partition_count]) do |j| partition = PartitionMetadata.new(topic[:partitions_metadata] + (j * PartitionMetadata.size)) - raise Rdkafka::RdkafkaError.new(partition[:rd_kafka_resp_err]) unless partition[:rd_kafka_resp_err].zero? 
+ RdkafkaError.validate!(partition[:rd_kafka_resp_err]) partition.to_h end topic.to_h.merge!(partitions: partitions) diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 74168210..9cb3221f 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -135,6 +135,74 @@ def delivery_callback=(callback) @delivery_callback_arity = arity(callback) end + # Init transactions + # Run once per producer + def init_transactions + closed_producer_check(__method__) + + @native_kafka.with_inner do |inner| + response_ptr = Rdkafka::Bindings.rd_kafka_init_transactions(inner, -1) + + Rdkafka::RdkafkaError.validate!(response_ptr) || true + end + end + + def begin_transaction + closed_producer_check(__method__) + + @native_kafka.with_inner do |inner| + response_ptr = Rdkafka::Bindings.rd_kafka_begin_transaction(inner) + + Rdkafka::RdkafkaError.validate!(response_ptr) || true + end + end + + def commit_transaction(timeout_ms = -1) + closed_producer_check(__method__) + + @native_kafka.with_inner do |inner| + response_ptr = Rdkafka::Bindings.rd_kafka_commit_transaction(inner, timeout_ms) + + Rdkafka::RdkafkaError.validate!(response_ptr) || true + end + end + + def abort_transaction(timeout_ms = -1) + closed_producer_check(__method__) + + @native_kafka.with_inner do |inner| + response_ptr = Rdkafka::Bindings.rd_kafka_abort_transaction(inner, timeout_ms) + Rdkafka::RdkafkaError.validate!(response_ptr) || true + end + end + + # Sends provided offsets of a consumer to the transaction for collective commit + # + # @param consumer [Consumer] consumer that owns the given tpls + # @param tpl [Consumer::TopicPartitionList] + # @param timeout_ms [Integer] offsets send timeout + # @note Use **only** in the context of an active transaction + def send_offsets_to_transaction(consumer, tpl, timeout_ms = 5_000) + closed_producer_check(__method__) + + return if tpl.empty? + + cgmetadata = consumer.consumer_group_metadata_pointer + native_tpl = tpl.to_native_tpl + + @native_kafka.with_inner do |inner| + response_ptr = Bindings.rd_kafka_send_offsets_to_transaction(inner, native_tpl, cgmetadata, timeout_ms) + + Rdkafka::RdkafkaError.validate!(response_ptr) + end + ensure + if cgmetadata && !cgmetadata.null? + Bindings.rd_kafka_consumer_group_metadata_destroy(cgmetadata) + end + + Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native_tpl) unless native_tpl.nil? + end + # Close this producer and wait for the internal poll queue to empty. def close return if closed? @@ -172,17 +240,13 @@ def closed? def flush(timeout_ms=5_000) closed_producer_check(__method__) - code = nil - - @native_kafka.with_inner do |inner| - code = Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms) + error = @native_kafka.with_inner do |inner| + response = Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms) + Rdkafka::RdkafkaError.build(response) end # Early skip not to build the error message - return true if code.zero? - - error = Rdkafka::RdkafkaError.new(code) - + return true unless error return false if error.code == :timed_out raise(error) @@ -197,16 +261,14 @@ def flush(timeout_ms=5_000) def purge closed_producer_check(__method__) - code = nil - @native_kafka.with_inner do |inner| - code = Bindings.rd_kafka_purge( + response = Bindings.rd_kafka_purge( inner, Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT ) - end - code.zero? 
|| raise(Rdkafka::RdkafkaError.new(code))
+        Rdkafka::RdkafkaError.validate!(response)
+      end

       # Wait for the purge to affect everything
       sleep(0.001) until flush(100)
@@ -360,7 +422,7 @@ def produce(
       # Raise error if the produce call was not successful
       if response != 0
         DeliveryHandle.remove(delivery_handle.to_ptr.address)
-        raise RdkafkaError.new(response)
+        Rdkafka::RdkafkaError.validate!(response)
       end

       delivery_handle
diff --git a/lib/rdkafka/producer/delivery_handle.rb b/lib/rdkafka/producer/delivery_handle.rb
index b73890c9..936425b3 100644
--- a/lib/rdkafka/producer/delivery_handle.rb
+++ b/lib/rdkafka/producer/delivery_handle.rb
@@ -25,15 +25,23 @@ def operation_name
     # @return [DeliveryReport] a report on the delivery of the message
     def create_result
-      DeliveryReport.new(
-        self[:partition],
-        self[:offset],
-        # For part of errors, we will not get a topic name reference and in cases like this
-        # we should not return it
-        topic,
-        self[:response] != 0 ? RdkafkaError.new(self[:response]) : nil,
-        label
-      )
+      if self[:response] == 0
+        DeliveryReport.new(
+          self[:partition],
+          self[:offset],
+          topic,
+          nil,
+          label
+        )
+      else
+        DeliveryReport.new(
+          self[:partition],
+          self[:offset],
+          topic,
+          Rdkafka::RdkafkaError.build(self[:response]),
+          label
+        )
+      end
     end
   end
 end
diff --git a/lib/rdkafka/producer/delivery_report.rb b/lib/rdkafka/producer/delivery_report.rb
index 01d29f98..c4628225 100644
--- a/lib/rdkafka/producer/delivery_report.rb
+++ b/lib/rdkafka/producer/delivery_report.rb
@@ -12,9 +12,8 @@ class DeliveryReport
     # @return [Integer]
     attr_reader :offset

-    # The name of the topic this message was produced to or nil in case of reports with errors
-    # where topic was not reached.
-    #
+    # The name of the topic this message was produced to or nil in case delivery failed and we
+    # were not able to get the topic reference
     # @return [String, nil]
     attr_reader :topic_name
diff --git a/lib/rdkafka/version.rb b/lib/rdkafka/version.rb
index 3f4fd99d..3dba2da7 100644
--- a/lib/rdkafka/version.rb
+++ b/lib/rdkafka/version.rb
@@ -1,7 +1,7 @@
 # frozen_string_literal: true

 module Rdkafka
-  VERSION = "0.19.1"
+  VERSION = "0.18.0"
   LIBRDKAFKA_VERSION = "2.5.3"
   LIBRDKAFKA_SOURCE_SHA256 = "eaa1213fdddf9c43e28834d9a832d9dd732377d35121e42f875966305f52b8ff"
 end
diff --git a/spec/rdkafka/admin/create_topic_handle_spec.rb b/spec/rdkafka/admin/create_topic_handle_spec.rb
index 059ad1a2..2b630160 100644
--- a/spec/rdkafka/admin/create_topic_handle_spec.rb
+++ b/spec/rdkafka/admin/create_topic_handle_spec.rb
@@ -43,10 +43,12 @@
   describe "#raise_error" do
     let(:pending_handle) { false }

-    it "should raise the appropriate error" do
+    before { subject[:response] = -1 }
+
+    it "should raise the appropriate error when there is an error" do
       expect {
         subject.raise_error
-      }.to raise_exception(Rdkafka::RdkafkaError, /Success \(no_error\)/)
+      }.to raise_exception(Rdkafka::RdkafkaError, /Unknown broker error \(unknown\)/)
     end
   end
 end
diff --git a/spec/rdkafka/admin/delete_topic_handle_spec.rb b/spec/rdkafka/admin/delete_topic_handle_spec.rb
index 95ae2155..f2dba80b 100644
--- a/spec/rdkafka/admin/delete_topic_handle_spec.rb
+++ b/spec/rdkafka/admin/delete_topic_handle_spec.rb
@@ -43,10 +43,12 @@
   describe "#raise_error" do
     let(:pending_handle) { false }

+    before { subject[:response] = -1 }
+
     it "should raise the appropriate error" do
       expect {
         subject.raise_error
-      }.to raise_exception(Rdkafka::RdkafkaError, /Success \(no_error\)/)
+      }.to raise_exception(Rdkafka::RdkafkaError, /Unknown broker error \(unknown\)/)
    end
  end
end
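Editor's note (not part of the patch): the transactional producer methods and the `rd_kafka_error_t` introspection added above compose as follows. A minimal usage sketch, assuming a locally reachable broker and a pre-created `example_topic` (both illustrative, not defined by this changeset):

```ruby
require 'rdkafka'

# Illustrative config; transactional.id must be set before init_transactions.
producer = Rdkafka::Config.new(
  'bootstrap.servers': 'localhost:9092',
  'transactional.id': 'example-tx-producer',
  'transaction.timeout.ms': 10_000
).producer

producer.init_transactions # once per producer instance
producer.begin_transaction

begin
  handle = producer.produce(topic: 'example_topic', payload: 'data')
  producer.commit_transaction
  handle.wait(max_wait_timeout: 15)
rescue Rdkafka::RdkafkaError => e
  # fatal?/retryable?/abortable? are populated from rd_kafka_error_t by
  # RdkafkaError.build_from_c, so callers can branch on the error class.
  producer.abort_transaction if e.abortable?
  raise
ensure
  producer.close
end
```

Since `rd_kafka_init_transactions` and friends return `rd_kafka_error_t` pointers (`:pointer`), while most legacy calls return `rd_kafka_resp_err_t` ints, having `RdkafkaError.validate!` accept both keeps the call sites uniform.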
diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index d501e6e2..fac30d6f 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -66,7 +66,7 @@ }.to raise_exception { |ex| expect(ex).to be_a(Rdkafka::RdkafkaError) expect(ex.message).to match(/Broker: Invalid topic \(topic_exception\)/) -expect(ex.broker_message).to match(/Topic name.*is invalid: .* contains one or more characters other than ASCII alphanumerics, '.', '_' and '-'/) + expect(ex.broker_message).to match(/Topic name.*is invalid: .* contains one or more characters other than ASCII alphanumerics, '.', '_' and '-'/) } end end diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb index e5da460d..c54df7e8 100644 --- a/spec/rdkafka/consumer_spec.rb +++ b/spec/rdkafka/consumer_spec.rb @@ -467,7 +467,6 @@ def send_one_message(val) end end - describe "#position, #commit, #committed and #store_offset" do # Make sure there are messages to work with let!(:report) do @@ -588,12 +587,18 @@ def send_one_message(val) describe "#store_offset" do let(:consumer) { rdkafka_consumer_config('enable.auto.offset.store': false).consumer } + let(:metadata) { SecureRandom.uuid } + let(:group_id) { SecureRandom.uuid } + let(:base_config) do + { + 'group.id': group_id, + 'enable.auto.offset.store': false, + 'enable.auto.commit': false + } + end before do - config = {} - config[:'enable.auto.offset.store'] = false - config[:'enable.auto.commit'] = false - @new_consumer = rdkafka_consumer_config(config).consumer + @new_consumer = rdkafka_consumer_config(base_config).consumer @new_consumer.subscribe("consume_test_topic") wait_for_assignment(@new_consumer) end @@ -616,6 +621,19 @@ def send_one_message(val) expect(partitions[message.partition].offset).to eq(message.offset + 1) end + it "should store the offset for a message with metadata" do + @new_consumer.store_offset(message, metadata) + @new_consumer.commit + @new_consumer.close + + meta_consumer = rdkafka_consumer_config(base_config).consumer + meta_consumer.subscribe("consume_test_topic") + wait_for_assignment(meta_consumer) + meta_consumer.poll(1_000) + expect(meta_consumer.committed.to_h[message.topic][message.partition].metadata).to eq(metadata) + meta_consumer.close + end + it "should raise an error with invalid input" do allow(message).to receive(:partition).and_return(9999) expect { @@ -819,6 +837,15 @@ def send_one_message(val) consumer.poll(100) }.to raise_error Rdkafka::RdkafkaError end + + it "expect to raise error when polling non-existing topic" do + missing_topic = SecureRandom.uuid + consumer.subscribe(missing_topic) + + expect { + consumer.poll(1_000) + }.to raise_error Rdkafka::RdkafkaError, /Subscribed topic not available: #{missing_topic}/ + end end describe "#poll with headers" do @@ -1324,7 +1351,8 @@ def on_partitions_revoked(list) :assign => [ nil ], :assignment => nil, :committed => [], - :query_watermark_offsets => [ nil, nil ] + :query_watermark_offsets => [ nil, nil ], + :assignment_lost? 
=> []
        }.each do |method, args|
          it "raises an exception if #{method} is called" do
            expect {
@@ -1439,4 +1467,39 @@ def collect(name, list)
       end
     end
   end
+
+  describe "when reaching eof on a topic and eof reporting enabled" do
+    let(:consumer) { rdkafka_consumer_config(:"enable.partition.eof" => true).consumer }
+
+    it "should return proper details" do
+      (0..2).each do |i|
+        producer.produce(
+          topic: "consume_test_topic",
+          key: "key lag #{i}",
+          partition: i
+        ).wait
+      end
+
+      # Consume to the end
+      consumer.subscribe("consume_test_topic")
+      eof_count = 0
+      eof_error = nil
+
+      loop do
+        begin
+          consumer.poll(100)
+        rescue Rdkafka::RdkafkaError => error
+          if error.is_partition_eof?
+            eof_error = error
+          end
+          break if eof_error
+        end
+      end
+
+      expect(eof_error.code).to eq(:partition_eof)
+      expect(eof_error.details[:topic]).to eq('consume_test_topic')
+      expect(eof_error.details[:partition]).to be_a(Integer)
+      expect(eof_error.details[:offset]).to be_a(Integer)
+    end
+  end
 end
diff --git a/spec/rdkafka/error_spec.rb b/spec/rdkafka/error_spec.rb
index 3f1567e3..af72f638 100644
--- a/spec/rdkafka/error_spec.rb
+++ b/spec/rdkafka/error_spec.rb
@@ -11,6 +11,12 @@
     expect(Rdkafka::RdkafkaError.new(10, "message prefix").message_prefix).to eq "message prefix"
   end

+  it "should have empty frozen details by default" do
+    error = Rdkafka::RdkafkaError.new(10, "message prefix")
+    expect(error.details).to eq({})
+    expect(error.details).to be_frozen
+  end
+
   it "should create an error with a broker message" do
     expect(Rdkafka::RdkafkaError.new(10, broker_message: "broker message").broker_message).to eq "broker message"
   end
diff --git a/spec/rdkafka/producer/delivery_handle_spec.rb b/spec/rdkafka/producer/delivery_handle_spec.rb
index b9095a24..4641d8e6 100644
--- a/spec/rdkafka/producer/delivery_handle_spec.rb
+++ b/spec/rdkafka/producer/delivery_handle_spec.rb
@@ -42,4 +42,19 @@
       end
     end
   end
+
+  describe '#create_result' do
+    let(:pending_handle) { false }
+    let(:report) { subject.create_result }
+
+    context 'when response is 0' do
+      it { expect(report.error).to eq(nil) }
+    end
+
+    context 'when response is not 0' do
+      let(:response) { 1 }
+
+      it { expect(report.error).to eq(Rdkafka::RdkafkaError.new(response)) }
+    end
+  end
 end
diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb
index d86f6895..82c28aab 100644
--- a/spec/rdkafka/producer_spec.rb
+++ b/spec/rdkafka/producer_spec.rb
@@ -638,16 +638,16 @@ def call(_, handle)
   end

   describe '#partition_count' do
-    it { expect(producer.partition_count('consume_test_topic')).to eq(3) }
+    it { expect(producer.partition_count('example_topic')).to eq(1) }

     context 'when the partition count value is already cached' do
       before do
-        producer.partition_count('consume_test_topic')
+        producer.partition_count('example_topic')
        allow(::Rdkafka::Metadata).to receive(:new).and_call_original
      end

      it 'expect not to query it again' do
-        producer.partition_count('consume_test_topic')
+        producer.partition_count('example_topic')
        expect(::Rdkafka::Metadata).not_to have_received(:new)
      end
    end
@@ -655,12 +655,12 @@ def call(_, handle)
    context 'when the partition count value was cached but time expired' do
      before do
        allow(::Process).to receive(:clock_gettime).and_return(0, 30.02)
-        producer.partition_count('consume_test_topic')
+        producer.partition_count('example_topic')
        allow(::Rdkafka::Metadata).to receive(:new).and_call_original
      end

      it 'expect to query it again' do
-        producer.partition_count('consume_test_topic')
+        producer.partition_count('example_topic')
        expect(::Rdkafka::Metadata).to have_received(:new)
      end
    end
@@ -668,17 +668,44 @@ def call(_, handle)
    context 'when the partition count value was cached and time did not expire' do
      before do
        allow(::Process).to receive(:clock_gettime).and_return(0, 29.001)
-        producer.partition_count('consume_test_topic')
+        producer.partition_count('example_topic')
        allow(::Rdkafka::Metadata).to receive(:new).and_call_original
      end

      it 'expect not to query it again' do
-        producer.partition_count('consume_test_topic')
+        producer.partition_count('example_topic')
        expect(::Rdkafka::Metadata).not_to have_received(:new)
      end
    end
  end

+  describe 'metadata fetch request recovery' do
+    subject(:partition_count) { producer.partition_count('example_topic') }
+
+    describe 'metadata initialization recovery' do
+      context 'when all good' do
+        it { expect(partition_count).to eq(1) }
+      end
+
+      context 'when we fail for the first time with a handled error' do
+        before do
+          raised = false
+
+          allow(Rdkafka::Bindings).to receive(:rd_kafka_metadata).and_wrap_original do |m, *args|
+            if raised
+              m.call(*args)
+            else
+              raised = true
+              # -185 maps to RD_KAFKA_RESP_ERR__TIMED_OUT, an error the fetch retries on
+              -185
+            end
+          end
+        end
+
+        it { expect(partition_count).to eq(1) }
+      end
+    end
+  end
+
   describe '#flush' do
     it "should return flush when it can flush all outstanding messages or when no messages" do
       producer.produce(
@@ -790,6 +817,162 @@ def call(_, handle)
     end
   end

+  context 'when working with transactions' do
+    let(:producer) do
+      rdkafka_producer_config(
+        'transactional.id': SecureRandom.uuid,
+        'transaction.timeout.ms': 5_000
+      ).producer
+    end
+
+    it 'expect not to allow to produce without transaction init' do
+      expect do
+        producer.produce(topic: 'produce_test_topic', payload: 'data')
+      end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state \(state\)/)
+    end
+
+    it 'expect to raise error when transactions are initialized but producing not in one' do
+      producer.init_transactions
+
+      expect do
+        producer.produce(topic: 'produce_test_topic', payload: 'data')
+      end.to raise_error(Rdkafka::RdkafkaError, /Erroneous state \(state\)/)
+    end
+
+    it 'expect to allow to produce within a transaction, finalize and ship data' do
+      producer.init_transactions
+      producer.begin_transaction
+      handle1 = producer.produce(topic: 'produce_test_topic', payload: 'data1', partition: 1)
+      handle2 = producer.produce(topic: 'example_topic', payload: 'data2', partition: 0)
+      producer.commit_transaction
+
+      report1 = handle1.wait(max_wait_timeout: 15)
+      report2 = handle2.wait(max_wait_timeout: 15)
+
+      message1 = wait_for_message(
+        topic: "produce_test_topic",
+        delivery_report: report1,
+        consumer: consumer
+      )
+
+      expect(message1.partition).to eq 1
+      expect(message1.payload).to eq "data1"
+      expect(message1.timestamp).to be_within(10).of(Time.now)
+
+      message2 = wait_for_message(
+        topic: "example_topic",
+        delivery_report: report2,
+        consumer: consumer
+      )
+
+      expect(message2.partition).to eq 0
+      expect(message2.payload).to eq "data2"
+      expect(message2.timestamp).to be_within(10).of(Time.now)
+    end
+
+    it 'expect not to send data and propagate purge queue error on abort' do
+      producer.init_transactions
+      producer.begin_transaction
+      handle1 = producer.produce(topic: 'produce_test_topic', payload: 'data1', partition: 1)
+      handle2 = producer.produce(topic: 'example_topic', payload: 'data2', partition: 0)
+      producer.abort_transaction
+
+      expect { handle1.wait(max_wait_timeout: 15) }
+        .to raise_error(Rdkafka::RdkafkaError, /Purged in queue \(purge_queue\)/)
+      expect { handle2.wait(max_wait_timeout: 15) }
+        .to raise_error(Rdkafka::RdkafkaError, /Purged in queue \(purge_queue\)/)
+    end
+
+    it 'expect to have non-retryable, non-abortable and non-fatal error on abort' do
+      producer.init_transactions
+      producer.begin_transaction
+      handle = producer.produce(topic: 'produce_test_topic', payload: 'data1', partition: 1)
+      producer.abort_transaction
+
+      response = handle.wait(raise_response_error: false)
+
+      expect(response.error).to be_a(Rdkafka::RdkafkaError)
+      expect(response.error.retryable?).to eq(false)
+      expect(response.error.fatal?).to eq(false)
+      expect(response.error.abortable?).to eq(false)
+    end
+
+    context 'fencing against previous active producer with same transactional id' do
+      let(:transactional_id) { SecureRandom.uuid }
+
+      let(:producer1) do
+        rdkafka_producer_config(
+          'transactional.id': transactional_id,
+          'transaction.timeout.ms': 10_000
+        ).producer
+      end
+
+      let(:producer2) do
+        rdkafka_producer_config(
+          'transactional.id': transactional_id,
+          'transaction.timeout.ms': 10_000
+        ).producer
+      end
+
+      after do
+        producer1.close
+        producer2.close
+      end
+
+      it 'expect older producer not to be able to commit when fenced out' do
+        producer1.init_transactions
+        producer1.begin_transaction
+        producer1.produce(topic: 'produce_test_topic', payload: 'data1', partition: 1)
+
+        producer2.init_transactions
+        producer2.begin_transaction
+        producer2.produce(topic: 'produce_test_topic', payload: 'data1', partition: 1)
+
+        expect { producer1.commit_transaction }
+          .to raise_error(Rdkafka::RdkafkaError, /This instance has been fenced/)
+
+        error = false
+
+        begin
+          producer1.commit_transaction
+        rescue Rdkafka::RdkafkaError => e
+          error = e
+        end
+
+        expect(error.fatal?).to eq(true)
+        expect(error.abortable?).to eq(false)
+        expect(error.retryable?).to eq(false)
+
+        expect { producer2.commit_transaction }.not_to raise_error
+      end
+    end
+
+    context 'when having a consumer with tpls for exactly once semantics' do
+      let(:tpl) do
+        producer.produce(topic: 'consume_test_topic', payload: 'data1', partition: 0).wait
+        result = producer.produce(topic: 'consume_test_topic', payload: 'data1', partition: 0).wait
+
+        Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
+          list.add_topic_and_partitions_with_offsets("consume_test_topic", 0 => result.offset + 1)
+        end
+      end
+
+      before do
+        consumer.subscribe("consume_test_topic")
+        wait_for_assignment(consumer)
+        producer.init_transactions
+        producer.begin_transaction
+      end
+
+      after { consumer.unsubscribe }
+
+      it 'expect to store offsets and not crash' do
+        producer.send_offsets_to_transaction(consumer, tpl)
+        producer.commit_transaction
+      end
+    end
+  end
+
   describe '#oauthbearer_set_token' do
     context 'when sasl not configured' do
       it 'should return RD_KAFKA_RESP_ERR__STATE' do