Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try one off time based consumer #338

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ gemspec
# We actually support version 7.x (see gemspec); this extra restriction is added just for running the test suite also
# on Ruby 2.6, which activesupport 7.0 does not support.
gem 'activesupport', '~> 6.1.0'
gem 'rdkafka', github: 'streemau/rdkafka-ruby', branch: 'add-offsets_for_times-method-on-consumer'
17 changes: 12 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
GIT
remote: https://github.com/streemau/rdkafka-ruby.git
revision: 5f0746979827f310af5bca2a9840a2770c08c446
branch: add-offsets_for_times-method-on-consumer
specs:
rdkafka (0.13.0.beta.3)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)

PATH
remote: .
specs:
racecar (2.9.0.beta1)
king_konf (~> 1.0.0)
rdkafka (~> 0.12.0)
rdkafka

GEM
remote: https://rubygems.org/
Expand All @@ -29,10 +39,6 @@ GEM
coderay (~> 1.1)
method_source (~> 1.0)
rake (13.0.6)
rdkafka (0.12.0)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
rspec (3.12.0)
rspec-core (~> 3.12.0)
rspec-expectations (~> 3.12.0)
Expand Down Expand Up @@ -61,6 +67,7 @@ DEPENDENCIES
pry
racecar!
rake (> 10.0)
rdkafka!
rspec (~> 3.0)
timecop

Expand Down
2 changes: 1 addition & 1 deletion racecar.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = '>= 2.6'

spec.add_runtime_dependency "king_konf", "~> 1.0.0"
spec.add_runtime_dependency "rdkafka", "~> 0.12.0"
spec.add_runtime_dependency "rdkafka"#, "~> 0.12.0"

spec.add_development_dependency "bundler", [">= 1.13", "< 3"]
spec.add_development_dependency "pry"
Expand Down
77 changes: 77 additions & 0 deletions time_period_consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
require 'rdkafka'

class TimePeriodConsumer
attr_reader :brokers, :group_id, :track

def initialize(brokers:, group_id:)
@brokers = brokers
@group_id = group_id
@track = {}
end

def consume(topic:, from:, to:)
consumer.subscribe(topic)

wait_for_assignment

topic_assignments = consumer.assignment.to_h.fetch(topic)
partitions_with_time = topic_assignments.map { |partition| [partition.partition, from] }

tpl = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
list.add_topic_and_partitions_with_offsets(
topic, partitions_with_time
)
end

offsets = consumer.offsets_for_times(tpl).to_h.fetch(topic)

puts "Offsets at #{from}"
pp offsets

offsets.each do |partition|
fake_msg = OpenStruct.new(topic: topic, partition: partition.partition, offset: partition.offset)
puts "Seeking to #{partition}"
consumer.seek(fake_msg)
end

consumer.each do |msg|
p({ key: msg.key, offset: msg.offset, partition: msg.partition, timestamp: msg.timestamp })

track[msg.partition] = true if msg.timestamp > to || msg.offset == -1

if offsets.all? { |partition| track.key? partition.partition }
puts 'Done with all msgs in the given time window'
consumer.unsubscribe
break
end

if track.key? msg.partition
puts "skipping #{msg.key}"
else
yield msg
end
end
ensure
puts 'Closing consumer'
consumer.close
end

private

def wait_for_assignment
10.times do
break unless consumer.assignment.empty?

puts 'sleeping'
sleep 1
end
end

def config
{ "bootstrap.servers": brokers.join(','), "group.id": group_id } # , debug: "cgrp,topic,fetch" }
end

def consumer
@consumer ||= Rdkafka::Config.new(config).consumer
end
end