Skip to content

Commit

Permalink
CLOUDSTACK-3096 format asyncJobMgr
Browse files Browse the repository at this point in the history
  • Loading branch information
DaanHoogland authored and sebgoa committed Jun 23, 2013
1 parent 3fdcf18 commit 34fc209
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions tools/marvin/marvin/asyncJobMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -24,6 +24,7 @@
import jsonHelper
import datetime


class job(object):
def __init__(self):
self.id = None
Expand All @@ -41,7 +42,8 @@ def __init__(self):
self.responsecls = None

def __str__(self):
return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v) in self.__dict__.iteritems()))
return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v)
in self.__dict__.iteritems()))


class workThread(threading.Thread):
Expand Down Expand Up @@ -82,8 +84,9 @@ def executeCmd(self, job):
result = self.connection.make_request(cmd)
jobstatus.result = result
jobstatus.endTime = datetime.datetime.now()
jobstatus.duration = time.mktime(jobstatus.endTime.timetuple()) - time.mktime(
jobstatus.startTime.timetuple())
jobstatus.duration =\
time.mktime(jobstatus.endTime.timetuple()) - time.mktime(
jobstatus.startTime.timetuple())
else:
result = self.connection.make_request(cmd, None, True)
if result is None:
Expand All @@ -92,8 +95,10 @@ def executeCmd(self, job):
jobId = result.jobid
jobstatus.jobId = jobId
try:
responseName = cmd.__class__.__name__.replace("Cmd", "Response")
jobstatus.responsecls = jsonHelper.getclassFromName(cmd, responseName)
responseName =\
cmd.__class__.__name__.replace("Cmd", "Response")
jobstatus.responsecls =\
jsonHelper.getclassFromName(cmd, responseName)
except:
pass
jobstatus.status = True
Expand Down Expand Up @@ -175,7 +180,8 @@ def updateTimeStamp(self, jobstatus):
jobId = jobstatus.jobId
if jobId is not None and self.db is not None:
result = self.db.execute(
"select job_status, created, last_updated from async_job where id='%s'" % str(jobId))
"select job_status, created, last_updated from async_job where\
id='%s'" % str(jobId))
if result is not None and len(result) > 0:
if result[0][0] == 1:
jobstatus.status = True
Expand All @@ -192,7 +198,8 @@ def waitForComplete(self, workers=10):
resultQueue = Queue.Queue()
'''intermediate result is stored in self.outqueue'''
for i in range(workers):
worker = workThread(self.outqueue, resultQueue, self.apiClient, self.db, lock)
worker = workThread(self.outqueue, resultQueue, self.apiClient,
self.db, lock)
worker.start()

self.outqueue.join()
Expand All @@ -205,20 +212,26 @@ def waitForComplete(self, workers=10):

return asyncJobResult

'''put commands into a queue at first, then start workers numbers threads to execute this commands'''

def submitCmdsAndWait(self, cmds, workers=10):
'''
put commands into a queue at first, then start workers numbers
threads to execute this commands
'''
self.submitCmds(cmds)
lock = threading.Lock()
for i in range(workers):
worker = workThread(self.inqueue, self.outqueue, self.apiClient, self.db, lock)
worker = workThread(self.inqueue, self.outqueue, self.apiClient,
self.db, lock)
worker.start()

return self.waitForComplete(workers)

'''submit one job and execute the same job ntimes, with nums_threads of threads'''

def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1):
def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1,
interval=1):
'''
submit one job and execute the same job ntimes, with nums_threads
of threads
'''
inqueue1 = Queue.Queue()
lock = threading.Condition()
for i in range(ntimes):
Expand All @@ -232,9 +245,8 @@ def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1):
work.start()
inqueue1.join()

'''submit n jobs, execute them with nums_threads of threads'''

def submitJobs(self, jobs, nums_threads=1, interval=1):
'''submit n jobs, execute them with nums_threads of threads'''
inqueue1 = Queue.Queue()
lock = threading.Condition()

Expand Down

0 comments on commit 34fc209

Please sign in to comment.