Skip to content

Commit

Permalink
Update structure for resuming
Browse files Browse the repository at this point in the history
  • Loading branch information
Derek-Wds committed Mar 16, 2021
1 parent 08b44ed commit 447fed8
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 53 deletions.
20 changes: 14 additions & 6 deletions qlib/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ def __repr__(self):

@contextmanager
def start(
self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None
self,
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
):
"""
Method to start an experiment. This method can only be called within a Python's `with` statement. Here is the example code:
.. code-block:: Python
# start new experimetn and recorder
# start new experiment and recorder
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
# resume previous experiment and recorder
with R.start('test', 'recorder_1'): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder.
with R.start('test', 'recorder_1', resume=True): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder.
... # further operations
Parameters
Expand All @@ -50,16 +54,18 @@ def start(
The default uri is set in the qlib.config. Note that this uri argument will not change the one defined in the config file.
Therefore, the next time when users call this function in the same experiment,
they have to also specify this argument with the same value. Otherwise, inconsistent uri may occur.
resume : bool
whether to resume the specific recorder with given name under the given experiment.
"""
run = self.start_exp(experiment_name, recorder_name, uri)
run = self.start_exp(experiment_name, recorder_name, uri, resume)
try:
yield run
except Exception as e:
self.end_exp(Recorder.STATUS_FA) # end the experiment if something went wrong
raise e
self.end_exp(Recorder.STATUS_FI)

def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
def start_exp(self, experiment_name=None, recorder_name=None, uri=None, resume=False):
"""
Lower level method for starting an experiment. When use this method, one should end the experiment manually
and the status of the recorder may not be handled properly. Here is the example code:
Expand All @@ -80,12 +86,14 @@ def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
uri : str
the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored.
The default uri are set in the qlib.config.
resume : bool
whether to resume the specific recorder with given name under the given experiment.
Returns
-------
An experiment instance being started.
"""
return self.exp_manager.start_exp(experiment_name, recorder_name, uri)
return self.exp_manager.start_exp(experiment_name, recorder_name, uri, resume)

def end_exp(self, recorder_status=Recorder.STATUS_FI):
"""
Expand Down
80 changes: 43 additions & 37 deletions qlib/workflow/exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ def info(self):
output["recorders"] = list(recorders.keys())
return output

def start(self, recorder_name=None):
def start(self, recorder_name=None, resume=False):
"""
Start the experiment and set it to be active. This method will also start a new recorder.
Parameters
----------
recorder_name : str
the name of the recorder to be created.
resume : bool
whether to resume the first recorder
Returns
-------
Expand Down Expand Up @@ -149,7 +151,35 @@ def get_recorder(self, recorder_id=None, recorder_name=None, create: bool = True
-------
A recorder object.
"""
raise NotImplementedError(f"Please implement the `get_recorder` method.")
# special case of getting the recorder
if recorder_id is None and recorder_name is None:
if self.active_recorder is not None:
return self.active_recorder
recorder_name = self._default_rec_name
if create:
recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name)
else:
recorder, is_new = self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
if is_new:
self.active_recorder = recorder
# start the recorder
self.active_recorder.start_run()
return recorder

def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
automatically create a new recorder based on the given id and name.
"""
try:
if recorder_id is None and recorder_name is None:
recorder_name = self._default_rec_name
return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
except ValueError:
if recorder_name is None:
recorder_name = self._default_rec_name
logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
return self.create_recorder(recorder_name), True

def list_recorders(self):
"""
Expand Down Expand Up @@ -178,10 +208,17 @@ def __init__(self, id, name, uri):
def __repr__(self):
return "{name}(id={id}, info={info})".format(name=self.__class__.__name__, id=self.id, info=self.info)

def start(self, recorder_name=None):
def start(self, recorder_name=None, resume=False):
logger.info(f"Experiment {self.id} starts running ...")
# Get or create recorder
recorder, _ = self._get_or_create_rec(recorder_name=recorder_name)
if recorder_name is None:
recorder_name = self._default_rec_name
# resume the recorder
if resume:
recorder, _ = self._get_or_create_rec(recorder_name=recorder_name)
# create a new recorder
else:
recorder = self.create_recorder(recorder_name)
# Set up active recorder
self.active_recorder = recorder
# Start the recorder
Expand All @@ -201,37 +238,6 @@ def create_recorder(self, recorder_name=None):

return recorder

def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
# special case of getting the recorder
if recorder_id is None and recorder_name is None:
if self.active_recorder is not None:
return self.active_recorder
recorder_name = self._default_rec_name
if create:
recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name)
else:
recorder, is_new = self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
if is_new:
self.active_recorder = recorder
# start the recorder
self.active_recorder.start_run()
return recorder

def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
automatically create a new recorder based on the given id and name.
"""
try:
if recorder_id is None and recorder_name is None:
recorder_name = self._default_rec_name
return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
except ValueError:
if recorder_name is None:
recorder_name = self._default_rec_name
logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
return self.create_recorder(recorder_name), True

def _get_recorder(self, recorder_id=None, recorder_name=None):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
Expand All @@ -249,7 +255,7 @@ def _get_recorder(self, recorder_id=None, recorder_name=None):
raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.")
elif recorder_name is not None:
logger.warning(
f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name."
f"Please make sure the recorder name {recorder_name} is unique, we will only return the latest recorder if there exist several matched the given name."
)
recorders = self.list_recorders()
for rid in recorders:
Expand Down Expand Up @@ -283,7 +289,7 @@ def delete_recorder(self, recorder_id=None, recorder_name=None):
UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!!

def list_recorders(self, max_results=UNLIMITED):
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)[::-1]
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)
recorders = dict()
for i in range(len(runs)):
recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i])
Expand Down
25 changes: 15 additions & 10 deletions qlib/workflow/expm.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ExpManager:

def __init__(self, uri: Text, default_exp_name: Optional[Text]):
self._current_uri = uri
self.default_exp_name = default_exp_name
self._default_exp_name = default_exp_name
self.active_experiment = None # only one experiment can active each time

def __repr__(self):
Expand All @@ -36,6 +36,7 @@ def start_exp(
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
**kwargs,
):
"""
Expand All @@ -50,6 +51,8 @@ def start_exp(
name of the recorder to be started.
uri : str
the current tracking URI.
resume : boolean
whether to resume the experiment and recorder.
Returns
-------
Expand Down Expand Up @@ -151,9 +154,7 @@ def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True)
if self.active_experiment is not None:
return self.active_experiment
# User don't want get active code now.
# Don't assume underlying code could handle the case of two None
if experiment_id is None and experiment_name is None:
experiment_name = self.default_exp_name
experiment_name = self._default_exp_name

if create:
exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name)
Expand All @@ -171,12 +172,10 @@ def _get_or_create_exp(self, experiment_id=None, experiment_name=None) -> (objec
automatically create a new experiment based on the given id and name.
"""
try:
if experiment_id is None and experiment_name is None:
experiment_name = self.default_exp_name
return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False
except ValueError:
if experiment_name is None:
experiment_name = self.default_exp_name
experiment_name = self._default_exp_name
logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.")
return self.create_exp(experiment_name), True

Expand Down Expand Up @@ -291,16 +290,22 @@ def client(self):
return self._client

def start_exp(
self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None
self,
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool,
):
# Set the tracking uri
self.set_uri(uri)
# Create experiment
if experiment_name is None:
experiment_name = self._default_exp_name
experiment, _ = self._get_or_create_exp(experiment_name=experiment_name)
# Set up active experiment
self.active_experiment = experiment
# Start the experiment
self.active_experiment.start(recorder_name)
self.active_experiment.start(recorder_name, resume)

return self.active_experiment

Expand All @@ -316,7 +321,7 @@ def create_exp(self, experiment_name: Optional[Text] = None):
# init experiment
experiment_id = self.client.create_experiment(experiment_name)
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
experiment._default_name = self._default_exp_name

return experiment

Expand Down

0 comments on commit 447fed8

Please sign in to comment.