Commit

modify queue
Signed-off-by: wzh <[email protected]>
zhihuiwan committed May 15, 2020
1 parent 8906838 commit bb07ec1
Showing 10 changed files with 147 additions and 155 deletions.
4 changes: 2 additions & 2 deletions fate_flow/apps/job_app.py
@@ -22,7 +22,7 @@
 from arch.api.utils.core_utils import json_loads
 from fate_flow.driver.job_controller import JobController
 from fate_flow.driver.task_scheduler import TaskScheduler
-from fate_flow.manager.data_manager import query_data_view
+from fate_flow.manager import data_manager
 from fate_flow.settings import stat_logger, CLUSTER_STANDALONE_JOB_SERVER_PORT
 from fate_flow.utils import job_utils, detect_utils
 from fate_flow.utils.api_utils import get_json_result, request_execute_server
@@ -140,7 +140,7 @@ def query_task():
 
 @manager.route('/data/view/query', methods=['POST'])
 def query_data_view():
-    data_views = job_utils.query_data_view(**request.json)
+    data_views = data_manager.query_data_view(**request.json)
     if not data_views:
         return get_json_result(retcode=101, retmsg='find data view failed')
     return get_json_result(retcode=0, retmsg='success', data=[data_view.to_json() for data_view in data_views])
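These two hunks fix a name clash: the module previously imported query_data_view directly, but the view function defined at the route is itself named query_data_view, so the definition rebound the imported name (leaving the import dead) and the body reached for a job_utils helper instead. Importing the module and qualifying the call as data_manager.query_data_view sidesteps the shadowing. A minimal sketch of the pitfall, using stdlib names rather than FATE code:

    # Name shadowing: a local definition rebinds a previously imported name.
    from json import dumps          # binds module-level name 'dumps' to json.dumps

    def dumps(obj):                 # rebinds 'dumps'; json.dumps is now unreachable here
        return '<not json>'

    print(dumps({'a': 1}))          # prints '<not json>'

    # Importing the module keeps the two names distinct:
    import json

    def render(obj):
        return json.dumps(obj)      # unambiguous, module-qualified call
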
9 changes: 9 additions & 0 deletions fate_flow/apps/schedule_app.py
@@ -47,6 +47,15 @@ def job_status(job_id, role, party_id):
     return get_json_result(retcode=0, retmsg='success')
 
 
+@manager.route('/<job_id>/<role>/<party_id>/check', methods=['POST'])
+def job_check(job_id, role, party_id):
+    status = JobController.check_job_run(role=role, event=request.json)
+    if status:
+        return get_json_result(retcode=0, retmsg='success')
+    else:
+        return get_json_result(retcode=101, retmsg='The job running on the host side exceeds the maximum running amount')
+
+
 @manager.route('/<job_id>/<role>/<party_id>/<model_id>/<model_version>/save/pipeline', methods=['POST'])
 @request_authority_certification
 def save_pipeline(job_id, role, party_id, model_id, model_version):
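The new /check route lets a remote initiator ask this party whether it can accept another running job; JobController.check_job_run (added below in job_controller.py) answers by delegating to job_utils.job_quantity_constraint. In FATE the initiator reaches this endpoint through federated_api (see task_scheduler.py below), but as a rough illustration, a direct call against a local fate_flow server might look like this sketch — the host, port, API version, and payload values are assumptions, not values taken from this commit:

    # Hypothetical direct POST to the new check endpoint (illustration only;
    # FATE routes this through federated_api rather than raw HTTP from a client).
    import requests

    job_info = {
        'job_id': '2020051500000000',     # made-up job id
        'initiator_role': 'guest',
        'initiator_party_id': 9999,
        'f_role': 'host',                 # filled in per party by check_job_run
        'f_party_id': 10000,
    }
    r = requests.post('http://127.0.0.1:9380/v1/schedule/{job_id}/host/10000/check'
                      .format(job_id=job_info['job_id']), json=job_info)
    print(r.json())  # retcode 0 -> schedule now; retcode 101 -> host at capacity
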
29 changes: 24 additions & 5 deletions fate_flow/driver/dag_scheduler.py
@@ -35,17 +35,27 @@ def run(self):
             schedule_logger().error('queue is not ready')
             return False
         all_jobs = []
+
+        put_events(self.queue)
+
         while True:
             try:
                 if len(all_jobs) == self.concurrent_num:
                     for future in as_completed(all_jobs):
                         all_jobs.remove(future)
-                        break
+                        result = future.result()
+                        if isinstance(result, dict):
+                            self.queue.put_event(result, status=3, job_id=result['job_id'])
                 job_event = self.queue.get_event()
-                schedule_logger(job_event['job_id']).info('schedule job {}'.format(job_event))
-                future = self.job_executor_pool.submit(DAGScheduler.handle_event, job_event)
-                future.add_done_callback(DAGScheduler.get_result)
-                all_jobs.append(future)
+                status = TaskScheduler.check(job_event['job_id'], job_event['initiator_role'], job_event['initiator_party_id'])
+                if not status:
+                    schedule_logger(job_event['job_id']).info('host is busy, job {} into waiting......'.format(job_event['job_id']))
+                    self.queue.put_event(job_event, status=3, job_id=job_event['job_id'])
+                else:
+                    schedule_logger(job_event['job_id']).info('schedule job {}'.format(job_event))
+                    future = self.job_executor_pool.submit(DAGScheduler.handle_event, job_event)
+                    future.add_done_callback(DAGScheduler.get_result)
+                    all_jobs.append(future)
             except Exception as e:
                 schedule_logger().exception(e)
 
@@ -63,3 +73,12 @@ def handle_event(job_event)
     @staticmethod
     def get_result(future):
         future.result()
+
+
+def put_events(queue):
+    n = queue.qsize(status=3)
+    for i in range(n):
+        event = queue.get_event(status=3)
+        queue.put_event(event, job_id=event['job_id'], status=1)
+    t = threading.Timer(120, put_events, [queue])
+    t.start()
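put_events drains every WAITING entry (status=3) back to READY (status=1) and then re-arms itself on a 120-second threading.Timer, so a job parked because some host was busy is retried periodically rather than dropped. The queue object itself is not part of this diff; a minimal in-memory stand-in for the interface the scheduler relies on (put_event, get_event, and qsize filtered by status) could look like the sketch below, where the status codes 1 = READY and 3 = WAITING are assumptions read off the surrounding code:

    # Toy stand-in for the job queue API used by DAGScheduler (illustrative only;
    # the real FATE queue is persistent, this one is a plain in-memory dict).
    import threading
    from collections import defaultdict, deque

    class ToyJobQueue:
        def __init__(self):
            self._queues = defaultdict(deque)      # status -> deque of events
            self._lock = threading.Lock()

        def put_event(self, event, status=1, job_id=None):
            with self._lock:
                self._queues[status].append(event)

        def get_event(self, status=1):
            with self._lock:
                return self._queues[status].popleft()   # IndexError when empty

        def qsize(self, status=1):
            with self._lock:
                return len(self._queues[status])

    q = ToyJobQueue()
    q.put_event({'job_id': 'j1'}, status=3)        # parked as WAITING
    # one put_events(q) pass would move it back to the READY queue (status=1)
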
5 changes: 4 additions & 1 deletion fate_flow/driver/job_controller.py
@@ -170,7 +170,6 @@ def update_job_status(job_id, role, party_id, job_info, create=False):
         job_tracker = Tracking(job_id=job_id, role=role, party_id=party_id,
                                model_id=job_parameters["model_id"],
                                model_version=job_parameters["model_version"])
-        job_tracker.job_quantity_constraint()
         if job_parameters.get("job_type", "") != "predict":
             job_tracker.init_pipelined_model()
         roles = json_loads(job_info['f_roles'])
@@ -253,6 +252,10 @@ def clean_job(job_id, role, party_id, roles, party_ids):
                 schedule_logger(job_id).exception(e)
         schedule_logger(job_id).info('job {} on {} {} clean done'.format(job_id, role, party_id))
 
+    @staticmethod
+    def check_job_run(role, event):
+        return job_utils.job_quantity_constraint(role, event)
+
     @staticmethod
     def cancel_job(job_id, role, party_id, job_initiator):
         schedule_logger(job_id).info('{} {} get cancel waiting job {} command'.format(role, party_id, job_id))
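With the old in-Tracking check deleted above, JobController.check_job_run becomes the single entry point the /check route uses, delegating to job_utils.job_quantity_constraint(role, event). That helper's body is not part of the shown diff; judging from the retmsg in schedule_app.py, it compares this party's running-job count against a host-side cap, roughly along these lines — the constant name and counting helper below are invented for illustration:

    # Hypothetical sketch of job_utils.job_quantity_constraint (body not shown
    # in this diff); MAX_CONCURRENT_JOB_RUN_HOST and count_running_jobs are
    # assumptions, not names confirmed by this commit.
    MAX_CONCURRENT_JOB_RUN_HOST = 10   # assumed host-side cap

    def count_running_jobs(party_id):
        return 0                       # stub for the real job-table query

    def job_quantity_constraint(role, event):
        if role != 'host':
            return True                # per the retmsg, only the host side is capped
        return count_running_jobs(event['f_party_id']) < MAX_CONCURRENT_JOB_RUN_HOST

    print(job_quantity_constraint('host', {'f_party_id': 10000}))  # True with the stub
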
55 changes: 53 additions & 2 deletions fate_flow/driver/task_scheduler.py
@@ -54,23 +54,74 @@ def distribute_job(job, roles, job_initiator)
                                           json_body=job.to_json(),
                                           work_mode=job.f_work_mode)
             if response_json["retcode"]:
-                job.f_status = JobStatus.FAILED
+                job_info = {'f_status': JobStatus.FAILED}
                 TaskScheduler.sync_job_status(job_id=job.f_job_id, roles=roles,
                                               work_mode=job.f_work_mode,
                                               initiator_party_id=job_initiator['party_id'],
                                               initiator_role=job_initiator['role'],
-                                              job_info=job.to_json())
+                                              job_info=job_info)
                 raise Exception(
                     "an error occurred while creating the job: role {} party_id {}".format(role, party_id)
                     + "\n" + str(response_json["retmsg"]))
 
+    @staticmethod
+    def check(job_id, initiator_role, initiator_party_id):
+        job_dsl, job_runtime_conf, train_runtime_conf = job_utils.get_job_configuration(job_id=job_id,
+                                                                                        role=initiator_role,
+                                                                                        party_id=initiator_party_id)
+        job_parameters = job_runtime_conf.get('job_parameters', {})
+        job_initiator = job_runtime_conf.get('initiator', {})
+        status = TaskScheduler.check_job_run(job_id=job_id, roles=job_runtime_conf['role'],
+                                             work_mode=job_parameters['work_mode'],
+                                             initiator_party_id=job_initiator['party_id'],
+                                             initiator_role=job_initiator['role'],
+                                             job_info={
+                                                 'job_id': job_id,
+                                                 'initiator_role': initiator_role,
+                                                 'initiator_party_id': initiator_party_id
+                                             })
+        return status
+
+    @staticmethod
+    def check_job_run(job_id, roles, work_mode, initiator_party_id, initiator_role, job_info):
+        for role, partys in roles.items():
+            job_info['f_role'] = role
+            for party_id in partys:
+                job_info['f_party_id'] = party_id
+                response = federated_api(job_id=job_id,
+                                         method='POST',
+                                         endpoint='/{}/schedule/{}/{}/{}/check'.format(
+                                             API_VERSION,
+                                             job_id,
+                                             role,
+                                             party_id),
+                                         src_party_id=initiator_party_id,
+                                         dest_party_id=party_id,
+                                         src_role=initiator_role,
+                                         json_body=job_info,
+                                         work_mode=work_mode)
+                if response['retcode'] == 101:
+                    return False
+        return True
+
     @staticmethod
     def run_job(job_id, initiator_role, initiator_party_id):
         job_dsl, job_runtime_conf, train_runtime_conf = job_utils.get_job_configuration(job_id=job_id,
                                                                                         role=initiator_role,
                                                                                         party_id=initiator_party_id)
         job_parameters = job_runtime_conf.get('job_parameters', {})
         job_initiator = job_runtime_conf.get('initiator', {})
+        status = TaskScheduler.check_job_run(job_id=job_id, roles=job_runtime_conf['role'],
+                                             work_mode=job_parameters['work_mode'],
+                                             initiator_party_id=job_initiator['party_id'],
+                                             initiator_role=job_initiator['role'],
+                                             job_info={
+                                                 'job_id': job_id,
+                                                 'initiator_role': initiator_role,
+                                                 'initiator_party_id': initiator_party_id
+                                             })
+        if not status:
+            return -1
         dag = get_job_dsl_parser(dsl=job_dsl,
                                  runtime_conf=job_runtime_conf,
                                  train_runtime_conf=train_runtime_conf)
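Taken together, the commit turns the concurrency limit into a federated pre-flight: before DAGScheduler submits a waiting job, TaskScheduler.check fans a /check POST out to every party in the job's role map, and a single retcode 101 sends the job back to the waiting queue until the next put_events tick; run_job performs the same gate and returns -1 instead of building the DAG. A condensed restatement of that control flow, with invented helper names standing in for the federated pieces:

    # Illustrative summary of the new scheduling round (not FATE code; helpers stubbed).
    def parties_of(job_event):
        return [('guest', 9999), ('host', 10000)]          # stub role map

    def post_check(job_event, role, party_id):
        return {'retcode': 0}                              # stub for the federated /check call

    def schedule_once(queue, pool, job_event):
        accepted = all(post_check(job_event, role, party_id)['retcode'] != 101
                       for role, party_id in parties_of(job_event))
        if not accepted:
            # park as WAITING (status=3); put_events promotes it back every 120 s
            queue.put_event(job_event, status=3, job_id=job_event['job_id'])
            return None
        return pool.submit(print, 'run', job_event)        # stand-in for handle_event
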
