Skip to content

Commit

Permalink
Merge pull request rubyonjets#700 from rubyonjets/event-helpers
Browse files Browse the repository at this point in the history
update s3 sns and sqs event helpers to return all events
  • Loading branch information
tongueroo authored Dec 23, 2023
2 parents e586e80 + 7e021d6 commit d952915
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 16 deletions.
4 changes: 2 additions & 2 deletions lib/jets/generators/job/templates/event_types/s3.rb.tt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ class <%= class_name %>Job < ApplicationJob
s3_event "my-bucket" # new or existing bucket
def <%= options[:name] %>
puts "event #{JSON.dump(event)}"
puts "s3_event #{JSON.dump(s3_event)}"
puts "s3_object #{JSON.dump(s3_object)}"
puts "s3_events #{JSON.dump(s3_events)}"
puts "s3_objects #{JSON.dump(s3_objects)}"
end
end
<% end -%>
4 changes: 4 additions & 0 deletions lib/jets/job/helpers/kinesis_event_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ def kinesis_data
Base64.decode64(encoded) # data
end
end

def kinesis_data?
event["Records"]&.any? { |r| r.dig("kinesis", "data") }
end
end
end
4 changes: 4 additions & 0 deletions lib/jets/job/helpers/log_event_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,9 @@ def log_event
data = JSON.load(uncompressed_string)
ActiveSupport::HashWithIndifferentAccess.new(data)
end

def log_event?
!!event.dig("awslogs", "data")
end
end
end
38 changes: 34 additions & 4 deletions lib/jets/job/helpers/s3_event_helper.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,43 @@
module Jets::Job::Helpers
module S3EventHelper
def s3_events
messages = event["Records"].map do |record|
record["Sns"]["Message"]
end
message.map do |message|
h = JSON.load(message)
ActiveSupport::HashWithIndifferentAccess.new(h)
end
end

def s3_events?
event["Records"]&.any? { |r| r.dig("Sns", "Message") }
end

def s3_objects
records = s3_event["Records"]
records.map do |record|
record["s3"]["object"]
end
end

def s3_objects?
s3_event["Records"]&.any? { |r| r.dig("s3", "object") }
end

# Deprecated methods below
def s3_event
message = event["Records"][0]["Sns"]["Message"]
h = JSON.load(message)
ActiveSupport::HashWithIndifferentAccess.new(h)
puts "WARN: s3_event is deprecated".color(:yellow)
puts "It can possibly drop events when come in extremely fast."
puts "Use s3_events instead"
s3_events.first
end

def s3_object
s3_event["Records"][0]["s3"]["object"]
puts "WARN: s3_object is deprecated".color(:yellow)
puts "It can possibly drop events when come in extremely fast."
puts "Use s3_objects instead"
s3_objects.first
end
end
end
24 changes: 20 additions & 4 deletions lib/jets/job/helpers/sns_event_helper.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
module Jets::Job::Helpers
module SnsEventHelper
def sns_event_payload
message = event&.dig("Records", 0, "Sns", "Message")
@sns_event_payload ||= ActiveSupport::HashWithIndifferentAccess.new(JSON.load(message))
module SnsEventHelper
def sns_event_payloads
records = event["Records"]
return [] unless records
records.map do |record|
message = record["Sns"]["Message"]
ActiveSupport::HashWithIndifferentAccess.new(JSON.load(message))
end
end

def sns_event_payloads?
event["Records"]&.any? { |r| r.dig("Sns", "Message") }
end

# Deprecated methods below
def sns_event_payload
puts "WARN: sns_event_payload is deprecated".color(:yellow)
puts "It can possibly drop events when come in extremely fast."
puts "Use sns_event_payloads instead"
sns_event_payloads.first
end
end
end
28 changes: 22 additions & 6 deletions lib/jets/job/helpers/sqs_event_helper.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
module Jets::Job::Helpers
module SqsEventHelper
def sqs_event_payload
message = event&.dig("Records", 0, "body")
@sqs_event_payload ||= ActiveSupport::HashWithIndifferentAccess.new(JSON.parse(message))
end
module SqsEventHelper
def sqs_event_payloads
records = event["Records"]
return [] unless records
records.map do |record|
message = record["body"]
ActiveSupport::HashWithIndifferentAccess.new(JSON.load(message))
end
end
end

def sqs_event_payloads?
event["Records"]&.any? { |r| r.dig("body") }
end

# Deprecated methods below
def sqs_event_payload
puts "WARN: sqs_event_payload is deprecated".color(:yellow)
puts "It can possibly drop events when come in extremely fast."
puts "Use sqs_event_payloads instead"
sqs_event_payloads.first
end
end
end

0 comments on commit d952915

Please sign in to comment.