Skip to content

Commit

Permalink
Add task time log and modify success status
Browse files Browse the repository at this point in the history
Signed-off-by: wzh <[email protected]>
  • Loading branch information
zhihuiwan committed Jan 16, 2020
1 parent 39a84c9 commit 1eabe93
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 12 deletions.
7 changes: 7 additions & 0 deletions arch/api/utils/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ def current_timestamp():
return int(time.time() * 1000)


def timestamp_to_date(timestamp, format_string="%Y-%m-%d %H:%M:%S"):
    """Render a millisecond epoch timestamp as a local-time string.

    :param timestamp: epoch time in milliseconds; any value ``int()`` accepts.
    :param format_string: ``time.strftime`` pattern used for the result.
    :return: the formatted date string, expressed in the local time zone.
    """
    # The input is in milliseconds, while time.localtime expects seconds.
    seconds = int(timestamp) / 1000
    return time.strftime(format_string, time.localtime(seconds))


def base64_encode(src):
    """Base64-encode the text ``src``.

    ``src`` is encoded to UTF-8 bytes, Base64-encoded, and the resulting
    bytes are converted with ``bytes_to_string`` before being returned.
    """
    utf8_bytes = src.encode("utf-8")
    encoded_bytes = base64.b64encode(utf8_bytes)
    return bytes_to_string(encoded_bytes)

Expand Down
2 changes: 1 addition & 1 deletion fate_flow/driver/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def kill_job(job_id, role, party_id, job_initiator, timeout=False, component_nam
'success' if kill_status else 'failed'))
status = TaskStatus.FAILED if not timeout else TaskStatus.TIMEOUT

if task.f_status != TaskStatus.SUCCESS:
if task.f_status != TaskStatus.COMPLETE:
task.f_status = status
try:
TaskExecutor.sync_task_status(job_id=job_id, component_name=task.f_component_name, task_id=task.f_task_id,
Expand Down
8 changes: 5 additions & 3 deletions fate_flow/driver/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
import argparse
import importlib
import os
import time
import traceback

from arch.api import federation
from arch.api import session
from arch.api.utils import file_utils, log_utils
from arch.api.utils.core import current_timestamp, get_lan_ip
from arch.api.utils.core import current_timestamp, get_lan_ip, timestamp_to_date
from arch.api.utils.log_utils import schedule_logger
from fate_flow.db.db_models import Task
from fate_flow.entity.runtime_config import RuntimeConfig
Expand Down Expand Up @@ -123,7 +122,7 @@ def run_task():
output_model = run_object.export_model()
# There is only one model output at the current dsl version.
tracker.save_output_model(output_model, task_output_dsl['model'][0] if task_output_dsl.get('model') else 'default')
task.f_status = TaskStatus.SUCCESS
task.f_status = TaskStatus.COMPLETE
except Exception as e:
task.f_status = TaskStatus.FAILED
kill_path = os.path.join(job_utils.get_job_directory(job_id), str(role), str(party_id), component_name,
Expand All @@ -146,6 +145,9 @@ def run_task():
except Exception as e:
traceback.print_exc()
schedule_logger().exception(e)
schedule_logger().info('task {} {} {} start time: {}'.format(task_id, role, party_id, timestamp_to_date(task.f_start_time)))
schedule_logger().info('task {} {} {} end time: {}'.format(task_id, role, party_id, timestamp_to_date(task.f_end_time)))
schedule_logger().info('task {} {} {} takes {}s'.format(task_id, role, party_id, int(task.f_elapsed)/1000))
schedule_logger().info(
'finish {} {} {} {} {} {} task'.format(job_id, component_name, task_id, role, party_id, task.f_status if sync_success else TaskStatus.FAILED))

Expand Down
8 changes: 4 additions & 4 deletions fate_flow/driver/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from fate_flow.utils import job_utils
from fate_flow.utils.api_utils import federated_api
from fate_flow.utils.job_utils import query_task, get_job_dsl_parser, query_job
from fate_flow.entity.constant_config import JobStatus, Backend
from fate_flow.entity.constant_config import JobStatus, Backend, TaskStatus


class TaskScheduler(object):
Expand Down Expand Up @@ -111,12 +111,12 @@ def run_job(job_id, initiator_role, initiator_party_id):
if len(top_level_task_status) == 2:
job.f_status = JobStatus.FAILED
elif True in top_level_task_status:
job.f_status = JobStatus.SUCCESS
job.f_status = JobStatus.COMPLETE
else:
job.f_status = JobStatus.FAILED
job.f_end_time = current_timestamp()
job.f_elapsed = job.f_end_time - job.f_start_time
if job.f_status == JobStatus.SUCCESS:
if job.f_status == JobStatus.COMPLETE:
job.f_progress = 100
job.f_update_time = current_timestamp()
TaskScheduler.sync_job_status(job_id=job_id, roles=job_runtime_conf['role'],
Expand Down Expand Up @@ -266,7 +266,7 @@ def check_task_status(job_id, component, interval=0.25):
return False
if 'timeout' in status_collect:
return None
elif len(status_collect) == 1 and 'success' in status_collect:
elif len(status_collect) == 1 and TaskStatus.COMPLETE in status_collect:
return True
else:
time.sleep(interval)
Expand Down
4 changes: 2 additions & 2 deletions fate_flow/entity/constant_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def is_spark(self):
class JobStatus(object):
WAITING = 'waiting'
RUNNING = 'running'
SUCCESS = 'success'
COMPLETE = 'complete'
FAILED = 'failed'
TIMEOUT = 'timeout'
CANCELED = 'canceled'
Expand All @@ -46,6 +46,6 @@ class JobStatus(object):
# Status strings stored in a task's ``f_status`` field.
# NOTE(review): this listing is taken from a commit diff, so the removed
# SUCCESS line and its replacement COMPLETE line both appear here; confirm
# against the checked-out file which of the two is actually defined.
class TaskStatus(object):
START = 'start'
RUNNING = 'running'
SUCCESS = 'success'
# Set by run_task once the component has run and its output is saved.
COMPLETE = 'complete'
# Set by run_task when an exception is raised, and by kill_job when the
# kill was not caused by a timeout.
FAILED = 'failed'
# Set by kill_job when it is invoked with timeout=True.
TIMEOUT = 'timeout'
4 changes: 2 additions & 2 deletions fate_flow/manager/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def save_job_info(self, role, party_id, job_info, create=False):
job.f_role = role
job.f_party_id = party_id
if 'f_status' in job_info:
if job.f_status in [JobStatus.SUCCESS, JobStatus.FAILED]:
if job.f_status in [JobStatus.COMPLETE, JobStatus.FAILED]:
# Termination status cannot be updated
# TODO:
pass
Expand Down Expand Up @@ -358,7 +358,7 @@ def save_task(self, role, party_id, task_info):
task.f_role = role
task.f_party_id = party_id
if 'f_status' in task_info:
if task.f_status in [TaskStatus.SUCCESS, TaskStatus.FAILED]:
if task.f_status in [TaskStatus.COMPLETE, TaskStatus.FAILED]:
# Termination status cannot be updated
# TODO:
pass
Expand Down
1 change: 1 addition & 0 deletions fate_flow/utils/job_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from fate_flow.utils import api_utils
from flask import request, redirect, url_for

from fate_flow.utils.session_utils import SessionStop

class IdCounter:
_lock = threading.RLock()
Expand Down

0 comments on commit 1eabe93

Please sign in to comment.