Skip to content

Commit

Permalink
fate_flow: improve submit job on queue failed exception
Browse files Browse the repository at this point in the history
  • Loading branch information
jarviszeng-zjc committed Sep 29, 2019
1 parent bd25fac commit 983887b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
5 changes: 4 additions & 1 deletion fate_flow/driver/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ def submit_job(job_data):

# push into queue
job_event = job_utils.job_event(job_id, initiator_role, initiator_party_id)
RuntimeConfig.JOB_QUEUE.put_event(job_event)
try:
RuntimeConfig.JOB_QUEUE.put_event(job_event)
except Exception as e:
raise Exception('push job into queue failed')

schedule_logger(job_id).info(
'submit job successfully, job id is {}, model id is {}'.format(job.f_job_id, job_parameters['model_id']))
Expand Down
22 changes: 12 additions & 10 deletions fate_flow/manager/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def get_event(self):
stat_logger.info('get event from redis queue: {}'.format(event))
return event
except Exception as e:
stat_logger.exception(e)
stat_logger.error('get event from redis queue failed')
stat_logger.exception(e)
return None

def is_ready(self):
Expand All @@ -75,19 +75,20 @@ def put_event(self, event):
ret = conn.lpush(self.queue_name, json.dumps(event))
stat_logger.info('put event into redis queue {}: {}'.format('successfully' if ret else 'failed', event))
except Exception as e:
stat_logger.exception(e)
stat_logger.error('put event into redis queue failed')
stat_logger.exception(e)
raise e

def del_event(self, event):
try:
conn = self.get_conn()
ret = conn.lrem(self.queue_name, 1, json.dumps(event))
stat_logger.info('delete event from redis queue {}: {}'.format('successfully' if ret else 'failed', event))
if not ret:
raise Exception('delete event from redis queue failed:job not in redis queue')
raise Exception('job not in redis queue')
except Exception as e:
stat_logger.exception(e)
stat_logger.error('delete event from redis queue failed')
stat_logger.exception(e)
raise Exception('delete event from redis queue failed')

def parse_event(self, content):
Expand Down Expand Up @@ -120,17 +121,18 @@ def put_event(self, event):
self.queue.put(event)
stat_logger.info('put event into in-process queue successfully: {}'.format(event))
except Exception as e:
stat_logger.exception(e)
stat_logger.error('put event into in-process queue failed')
stat_logger.exception(e)
raise e

def get_event(self):
try:
event = self.queue.get(block=True)
stat_logger.info('get event from in-process queue successfully: {}'.format(event))
return event
except Exception as e:
stat_logger.exception(e)
stat_logger.error('get event from in-process queue failed')
stat_logger.exception(e)
return None

def qsize(self):
Expand All @@ -154,29 +156,29 @@ def put_event(self, event):
self.put(event)
stat_logger.info('put event into in-process queue successfully: {}'.format(event))
except Exception as e:
stat_logger.exception(e)
stat_logger.error('put event into in-process queue failed')
stat_logger.exception(e)
raise e

def get_event(self):
try:
event = self.get(block=True)
stat_logger.info('get event from in-process queue successfully: {}'.format(event))
return event
except Exception as e:
stat_logger.exception(e)
stat_logger.error('get event from in-process queue failed')
stat_logger.exception(e)
return None

def del_event(self, event):
try:
ret = self.dell(event)
stat_logger.info('delete event from redis queue {}: {}'.format('successfully' if ret else 'failed', event))
except Exception as e:
stat_logger.exception(e)
stat_logger.error('delete event from queue failed')
stat_logger.exception(e)
raise Exception('{} not in ListQueue'.format(event))


def dell(self, event):
with self.not_empty:
if event in self.queue:
Expand Down

0 comments on commit 983887b

Please sign in to comment.