Fix bugs where stopping a running job sometimes failed
jarviszeng-zjc committed Sep 29, 2020
1 parent 9791305 commit 99e1447
Showing 3 changed files with 30 additions and 29 deletions.
2 changes: 1 addition & 1 deletion python/fate_flow/scheduler/dag_scheduler.py
@@ -312,7 +312,7 @@ def schedule_running_job(cls, job, canceled=False):
         dsl_parser = schedule_utils.get_job_dsl_parser(dsl=job.f_dsl,
                                                        runtime_conf=job.f_runtime_conf,
                                                        train_runtime_conf=job.f_train_runtime_conf)
-        task_scheduling_status_code, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser)
+        task_scheduling_status_code, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=canceled)
         tasks_status = [task.f_status for task in tasks]
         new_job_status = cls.calculate_job_status(task_scheduling_status_code=task_scheduling_status_code, tasks_status=tasks_status)
         if new_job_status == JobStatus.WAITING and canceled:
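In plain terms, schedule_running_job already knows whether the job has been canceled, and this one-line change forwards that flag into TaskScheduler.schedule instead of dropping it. Below is a minimal sketch of the propagation pattern, with the FATE-Flow internals replaced by simplified stand-ins; only the method names and the canceled parameter come from the diff, everything else is hypothetical.

# Sketch only: simplified stand-ins, not FATE-Flow's real classes.
class TaskScheduler:
    @classmethod
    def schedule(cls, job, dsl_parser, canceled=False):
        # With the fix, the cancel state reaches task scheduling,
        # so no new tasks are launched for a job that is being stopped.
        started = [] if canceled else list(job["waiting_tasks"])
        return ("HAVE_NEXT" if started else "NO_NEXT"), started

class DAGScheduler:
    @classmethod
    def schedule_running_job(cls, job, canceled=False):
        # Before the fix this call omitted canceled, so it always defaulted to False.
        return TaskScheduler.schedule(job=job, dsl_parser=None, canceled=canceled)

if __name__ == "__main__":
    job = {"waiting_tasks": ["upload_0"]}
    print(DAGScheduler.schedule_running_job(job, canceled=True))   # ('NO_NEXT', [])
    print(DAGScheduler.schedule_running_job(job, canceled=False))  # ('HAVE_NEXT', ['upload_0'])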
45 changes: 23 additions & 22 deletions python/fate_flow/scheduler/task_scheduler.py
@@ -24,7 +24,7 @@
 
 class TaskScheduler(object):
     @classmethod
-    def schedule(cls, job, dsl_parser):
+    def schedule(cls, job, dsl_parser, canceled=False):
         schedule_logger(job_id=job.f_job_id).info("scheduling job {} tasks".format(job.f_job_id))
         initiator_tasks_group = JobSaver.get_tasks_asc(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)
         waiting_tasks = []
@@ -48,27 +48,28 @@ def schedule(cls, job, dsl_parser):
                     FederatedScheduler.stop_task(job=job, task=initiator_task, stop_status=initiator_task.f_status)
 
         scheduling_status_code = SchedulingStatusCode.NO_NEXT
-        for waiting_task in waiting_tasks:
-            for component in dsl_parser.get_upstream_dependent_components(component_name=waiting_task.f_component_name):
-                dependent_task = initiator_tasks_group[
-                    JobSaver.task_key(task_id=job_utils.generate_task_id(job_id=job.f_job_id, component_name=component.get_name()),
-                                      role=job.f_role,
-                                      party_id=job.f_party_id
-                                      )
-                ]
-                if dependent_task.f_status != TaskStatus.COMPLETE:
-                    # can not start task
-                    break
-            else:
-                # can start task
-                scheduling_status_code = SchedulingStatusCode.HAVE_NEXT
-                status_code = cls.start_task(job=job, task=waiting_task)
-                if status_code == SchedulingStatusCode.NO_RESOURCE:
-                    # Wait for the next round of scheduling
-                    break
-                elif status_code == SchedulingStatusCode.FAILED:
-                    scheduling_status_code = SchedulingStatusCode.FAILED
-                    break
+        if not canceled:
+            for waiting_task in waiting_tasks:
+                for component in dsl_parser.get_upstream_dependent_components(component_name=waiting_task.f_component_name):
+                    dependent_task = initiator_tasks_group[
+                        JobSaver.task_key(task_id=job_utils.generate_task_id(job_id=job.f_job_id, component_name=component.get_name()),
+                                          role=job.f_role,
+                                          party_id=job.f_party_id
+                                          )
+                    ]
+                    if dependent_task.f_status != TaskStatus.COMPLETE:
+                        # can not start task
+                        break
+                else:
+                    # can start task
+                    scheduling_status_code = SchedulingStatusCode.HAVE_NEXT
+                    status_code = cls.start_task(job=job, task=waiting_task)
+                    if status_code == SchedulingStatusCode.NO_RESOURCE:
+                        # Wait for the next round of scheduling
+                        break
+                    elif status_code == SchedulingStatusCode.FAILED:
+                        scheduling_status_code = SchedulingStatusCode.FAILED
+                        break
         schedule_logger(job_id=job.f_job_id).info("finish scheduling job {} tasks".format(job.f_job_id))
         return scheduling_status_code, initiator_tasks_group.values()
 
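The block above is the heart of the fix: task launching is now wrapped in if not canceled, so a job that is being stopped no longer keeps spawning new tasks while its existing ones are being killed, and the scheduler can settle on a final job status. Here is a runnable sketch of that guard, with hypothetical helper callables standing in for the real DSL parser and task store; only the control flow mirrors the diff.

def schedule_waiting_tasks(waiting_tasks, upstream_of, is_complete, start_task, canceled=False):
    """Return a scheduling status code; start tasks only when the job is not canceled."""
    scheduling_status_code = "NO_NEXT"
    if not canceled:  # the guard added by this commit
        for task in waiting_tasks:
            for dep in upstream_of(task):
                if not is_complete(dep):
                    break  # an upstream dependency is unfinished, cannot start this task
            else:
                # all dependencies complete: try to start the task
                scheduling_status_code = "HAVE_NEXT"
                code = start_task(task)
                if code == "NO_RESOURCE":
                    break  # wait for the next scheduling round
                if code == "FAILED":
                    scheduling_status_code = "FAILED"
                    break
    return scheduling_status_code

# A canceled job starts nothing, even if its tasks are ready to run.
print(schedule_waiting_tasks(
    waiting_tasks=["hetero_lr_0"],
    upstream_of=lambda t: [],
    is_complete=lambda d: True,
    start_task=lambda t: "SUCCESS",
    canceled=True))  # -> NO_NEXT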
12 changes: 6 additions & 6 deletions python/fate_flow/utils/job_utils.py
@@ -337,15 +337,15 @@ def is_task_executor_process(task: Task, process: psutil.Process):
 def kill_task_executor_process(task: Task, only_child=False):
     try:
         if not task.f_run_pid:
-            schedule_logger(task.f_job_id).info("job {} task {} {} {} no process pid".format(
-                task.f_job_id, task.f_task_id, task.f_role, task.f_party_id))
+            schedule_logger(task.f_job_id).info("job {} task {} {} {} with {} party status no process pid".format(
+                task.f_job_id, task.f_task_id, task.f_role, task.f_party_id, task.f_party_status))
             return KillProcessStatusCode.NOT_FOUND
         pid = int(task.f_run_pid)
-        schedule_logger(task.f_job_id).info("try to stop job {} task {} {} {} process pid:{}".format(
-            task.f_job_id, task.f_task_id, task.f_role, task.f_party_id, pid))
+        schedule_logger(task.f_job_id).info("try to stop job {} task {} {} {} with {} party status process pid:{}".format(
+            task.f_job_id, task.f_task_id, task.f_role, task.f_party_id, task.f_party_status, pid))
         if not check_job_process(pid):
-            schedule_logger(task.f_job_id).info("can not found job {} task {} {} {} process pid:{}".format(
-                task.f_job_id, task.f_task_id, task.f_role, task.f_party_id, pid))
+            schedule_logger(task.f_job_id).info("can not found job {} task {} {} {} with {} party status process pid:{}".format(
+                task.f_job_id, task.f_task_id, task.f_role, task.f_party_id, task.f_party_status, pid))
             return KillProcessStatusCode.NOT_FOUND
         p = psutil.Process(int(pid))
         if not is_task_executor_process(task=task, process=p):
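The job_utils.py edits only enrich the kill-time log lines with the task's party status (task.f_party_status), which makes it easier to see afterwards why killing a task found no process. A small illustration of the resulting message, reusing the format string from the updated call with made-up field values:

# Hypothetical values; the format string matches the updated log call above.
job_id, task_id, role, party_id, party_status, pid = (
    "202009290001", "202009290001_hetero_lr_0", "guest", "9999", "running", 24681)
print("try to stop job {} task {} {} {} with {} party status process pid:{}".format(
    job_id, task_id, role, party_id, party_status, pid))
# try to stop job 202009290001 task 202009290001_hetero_lr_0 guest 9999 with running party status process pid:24681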
