Skip to content

Commit

Permalink
add since type and column.
Browse files Browse the repository at this point in the history
Signed-off-by: zqzjz0911 <[email protected]>
  • Loading branch information
zqzjz0911 committed May 7, 2016
1 parent cd8917a commit c4d5704
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions lib/logstash/inputs/mongodb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class LogStash::Inputs::MongoDB < LogStash::Inputs::Base

config :since_table, :validate => :string, :default => "logstash_since"

# This allows you to select the column you would like compare the since info
config :since_column, :validate => :string, :default => "_id"

# This allows you to select the type of since info, like "id", "date"
config :since_type, :validate => :string, :default => "id"

# The collection to use. Is turned into a regex so 'events' will match 'events_20150227'
# Example collection: events_20150227 or events_
config :collection, :validate => :string, :required => true
Expand Down Expand Up @@ -85,9 +91,11 @@ def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
@logger.debug("init placeholder for #{since_table}_#{mongo_collection_name}")
since = sqlitedb[SINCE_TABLE]
mongo_collection = mongodb.collection(mongo_collection_name)
first_entry = mongo_collection.find({}).sort('_id' => 1).limit(1).first
first_entry_id = first_entry['_id'].to_s

first_entry = mongo_collection.find({}).sort(since_column => 1).limit(1).first
first_entry_id = first_entry[since_column].to_s
since.insert(:table => "#{since_table}_#{mongo_collection_name}", :place => first_entry_id)
@logger.info("init placeholder for #{since_table}_#{mongo_collection_name}: #{first_entry}")
return first_entry_id
end

Expand Down Expand Up @@ -134,7 +142,13 @@ def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, ba
collection = mongodb.collection(mongo_collection_name)
# Need to make this sort by date in object id then get the first of the series
# db.events_20150320.find().limit(1).sort({ts:1})
return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size)

av = {}
if last_id_object != ''
av = {since_column => {:$gt => last_id_object}}
end

return collection.find(av).sort(since_column => 1).limit(batch_size)
end

public
Expand Down Expand Up @@ -224,7 +238,16 @@ def run(queue)
last_id = @collection_data[index][:last_id]
#@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
# get batch of events starting at the last_place if it is set
last_id_object = BSON::ObjectId(last_id)


last_id_object = last_id
if since_type == 'id'
last_id_object = BSON::ObjectId(last_id)
elsif since_type == 'time'
if last_id != ''
last_id_object = Time.at(last_id)
end
end
cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
cursor.each do |doc|
logdate = DateTime.parse(doc['_id'].generation_time.to_s)
Expand Down Expand Up @@ -331,7 +354,15 @@ def run(queue)
end

queue << event
@collection_data[index][:last_id] = doc['_id'].to_s

since_id = doc[since_column]
if since_type == 'id'
since_id = doc[since_column].to_s
elsif since_type == 'time'
since_id = doc[since_column].to_i
end

@collection_data[index][:last_id] = since_id
end
# Store the last-seen doc in the database
update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
Expand Down

0 comments on commit c4d5704

Please sign in to comment.