forked from karafka/rdkafka-ruby
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetadata.rb
115 lines (92 loc) · 3.47 KB
/
metadata.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# frozen_string_literal: true
module Rdkafka
  # Fetches cluster metadata (brokers, topics, partitions) from librdkafka
  # via FFI and exposes it as plain Ruby hashes.
  class Metadata
    # @return [Array<Hash>] one hash per broker in the cluster
    # @return [Array<Hash>] one hash per topic, each with a +:partitions+ array
    attr_reader :brokers, :topics

    # Errors upon which we retry the metadata fetch
    RETRIED_ERRORS = %i[
      timed_out
      leader_not_available
    ].freeze
    private_constant :RETRIED_ERRORS

    # Requests metadata from the cluster and populates {#brokers} and {#topics}.
    #
    # @param native_client [FFI::Pointer] handle to the native librdkafka client
    # @param topic_name [String, nil] when given, restrict the request to this
    #   topic; when nil, request metadata for all topics in the cluster
    # @param timeout_ms [Integer] request timeout in milliseconds
    # @raise [Rdkafka::RdkafkaError] when the request ultimately fails
    def initialize(native_client, topic_name = nil, timeout_ms = 2_000)
      # `||=` keeps the counter alive across `retry`, which re-runs the body
      # but preserves already-assigned locals.
      attempt ||= 0
      attempt += 1

      native_topic = if topic_name
                       Rdkafka::Bindings.rd_kafka_topic_new(native_client, topic_name, nil)
                     end

      # Out-parameter librdkafka fills with a pointer to rd_kafka_metadata_t.
      # FFI::MemoryPointer is zero-initialized, so it reads as NULL until the
      # native call succeeds.
      ptr = FFI::MemoryPointer.new(:pointer)

      # If topic_flag is 1, we request info about *all* topics in the cluster. If topic_flag is 0,
      # we only request info about locally known topics (or a single topic if one is passed in).
      topic_flag = topic_name.nil? ? 1 : 0

      # Retrieve the Metadata
      result = Rdkafka::Bindings.rd_kafka_metadata(native_client, topic_flag, native_topic, ptr, timeout_ms)

      # Error Handling
      raise Rdkafka::RdkafkaError.new(result) unless result.zero?

      metadata_from_native(ptr.read_pointer)
    rescue ::Rdkafka::RdkafkaError => e
      raise unless RETRIED_ERRORS.include?(e.code)
      raise if attempt > 10

      # Fix: `retry` re-runs the begin body (creating a fresh topic handle)
      # WITHOUT running `ensure`, so the handle from this failed attempt must
      # be released here or it leaks once per retry.
      if native_topic
        Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
        native_topic = nil
      end

      # Exponential backoff: 0.2s, 0.4s, 0.8s, ... bounded by the attempt cap.
      backoff_factor = 2**attempt
      timeout = backoff_factor * 0.1
      sleep(timeout)
      retry
    ensure
      Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic) if topic_name && native_topic
      # Fix: only destroy the metadata struct when librdkafka actually wrote
      # the out-param; on failure it is still NULL and must not be destroyed.
      # (`ptr` is nil here only if an exception fired before its assignment.)
      Rdkafka::Bindings.rd_kafka_metadata_destroy(ptr.read_pointer) if ptr && !ptr.read_pointer.null?
    end

    private

    # Reads a populated rd_kafka_metadata_t and materializes it into Ruby
    # hashes on @brokers and @topics.
    #
    # @param ptr [FFI::Pointer] pointer to a populated rd_kafka_metadata_t
    # @raise [Rdkafka::RdkafkaError] when a topic or partition carries a
    #   non-zero error code
    def metadata_from_native(ptr)
      # `Metadata` here resolves to the nested FFI struct below, not this class.
      metadata = Metadata.new(ptr)

      @brokers = Array.new(metadata[:brokers_count]) do |i|
        BrokerMetadata.new(metadata[:brokers_metadata] + (i * BrokerMetadata.size)).to_h
      end

      @topics = Array.new(metadata[:topics_count]) do |i|
        topic = TopicMetadata.new(metadata[:topics_metadata] + (i * TopicMetadata.size))
        raise Rdkafka::RdkafkaError.new(topic[:rd_kafka_resp_err]) unless topic[:rd_kafka_resp_err].zero?

        partitions = Array.new(topic[:partition_count]) do |j|
          partition = PartitionMetadata.new(topic[:partitions_metadata] + (j * PartitionMetadata.size))
          raise Rdkafka::RdkafkaError.new(partition[:rd_kafka_resp_err]) unless partition[:rd_kafka_resp_err].zero?

          partition.to_h
        end

        topic.to_h.merge!(partitions: partitions)
      end
    end

    # Base struct that converts itself into a Hash, skipping raw pointer
    # members and the librdkafka error-code member.
    class CustomFFIStruct < FFI::Struct
      def to_h
        members.each_with_object({}) do |mem, hsh|
          val = self.[](mem)
          next if val.is_a?(FFI::Pointer) || mem == :rd_kafka_resp_err

          # Fix: reuse the value already read above instead of dereferencing
          # the struct member a second time.
          hsh[mem] = val
        end
      end
    end

    # Mirrors librdkafka's rd_kafka_metadata_t. Field order and types are
    # ABI-critical and must match the C layout exactly — do not reorder.
    class Metadata < CustomFFIStruct
      layout :brokers_count, :int,
             :brokers_metadata, :pointer,
             :topics_count, :int,
             :topics_metadata, :pointer,
             # NOTE(review): presumably the originating broker of this
             # response (orig_broker_id/orig_broker_name in rdkafka.h) —
             # confirm against the librdkafka header.
             :broker_id, :int32,
             :broker_name, :string
    end

    # Mirrors rd_kafka_metadata_broker_t.
    class BrokerMetadata < CustomFFIStruct
      layout :broker_id, :int32,
             :broker_name, :string,
             :broker_port, :int
    end

    # Mirrors rd_kafka_metadata_topic_t.
    class TopicMetadata < CustomFFIStruct
      layout :topic_name, :string,
             :partition_count, :int,
             :partitions_metadata, :pointer,
             :rd_kafka_resp_err, :int
    end

    # Mirrors rd_kafka_metadata_partition_t.
    class PartitionMetadata < CustomFFIStruct
      layout :partition_id, :int32,
             :rd_kafka_resp_err, :int,
             :leader, :int32,
             :replica_count, :int,
             :replicas, :pointer,
             :in_sync_replica_brokers, :int,
             :isrs, :pointer
    end
  end
end