Skip to content

Commit

Permalink
Show error log of failed trial jobs for integration tests (microsoft#1602)
Browse files Browse the repository at this point in the history

* show failed job log
  • Loading branch information
chicm-ms authored Nov 5, 2019
1 parent 86f8c2a commit 52b93d0
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/nni_manager/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,4 @@ function unixPathJoin(...paths: any[]): string {

export {countFilesRecursively, validateFileNameRecursively, getRemoteTmpDir, generateParamFileName, getMsgDispatcherCommand, getCheckpointDir,
getLogDir, getExperimentRootDir, getJobCancelStatus, getDefaultDatabaseDir, getIPV4Address, unixPathJoin,
mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect, getLogLevel, getVersion, getCmdPy, getTunerProc, isAlive, killPid, getNewLine };
mkDirP, mkDirPSync, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect, getLogLevel, getVersion, getCmdPy, getTunerProc, isAlive, killPid, getNewLine };
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { String } from 'typescript-string-operations';
import * as component from '../../common/component';
import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo';
import { RestServer } from '../../common/restServer';
import { getLogDir } from '../../common/utils';
import { getExperimentRootDir, mkDirPSync } from '../../common/utils';

/**
* Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update
Expand Down Expand Up @@ -146,7 +146,9 @@ export abstract class ClusterJobRestServer extends RestServer {
this.errorMessage = `Version check failed, didn't get version check response from trialKeeper,`
+ ` please check your NNI version in NNIManager and TrialKeeper!`;
}
const trialLogPath: string = path.join(getLogDir(), `trial_${req.params.trialId}.log`);
const trialLogDir: string = path.join(getExperimentRootDir(), 'trials', req.params.trialId);
mkDirPSync(trialLogDir);
const trialLogPath: string = path.join(trialLogDir, 'stdout_log_collection.log');
try {
let skipLogging: boolean = false;
if (req.body.tag === 'trial' && req.body.msg !== undefined) {
Expand Down
24 changes: 12 additions & 12 deletions test/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import subprocess
import time
import traceback
import json

from utils import setup_experiment, get_experiment_status, get_yml_content, dump_yml_content, \
parse_max_duration_time, get_succeeded_trial_num, print_stderr, deep_update
parse_max_duration_time, get_succeeded_trial_num, deep_update, print_failed_job_log, get_failed_trial_jobs
from utils import GREEN, RED, CLEAR, STATUS_URL, TRIAL_JOBS_URL

def gen_new_config(config_file, training_service='local'):
    '''Generate a temporary experiment config for the given training service.

    Loads the base experiment config plus the per-service overrides from
    training_service.yml, merges them (the 'all' section first, then the
    service-specific section), and dumps the result next to the original
    file with a '.tmp' suffix.

    Returns a tuple (new_config_file, config) — the path of the generated
    file and the merged config dict.

    NOTE(review): the scraped diff interleaved pre- and post-change lines
    here; this is the reconstructed post-change version.
    '''
    config = get_yml_content(config_file)
    new_config_file = config_file + '.tmp'

    it_config = get_yml_content('training_service.yml')

    # hack for kubeflow trial config: kubeflow puts the trial command under
    # trial.worker.command instead of trial.command, and forbids gpuNum
    if training_service == 'kubeflow':
        it_config[training_service]['trial']['worker']['command'] = config['trial']['command']
        config['trial'].pop('command')
        if 'gpuNum' in config['trial']:
            config['trial'].pop('gpuNum')

    # apply shared overrides first so service-specific ones win on conflict
    deep_update(config, it_config['all'])
    deep_update(config, it_config[training_service])

    dump_yml_content(new_config_file, config)

    return new_config_file, config
Expand All @@ -57,6 +58,7 @@ def run_test(config_file, training_service, local_gpu=False):
'''run test per configuration file'''

new_config_file, config = gen_new_config(config_file, training_service)
print(json.dumps(config, sort_keys=True, indent=4))

if training_service == 'local' and not local_gpu and config['trial']['gpuNum'] > 0:
print('no gpu, skiping: ', config_file)
Expand All @@ -72,14 +74,12 @@ def run_test(config_file, training_service, local_gpu=False):
for _ in range(0, max_duration+30, sleep_interval):
time.sleep(sleep_interval)
status = get_experiment_status(STATUS_URL)
if status == 'DONE':
num_succeeded = get_succeeded_trial_num(TRIAL_JOBS_URL)
if training_service == 'local':
print_stderr(TRIAL_JOBS_URL)
assert num_succeeded == max_trial_num, 'only %d succeeded trial jobs, there should be %d' % (num_succeeded, max_trial_num)
if status in ['DONE', 'ERROR'] or get_failed_trial_jobs(TRIAL_JOBS_URL):
break

assert status == 'DONE', 'Failed to finish in maxExecDuration'
print_failed_job_log(config['trainingServicePlatform'], TRIAL_JOBS_URL)
if status != 'DONE' or get_succeeded_trial_num(TRIAL_JOBS_URL) < max_trial_num:
raise AssertionError('Failed to finish in maxExecDuration')
finally:
if os.path.exists(new_config_file):
os.remove(new_config_file)
Expand Down
4 changes: 2 additions & 2 deletions test/metrics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import json
import requests

from utils import get_experiment_status, get_yml_content, parse_max_duration_time, get_succeeded_trial_num, print_stderr
from utils import get_experiment_status, get_yml_content, parse_max_duration_time, get_succeeded_trial_num, print_failed_job_log
from utils import GREEN, RED, CLEAR, STATUS_URL, TRIAL_JOBS_URL, METRICS_URL

def run_test():
Expand All @@ -49,7 +49,7 @@ def run_test():
#print('experiment status:', status)
if status == 'DONE':
num_succeeded = get_succeeded_trial_num(TRIAL_JOBS_URL)
print_stderr(TRIAL_JOBS_URL)
print_failed_job_log('local', TRIAL_JOBS_URL)
if sys.platform == "win32":
time.sleep(sleep_interval) # Windows seems to have some issues on updating in time
assert num_succeeded == max_trial_num, 'only %d succeeded trial jobs, there should be %d' % (num_succeeded, max_trial_num)
Expand Down
3 changes: 3 additions & 0 deletions test/training_service.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
all:
logCollection: http

kubeflow:
maxExecDuration: 15m
nniManagerIp:
Expand Down
42 changes: 30 additions & 12 deletions test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,18 @@ def get_experiment_id(experiment_url):
experiment_id = requests.get(experiment_url).json()['id']
return experiment_id

def get_experiment_dir(experiment_url):
    '''Return the experiment root directory (~/nni/experiments/<experiment_id>)
    for the experiment served at *experiment_url*.

    NOTE(review): the scraped diff interleaved pre- and post-change lines in
    this region; these are the reconstructed post-change helpers.
    '''
    experiment_id = get_experiment_id(experiment_url)
    return os.path.join(os.path.expanduser('~'), 'nni', 'experiments', experiment_id)

def get_nni_log_dir(experiment_url):
    '''Return nni's log directory (<experiment_dir>/log) for the experiment.'''
    return os.path.join(get_experiment_dir(experiment_url), 'log')

def get_nni_log_path(experiment_url):
    '''Return the full path of nnimanager.log for the experiment.'''
    return os.path.join(get_nni_log_dir(experiment_url), 'nnimanager.log')

def is_experiment_done(nnimanager_log_path):
'''check if the experiment is done successfully'''
Expand All @@ -104,25 +109,38 @@ def get_experiment_status(status_url):

def get_succeeded_trial_num(trial_jobs_url):
    '''Return the number of trial jobs that finished successfully.

    A job counts as succeeded when its status is SUCCEEDED or EARLY_STOPPED.
    The raw job list and the final count are printed for CI log visibility.
    '''
    trial_jobs = requests.get(trial_jobs_url).json()
    print(trial_jobs)
    # sum over a generator replaces the manual counter loop
    num_succeed = sum(1 for job in trial_jobs if job['status'] in ('SUCCEEDED', 'EARLY_STOPPED'))
    print('num_succeed:', num_succeed)
    return num_succeed

def print_stderr(trial_jobs_url):
def get_failed_trial_jobs(trial_jobs_url):
    '''Return the list of trial-job dicts whose status is FAILED.'''
    trial_jobs = requests.get(trial_jobs_url).json()
    # `status in ['FAILED']` was a one-element membership test — plain
    # equality says the same thing; comprehension replaces the append loop
    return [job for job in trial_jobs if job['status'] == 'FAILED']

def print_failed_job_log(training_service, trial_jobs_url):
    '''Print the job log of every FAILED trial job.

    For the local training service the log is the trial's stderr file
    (the stderrPath field looks like "<scheme>:<host>:<path>", so the path
    is recovered from the trailing component(s) — on Windows the drive
    letter keeps one extra ':'). For remote/cluster services the log is
    the collected stdout under the experiment's trials directory.

    NOTE(review): the scraped diff interleaved pre- and post-change lines
    here; this is the reconstructed post-change version. The per-job
    status check is gone because get_failed_trial_jobs already filters.
    '''
    trial_jobs = get_failed_trial_jobs(trial_jobs_url)
    for trial_job in trial_jobs:
        if training_service == 'local':
            if sys.platform == "win32":
                # keep the drive letter: rejoin the last two ':'-separated parts
                p = trial_job['stderrPath'].split(':')
                log_filename = ':'.join([p[-2], p[-1]])
            else:
                log_filename = trial_job['stderrPath'].split(':')[-1]
        else:
            log_filename = os.path.join(get_experiment_dir(EXPERIMENT_URL), 'trials', trial_job['id'], 'stdout_log_collection.log')
        with open(log_filename, 'r') as f:
            log_content = f.read()
        print(log_filename, flush=True)
        print(log_content, flush=True)

def parse_max_duration_time(max_exec_duration):
unit = max_exec_duration[-1]
Expand Down

0 comments on commit 52b93d0

Please sign in to comment.