Skip to content

Commit

Permalink
Merge branch 'master' of github.com:phutchins/logstash-input-mongodb
Browse files Browse the repository at this point in the history
  • Loading branch information
phutchins committed Feb 2, 2017
2 parents 43e6fa0 + 7d4b53e commit 2f21016
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 29 deletions.
95 changes: 68 additions & 27 deletions lib/logstash/inputs/mongodb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class LogStash::Inputs::MongoDB < LogStash::Inputs::Base

config :since_table, :validate => :string, :default => "logstash_since"

# This allows you to select the column whose value is compared against the stored since info
config :since_column, :validate => :string, :default => "_id"

# This allows you to select the type of the since info: "id" (converted to a BSON ObjectId) or "time" (stored as an integer and converted with Time.at)
config :since_type, :validate => :string, :default => "id"

# The collection to use. Is turned into a regex so 'events' will match 'events_20150227'
# Example collection: events_20150227 or events_
config :collection, :validate => :string, :required => true
Expand Down Expand Up @@ -85,9 +91,16 @@ def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
@logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
since = sqlitedb[SINCE_TABLE]
mongo_collection = mongodb.collection(mongo_collection_name)
first_entry = mongo_collection.find({}).sort('_id' => 1).limit(1).first
first_entry_id = first_entry['_id'].to_s

first_entry = mongo_collection.find({}).sort(since_column => 1).limit(1).first
first_entry_id = ''
if since_type == 'id'
first_entry_id = first_entry[since_column].to_s
else
first_entry_id = first_entry[since_column].to_i
end
since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
@logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")
return first_entry_id
end

Expand Down Expand Up @@ -134,7 +147,13 @@ def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, ba
collection = mongodb.collection(mongo_collection_name)
# Need to make this sort by date in object id then get the first of the series
# db.events_20150320.find().limit(1).sort({ts:1})
return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size)

av = {}
if last_id_object != ''
av = {since_column => {:$gt => last_id_object}}
end

return collection.find(av).sort(since_column => 1).limit(batch_size)
end

public
Expand Down Expand Up @@ -224,17 +243,26 @@ def run(queue)
last_id = @collection_data[index][:last_id]
#@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
# get batch of events starting at the last_place if it is set
last_id_object = BSON::ObjectId(last_id)


last_id_object = last_id
if since_type == 'id'
last_id_object = BSON::ObjectId(last_id)
elsif since_type == 'time'
if last_id != ''
last_id_object = Time.at(last_id)
end
end
cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
cursor.each do |doc|
logdate = DateTime.parse(doc['_id'].generation_time.to_s)
event = LogStash::Event.new("host" => @host)
decorate(event)
event["logdate"] = logdate.iso8601
event.set("logdate","logdate.iso8601")
log_entry = doc.to_h.to_s
log_entry['_id'] = log_entry['_id'].to_s
event["log_entry"] = log_entry
event["mongo_id"] = doc['_id'].to_s
event.set("log_entry", "log_entry")
event.set("mongo_id",doc['_id'].to_s)
@logger.debug("mongo_id: "+doc['_id'].to_s)
#@logger.debug("EVENT looks like: "+event.to_s)
#@logger.debug("Sent message: "+doc.to_h.to_s)
Expand All @@ -245,8 +273,8 @@ def run(queue)
doc_obj_bin = doc_hex_bytes.pack("C*").unpack("a4 a3 a2 a3")
host_id = doc_obj_bin[1].unpack("S")
process_id = doc_obj_bin[2].unpack("S")
event['host_id'] = host_id.first.to_i
event['process_id'] = process_id.first.to_i
event.set('host_id',host_id.first.to_i)
event.set('process_id',process_id.first.to_i)
end

if @parse_method == 'flatten'
Expand All @@ -265,21 +293,26 @@ def run(queue)
# Check for an integer
@logger.debug("key: #{k.to_s} value: #{v.to_s}")
if v.is_a? Numeric
event[k.to_s] = v
event.set(k.to_s,v)
elsif v.is_a? Time
event.set(k.to_s,v.iso8601)

elsif v.is_a? String
if v == "NaN"
event[k.to_s] = Float::NAN
event.set(k.to_s, Float::NAN)
elsif /\A[-+]?\d+[.][\d]+\z/ == v
event[k.to_s] = v.to_f
event.set(k.to_s, v.to_f)
elsif (/\A[-+]?\d+\z/ === v) || (v.is_a? Integer)
event[k.to_s] = v.to_i
event.set(k.to_s, v.to_i)
else
event[k.to_s] = v
event.set(k.to_s, v)
end
else
event[k.to_s] = v.to_s unless k.to_s == "_id" || k.to_s == "tags"
if k.to_s == "_id" || k.to_s == "tags"
event.set(k.to_s, v.to_s )
end
if (k.to_s == "tags") && (v.is_a? Array)
event['tags'] = v
event.set('tags',v)
end
end
end
Expand All @@ -292,44 +325,52 @@ def run(queue)
if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
vv.each do |kkk, vvv|
if /\A[-+]?\d+\z/ === vvv
event["#{k}_#{kk}_#{kkk}"] = vvv.to_i
event.set("#{k}_#{kk}_#{kkk}",vvv.to_i)
else
event["#{k}_#{kk}_#{kkk}"] = vvv.to_s
event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
end
end
else
if /\A[-+]?\d+\z/ === vv
event["#{k}_#{kk}"] = vv.to_i
event.set("#{k}_#{kk}", vv.to_i)
else
event["#{k}_#{kk}"] = vv.to_s
event.set("#{k}_#{kk}",vv.to_s)
end
end
end
else
if /\A[-+]?\d+\z/ === v
event[k] = v.to_i
event.set(k,v.to_i)
else
event[k] = v.to_s
event.set(k,v.to_s)
end
end
end
end
elsif @parse_method == 'simple'
doc.each do |k, v|
if v.is_a? Numeric
event[k] = v.abs
event.set(k, v.abs)
elsif v.is_a? Array
event[k] = v
event.set(k, v)
elsif v == "NaN"
event[k] = Float::NAN
event.set(k, Float::NAN)
else
event[k] = v.to_s
event.set(k, v.to_s)
end
end
end

queue << event
@collection_data[index][:last_id] = doc['_id'].to_s

since_id = doc[since_column]
if since_type == 'id'
since_id = doc[since_column].to_s
elsif since_type == 'time'
since_id = doc[since_column].to_i
end

@collection_data[index][:last_id] = since_id
end
# Store the last-seen doc in the database
update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
Expand Down
4 changes: 2 additions & 2 deletions logstash-input-mongodb.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-mongodb'
s.version = '0.3.3'
s.version = '0.4.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This takes entries from mongodb as an input to logstash."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand Down Expand Up @@ -28,7 +28,7 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }

# Gem dependencies
s.add_runtime_dependency 'logstash-core', ">= 2.0.0.beta2", "< 3.0.0"
s.add_runtime_dependency 'logstash-core', ">= 5.0"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud'
s.add_runtime_dependency 'jdbc-sqlite3', '3.8.10.1'
Expand Down

0 comments on commit 2f21016

Please sign in to comment.