Skip to content

Commit

Permalink
working on threaded poller
Browse files Browse the repository at this point in the history
  • Loading branch information
kookster committed Jun 27, 2012
1 parent cdd571e commit 2ef6fae
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 52 deletions.
6 changes: 4 additions & 2 deletions lib/activemessaging/adapters/asqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def unreceive message, headers={}
def create_queue(name)
validate_new_queue name
response = make_request('CreateQueue', nil, {'QueueName'=>name})
add_queue response.get_text("//QueueUrl") unless response.nil?
add_queue(response.get_text("//QueueUrl")) unless response.nil?
end

def delete_queue queue
Expand Down Expand Up @@ -301,7 +301,9 @@ def add_queue(url)

def get_or_create_queue queue_name
qs = queues
qs.has_key?(queue_name) ? qs[queue_name] : create_queue(queue_name)
q = qs.has_key?(queue_name) ? qs[queue_name] : create_queue(queue_name)
raise "could not get or create queue: #{queue_name}" unless q
q
end

def queues
Expand Down
153 changes: 105 additions & 48 deletions lib/activemessaging/threaded_poller.rb
Original file line number Diff line number Diff line change
@@ -1,86 +1,137 @@
# still working on prioritized worker requests


# This owes no small debt to sidekiq for showing how to use celluloid for polling for messages.
# https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/manager.rb
# https://github.com/mperham/sidekiq/blob/poller/lib/sidekiq/manager.rb

require 'celluloid'

module ActiveMessaging

class ThreadedPoller

include Celluloid

# traps when any worker dies
trap_exit :died

attr_accessor :receiver, :connection, :pool_size, :workers, :busy, :running

def initialize(connection='default', pool_size=3)
self.pool_size = pool_size
attr_accessor :configuration, :receiver, :connection, :workers, :busy, :running

#
# connection is a string, name of the connection from broker.yml to use for this threaded poller instance
#
# configuration is a list of hashes
# each has describes a group of worker threads
# for each group, define what priorities those workers will process
# [
# {
# :pool_size => 1 # number of workers of this type
# :priorities => [1,2,3] # what message priorities this thread will process
# }
# ]
#
def initialize(connection='default', configuration={})
# default config is a pool size of 3 worker threads
self.configuration = configuration || [{:pool_size => 3}]
self.connection = connection
puts "subscribe"
ActiveMessaging::Gateway.subscribe
end

def start
logger.info "ActiveMessaging::ThreadedPoller start"

# these are workers ready to use
self.workers = []

# these are workers already working
self.busy = []

# this indicates if we are running or not, helps threads to stop gracefully
self.running = true
# create a message receiver actor
puts "ThreadedPoller start"
self.receiver = MessageReceiver.new(current_actor, ActiveMessaging::Gateway.connection(connection))
self.workers = pool_size.times.collect{|i| Worker.new_link(current_actor)}
pool_size.times{ receive }

# subscribe will create the connections based on subscriptions in processsors
# (you can't find or use the connection until it is created by calling this)
ActiveMessaging::Gateway.subscribe

# create a message receiver actor, ony need one, using connection
receiver_connection = ActiveMessaging::Gateway.connection(connection)
self.receiver = MessageReceiver.new(current_actor, receiver_connection)

# start the workers based on the config
configuration.each do |c|
(c[:pool_size] || 1).times{ self.workers << Worker.new_link(current_actor, c) }
end

# once all workers are created, start them up
self.workers.each{|worker| receive(worker)}

# in debug level, log info about workers every 10 seconds
log_status
end

def stop
logger.info "ActiveMessaging::ThreadedPoller stop"
# indicates to all busy workers not to pick up another messages, but does not interrupt
# also indicates to the message receiver to stop getting more messages
self.running = false

# tell each waiting worker to shut down. Running ones will be allowed to finish
workers.each { |w| w.terminate if w.alive? }
end

# recursive method, uses celluloid 'after' to keep calling
def log_status
ActiveMessaging.logger.info("ActiveMessaging::ThreadedPoller: #{connection}, #{pool_size}, #{workers.count}, #{busy.count}, #{running}")
after(5){ log_status }
return unless logger.debug?
logger.debug("ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}")
after(10){ log_status }
end

def dispatch(message)
worker = workers.pop
busy << worker
worker.execute!(message)
def receive(worker)
receiver.receive!(worker) if (receiver && running && worker)
end

def receive
receiver.receive! if (receiver && running)
def dispatch(message, worker)
workers.delete(worker)
busy << worker
worker.execute!(message)
end

def executed(worker)
busy.delete(worker)
workers << worker if worker.alive?

if running
receive
workers << worker
receive(worker)
else
worker.terminate if worker.alive?
if busy.empty?
puts "all executed: signal stopped"
signal(:shutdown)
logger.info "all executed: signal stopped"
self.terminate if alive?
end
end
end

def died(worker, reason)
puts "uh oh, #{worker.inspect} died because of #{reason.class}"
logger.info "uh oh, #{worker.inspect} died because of #{reason.class}"
busy.delete(worker)

if running
workers << Processor.new_link(current_actor)
receive
worker = Worker.new_link(current_actor)
workers << worker
receive(worker)
else
puts "check to see if busy is empty: #{busy.inspect}"
logger.info "check to see if busy is empty: #{busy.inspect}"
if busy.empty?
puts "all died: signal stopped"
after(0){ signal(:shutdown) }
logger.info "all died: signal stopped"
after(0){ self.terminate } if alive?
end
end
end

def stopped?
!alive? || (!running && busy.empty?)
end

def logger; ActiveMessaging.logger; end

end

Expand All @@ -90,45 +141,51 @@ class MessageReceiver
attr_accessor :poller, :connection, :pause

def initialize(poller, connection, pause=1)
puts "MessageReceiver initialize"
self.poller = poller
logger.debug("MessageReceiver initialize: poller:#{poller}, connection:#{connection}, pause:#{pause}")

raise "No connection found for '#{poller.connection}'" unless connection

self.poller = poller
self.connection = connection
self.pause = pause
self.pause = pause
end

def receive
def receive(worker)
return unless poller.running

message = self.connection.receive
message = self.connection.receive(worker.options)

if message
puts "message: #{message.inspect}"
poller.dispatch!(message)
logger.debug("ActiveMessaging::MessageReceiver.receive: message:'#{message.inspect}'")
poller.dispatch!(message, worker)
else
if poller.running
puts "no message, schedule recursive retry"
after(pause) { receive }
else
terminate if alive?
end
self.terminate if !poller.running && alive?
logger.debug("ActiveMessaging::MessageReceiver.receive: no message, retry in #{pause} sec")
after(pause) { receive(worker) }
end

end

def logger; ActiveMessaging.logger; end
end

class Worker
include Celluloid

attr_accessor :master
attr_accessor :poller, :options

def initialize(master)
self.master = master
def initialize(poller, options)
self.poller = poller
self.options = options
end

def execute(message)
ActiveMessaging::Gateway.dispatch(message)
master.executed!(current_actor)
poller.executed!(current_actor)
end

def logger; ActiveMessaging.logger; end

end

end
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ ActiveMessaging.load_activemessaging

# configure the connection (there can be multiple defined in broker.yml) and number of threads
connection_name = 'default'
thread_count = 3
configuration = [{:pool_size => 3}]

# start it up!
begin
trap("TERM", "EXIT")
@poller = ActiveMessaging::ThreadedPoller.new(connection_name, thread_count)
@poller = ActiveMessaging::ThreadedPoller.new(connection_name, configuration)
@poller.start!
sleep
rescue Interrupt
Expand Down

0 comments on commit 2ef6fae

Please sign in to comment.