Skip to content

Commit

Permalink
API and python client support for DeleteExperiment and RestoreExperim…
Browse files Browse the repository at this point in the history
…ent (mlflow#344)

* API and python client support for DeleteExperiment and RestoreExperiment

* fixing broken test

* added delete_experiment and restore_experiment to tracking.rst

* addressing PR comments, fixed documentation

* removing delete_experiment and restore_experiment from fluent API
  • Loading branch information
mparkhe authored Aug 23, 2018
1 parent 9c33548 commit 091b1b3
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 135 deletions.
2 changes: 2 additions & 0 deletions mlflow/entities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from mlflow.entities.run_status import RunStatus
from mlflow.entities.run_tag import RunTag
from mlflow.entities.source_type import SourceType
from mlflow.entities.view_type import ViewType

__all__ = [
"Experiment",
Expand All @@ -22,4 +23,5 @@
"RunStatus",
"RunTag",
"SourceType",
"ViewType",
]
3 changes: 3 additions & 0 deletions mlflow/entities/view_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class ViewType(object):
    """Qualifier selecting which experiments a `ListExperiments` query should return."""
    # Only experiments that are currently active.
    ACTIVE_ONLY = 1
    # Only experiments that have been marked as deleted.
    DELETED_ONLY = 2
    # Both active and deleted experiments.
    ALL = 3
81 changes: 80 additions & 1 deletion mlflow/protos/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,41 @@ service MlflowService {
};
}

// Delete an experiment.
//
// This operation marks the experiment and its associated metadata, runs, metrics, and params
// for deletion. If the experiment uses FileStore, artifacts associated with the experiment are
// also deleted.
//
rpc deleteExperiment (DeleteExperiment) returns (DeleteExperiment.Response) {
option (rpc) = {
endpoints: [{
method: "POST",
path: "/preview/mlflow/experiments/delete"
since { major: 2, minor: 0 },
}],
visibility: PUBLIC,
};
}

// Restore a deleted experiment.
//
// This operation restores an experiment that was marked for deletion, together with its
// associated metadata, runs, metrics, and params. If the experiment uses FileStore, the
// underlying artifacts associated with the experiment are also restored.
//
// Throws ``RESOURCE_DOES_NOT_EXIST`` if the experiment was never created or was permanently
// deleted.
//
rpc restoreExperiment (RestoreExperiment) returns (RestoreExperiment.Response) {
option (rpc) = {
endpoints: [{
method: "POST",
path: "/preview/mlflow/experiments/restore"
since { major: 2, minor: 0 },
}],
visibility: PUBLIC,
};
}

// Create a new run within an experiment. A run is usually a single execution of a
// machine learning or data ETL pipeline. MLflow uses runs to track :ref:`mlflowParam`,
// :ref:`mlflowMetric`, and :ref:`mlflowRunTag` associated with a single execution.
Expand Down Expand Up @@ -228,6 +263,18 @@ service MlflowService {
}
}

// View type qualifier for a ListExperiments query.
enum ViewType {
// Default. Return only active experiments.
ACTIVE_ONLY = 1;

// Return only deleted experiments.
DELETED_ONLY = 2;

// Return all experiments, both active and deleted.
ALL = 3;
}

// Description of the source that generated a run.
enum SourceType {
// Within Databricks Notebook environment.
Expand Down Expand Up @@ -362,7 +409,15 @@ message Experiment {
// Location where artifacts for this experiment are stored.
optional string artifact_location = 3;

// TODO: Do we need to qualify location type?
// Current life cycle stage of the experiment : OneOf("active", "deleted")
// Deleted experiments are not returned by APIs
optional string lifecycle_stage = 4;

// Last update time
optional int64 last_update_time = 5;

// Creation time
optional int64 creation_time = 6;
}

message CreateExperiment {
Expand All @@ -384,6 +439,10 @@ message CreateExperiment {
message ListExperiments {
option (scalapb.message).extends = "com.databricks.rpc.RPC[$this.Response]";

// Qualifier for the type of experiments to be returned.
// If unspecified, only active experiments are returned.
optional ViewType view_type = 1;

message Response {
// All experiments
repeated Experiment experiments = 1;
Expand All @@ -405,6 +464,26 @@ message GetExperiment {
}
}

message DeleteExperiment {
option (scalapb.message).extends = "com.databricks.rpc.RPC[$this.Response]";

// ID of the experiment to delete.
optional int64 experiment_id = 1 [(validate_required) = true];

// The response carries no fields.
message Response {
}
}

message RestoreExperiment {
option (scalapb.message).extends = "com.databricks.rpc.RPC[$this.Response]";

// ID of the deleted experiment to restore.
optional int64 experiment_id = 1 [(validate_required) = true];

// The response carries no fields.
message Response {
}
}

message CreateRun {
option (scalapb.message).extends = "com.databricks.rpc.RPC[$this.Response]";

Expand Down
400 changes: 312 additions & 88 deletions mlflow/protos/service_pb2.py

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions mlflow/server/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from mlflow.protos import databricks_pb2
from mlflow.protos.service_pb2 import CreateExperiment, MlflowService, GetExperiment, \
GetRun, SearchRuns, ListArtifacts, GetMetricHistory, CreateRun, \
UpdateRun, LogMetric, LogParam, SetTag, ListExperiments, GetMetric, GetParam
UpdateRun, LogMetric, LogParam, SetTag, ListExperiments, GetMetric, GetParam, \
DeleteExperiment, RestoreExperiment
from mlflow.store.artifact_repo import ArtifactRepository
from mlflow.store.file_store import FileStore

Expand Down Expand Up @@ -115,6 +116,24 @@ def _get_experiment():
return response


def _delete_experiment():
    """
    Handle a ``DeleteExperiment`` request.

    Reads the request message, asks the store to delete the experiment identified by
    its ``experiment_id``, and replies with an empty ``DeleteExperiment.Response``
    serialized as JSON.

    :return: A response object with the ``application/json`` mimetype.
    """
    experiment_id = _get_request_message(DeleteExperiment()).experiment_id
    _get_store().delete_experiment(experiment_id)
    empty_reply = DeleteExperiment.Response()
    http_response = Response(mimetype='application/json')
    http_response.set_data(_message_to_json(empty_reply))
    return http_response


def _restore_experiment():
    """
    Handle a ``RestoreExperiment`` request.

    Reads the request message, asks the store to restore the experiment identified by
    its ``experiment_id``, and replies with an empty ``RestoreExperiment.Response``
    serialized as JSON.

    :return: A response object with the ``application/json`` mimetype.
    """
    experiment_id = _get_request_message(RestoreExperiment()).experiment_id
    _get_store().restore_experiment(experiment_id)
    empty_reply = RestoreExperiment.Response()
    http_response = Response(mimetype='application/json')
    http_response.set_data(_message_to_json(empty_reply))
    return http_response


def _create_run():
request_message = _get_request_message(CreateRun())

Expand Down Expand Up @@ -245,8 +264,9 @@ def _get_param():


def _list_experiments():
request_message = _get_request_message(ListExperiments())
experiment_entities = _get_store().list_experiments(request_message.view_type)
response_message = ListExperiments.Response()
experiment_entities = _get_store().list_experiments()
response_message.experiments.extend([e.to_proto() for e in experiment_entities])
response = Response(mimetype='application/json')
response.set_data(_message_to_json(response_message))
Expand Down Expand Up @@ -292,6 +312,8 @@ def get_endpoints():
HANDLERS = {
CreateExperiment: _create_experiment,
GetExperiment: _get_experiment,
DeleteExperiment: _delete_experiment,
RestoreExperiment: _restore_experiment,
CreateRun: _create_run,
UpdateRun: _update_run,
LogParam: _log_param,
Expand Down
21 changes: 11 additions & 10 deletions mlflow/store/abstract_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import abstractmethod, ABCMeta

from mlflow.entities import ViewType


class AbstractStore:
"""
Expand All @@ -17,12 +19,11 @@ def __init__(self):
pass

@abstractmethod
def list_experiments(self, include_deleted, only_deleted):
def list_experiments(self, view_type=ViewType.ACTIVE_ONLY):
"""
:param include_deleted: Response list to also includes experiments marked for deletion.
:param only_deleted: Response list to only include experiments that are marked for deletion.
:return: a list of all known Experiment objects
:param view_type: Qualify requested type of experiments.
:return: a list of Experiment objects stored in store for requested view.
"""
pass

Expand All @@ -39,21 +40,21 @@ def create_experiment(self, name, artifact_location):
pass

@abstractmethod
def get_experiment(self, experiment_id, include_deleted, only_deleted):
def get_experiment(self, experiment_id):
"""
Fetches the experiment from the backend store.
Fetches the experiment by ID from the backend store.
Throws an exception if experiment is not found or permanently deleted.
:param experiment_id: Integer id for the experiment
:param include_deleted: Response list to also includes experiments marked for deletion.
:param only_deleted: Response list to only include experiments that are marked for deletion.
:return: A single Experiment object if it exists, otherwise raises an Exception.
"""
pass

@abstractmethod
def delete_experiment(self, experiment_id):
"""
Deletes the experiment from the backend store.
Deletes the experiment from the backend store. Deleted experiments can be restored until
permanently deleted.
:param experiment_id: Integer id for the experiment
"""
Expand All @@ -62,7 +63,7 @@ def delete_experiment(self, experiment_id):
@abstractmethod
def restore_experiment(self, experiment_id):
"""
Restore deleted experiment.
Restore deleted experiment unless it is permanently deleted.
:param experiment_id: Integer id for the experiment
"""
Expand Down
50 changes: 27 additions & 23 deletions mlflow/store/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import uuid

from mlflow.entities import Experiment, Metric, Param, Run, RunData, RunInfo, RunStatus, RunTag
from mlflow.entities import Experiment, Metric, Param, Run, RunData, RunInfo, RunStatus, RunTag, \
ViewType
from mlflow.store.abstract_store import AbstractStore
from mlflow.utils.validation import _validate_metric_name, _validate_param_name, _validate_run_id, \
_validate_tag_name
Expand Down Expand Up @@ -58,13 +59,11 @@ def _check_root_dir(self):
if not is_directory(self.root_directory):
raise Exception("'%s' is not a directory." % self.root_directory)

def _get_experiment_path(self, experiment_id, include_deleted=True, only_deleted=False):
if include_deleted and only_deleted:
raise Exception("Only one of 'include_deleted' or 'only_deleted' can be 'true'.")
def _get_experiment_path(self, experiment_id, view_type=ViewType.ALL):
parents = []
if not only_deleted:
if view_type == ViewType.ACTIVE_ONLY or view_type == ViewType.ALL:
parents.append(self.root_directory)
if include_deleted or only_deleted:
if view_type == ViewType.DELETED_ONLY or view_type == ViewType.ALL:
parents.append(self.trash_folder)
for parent in parents:
exp_list = find(parent, str(experiment_id), full_path=True)
Expand Down Expand Up @@ -108,16 +107,14 @@ def _get_active_experiments(self, full_path=False):
def _get_deleted_experiments(self, full_path=False):
    """
    List the sub-directories of the trash folder.

    :param full_path: Passed through unchanged to ``list_subdirs``.
    :return: Whatever ``list_subdirs`` returns for ``self.trash_folder``.
    """
    trashed_experiments = list_subdirs(self.trash_folder, full_path)
    return trashed_experiments

def list_experiments(self, include_deleted=False, only_deleted=False):
if include_deleted and only_deleted:
raise Exception("Only one of 'include_deleted' or 'only_deleted' can be 'true'.")
def list_experiments(self, view_type=ViewType.ACTIVE_ONLY):
self._check_root_dir()
rsl = []
if include_deleted or only_deleted:
rsl += self._get_deleted_experiments(False)
if not only_deleted:
rsl += self._get_active_experiments(False)
return [self.get_experiment(exp_id) for exp_id in rsl]
if view_type == ViewType.ACTIVE_ONLY or view_type == ViewType.ALL:
rsl += self._get_active_experiments(full_path=False)
if view_type == ViewType.DELETED_ONLY or view_type == ViewType.ALL:
rsl += self._get_deleted_experiments(full_path=False)
return [self._get_experiment(exp_id, view_type) for exp_id in rsl]

def _create_experiment_with_id(self, name, experiment_id, artifact_uri):
self._check_root_dir()
Expand All @@ -136,41 +133,48 @@ def create_experiment(self, name, artifact_location=None):
raise Exception("Experiment '%s' already exists." % experiment.name)
# Get all existing experiments and find the one with largest ID.
# len(list_all(..)) would not work when experiments are deleted.
experiments_ids = [e.experiment_id for e in self.list_experiments(include_deleted=True)]
experiments_ids = [e.experiment_id for e in self.list_experiments(ViewType.ALL)]
experiment_id = max(experiments_ids) + 1
return self._create_experiment_with_id(name, experiment_id, artifact_location)

def _has_experiment(self, experiment_id):
return len(self._get_experiment_path(experiment_id)) > 0

def get_experiment(self, experiment_id, include_deleted=False, only_deleted=False):
def _get_experiment(self, experiment_id, view_type=ViewType.ALL):
self._check_root_dir()
experiment_dirs = self._get_experiment_path(experiment_id, include_deleted, only_deleted)
experiment_dirs = self._get_experiment_path(experiment_id, view_type)
if len(experiment_dirs) == 0:
raise Exception("Could not find experiment with ID %s" % experiment_id)
meta = read_yaml(experiment_dirs[0], FileStore.META_DATA_FILE_NAME)
return Experiment.from_dictionary(meta)

def get_experiment(self, experiment_id):
    """
    Fetch an experiment by ID, looking at active as well as deleted experiments.

    Delegates to ``_get_experiment`` with its default view type.

    :param experiment_id: Integer id for the experiment
    :return: A single Experiment object if it exists, otherwise raises an Exception.
    """
    experiment = self._get_experiment(experiment_id)
    return experiment

def get_experiment_by_name(self, name):
self._check_root_dir()
for experiment in self.list_experiments(include_deleted=True):
for experiment in self.list_experiments(ViewType.ALL):
if experiment.name == name:
return experiment
return None

def delete_experiment(self, experiment_id):
experiment_dirs = self._get_experiment_path(experiment_id, include_deleted=False)
experiment_dirs = self._get_experiment_path(experiment_id, ViewType.ACTIVE_ONLY)
if len(experiment_dirs) == 0:
raise Exception("Could not find experiment with ID %s" % experiment_id)
mv(experiment_dirs[0], self.trash_folder)

def restore_experiment(self, experiment_id):
experiment_dirs = self._get_experiment_path(experiment_id,
include_deleted=False,
only_deleted=True)
experiment_dirs = self._get_experiment_path(experiment_id, ViewType.DELETED_ONLY)
if len(experiment_dirs) == 0:
raise Exception("Could not find deleted experiment with ID %d" % experiment_id)
conflict_experiment = self._get_experiment_path(experiment_id, include_deleted=False)
conflict_experiment = self._get_experiment_path(experiment_id, ViewType.ACTIVE_ONLY)
if len(conflict_experiment) > 0:
raise Exception("Cannot restore experiment with ID %d. "
"An experiment with same ID already exists." % experiment_id)
Expand Down
10 changes: 5 additions & 5 deletions mlflow/store/rest_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

from google.protobuf.json_format import MessageToJson, ParseDict


from mlflow.store.abstract_store import AbstractStore

from mlflow.entities import Experiment, Run, RunInfo, Param, Metric
from mlflow.entities import Experiment, Run, RunInfo, Param, Metric, ViewType

from mlflow.utils.rest_utils import http_request

Expand Down Expand Up @@ -79,11 +78,12 @@ def _call_endpoint(self, api, json_body):
ParseDict(js_dict=js_dict, message=response_proto)
return response_proto

def list_experiments(self, include_deleted=False, only_deleted=False):
def list_experiments(self, view_type=ViewType.ACTIVE_ONLY):
"""
:return: a list of all known Experiment objects
"""
response_proto = self._call_endpoint(ListExperiments, None)
req_body = _message_to_json(ListExperiments(view_type=view_type))
response_proto = self._call_endpoint(ListExperiments, req_body)
return [Experiment.from_proto(experiment_proto)
for experiment_proto in response_proto.experiments]

Expand All @@ -100,7 +100,7 @@ def create_experiment(self, name, artifact_location=None):
response_proto = self._call_endpoint(CreateExperiment, req_body)
return response_proto.experiment_id

def get_experiment(self, experiment_id, include_deleted=False, only_deleted=False):
def get_experiment(self, experiment_id):
"""
Fetches the experiment from the backend store.
Expand Down
Loading

0 comments on commit 091b1b3

Please sign in to comment.