Supports GenericTrainer in AIP Trainer.
The GenericTrainer can be used with the AIP Trainer by specifying `ai_platform_trainer_executor.GenericExecutor` as the custom executor spec:

```
Trainer(
            custom_executor_spec=executor_spec.ExecutorClassSpec(
                ai_platform_trainer_executor.GenericExecutor),
            module_file=self._trainer_module,
            ...
```
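
For context, a fuller configuration might look like the sketch below. The upstream `example_gen` and `schema_gen` components, the module file path, and all GCP settings are illustrative placeholders, not values taken from this change.

```
# Sketch of a Trainer that runs on AI Platform Training via the generic
# executor. The upstream components (example_gen, schema_gen), the module
# file, and the GCP settings are placeholders for illustration only.
from tfx.components import Trainer
from tfx.components.base import executor_spec
from tfx.extensions.google_cloud_ai_platform.trainer import executor as ai_platform_trainer_executor
from tfx.proto import trainer_pb2

trainer = Trainer(
    custom_executor_spec=executor_spec.ExecutorClassSpec(
        ai_platform_trainer_executor.GenericExecutor),
    module_file='gs://my-bucket/trainer_module.py',  # Must define run_fn().
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000),
    custom_config={
        ai_platform_trainer_executor.TRAINING_ARGS_KEY: {
            'project': 'my-gcp-project',
            'region': 'us-central1',
            'jobDir': 'gs://my-bucket/tmp',
            'masterConfig': {'imageUri': 'gcr.io/my-project/my-tfx-image'},
        },
    })
```

With the generic executor, the remote AIP job invokes the module's `run_fn`, rather than the estimator-based `trainer_fn` used by the default executor.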

PiperOrigin-RevId: 298978668
jiyongjung authored and tensorflow-extended-team committed Mar 5, 2020
1 parent ea9e443 commit 62ab571
Showing 4 changed files with 119 additions and 63 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
@@ -23,6 +23,7 @@
* Added 'tfx_runner' label for CAIP, BQML and Dataflow jobs submitted from
TFX components.
* Fixed the Taxi Colab notebook.
* Adopted the generic trainer executor when using CAIP Training.

### Deprecations

21 changes: 17 additions & 4 deletions tfx/extensions/google_cloud_ai_platform/trainer/executor.py
@@ -32,8 +32,11 @@
JOB_ID_KEY = 'ai_platform_training_job_id'


class Executor(base_executor.BaseExecutor):
"""Start a trainer job on Google Cloud AI Platform (GAIP)."""
class GenericExecutor(base_executor.BaseExecutor):
"""Start a trainer job on Google Cloud AI Platform using a generic Trainer."""

def _GetExecutorClass(self):
return tfx_trainer_executor.GenericExecutor

def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
@@ -67,8 +70,18 @@ def Do(self, input_dict: Dict[Text, List[types.Artifact]],
raise ValueError(err_msg)

job_id = custom_config.get(JOB_ID_KEY)
executor_class_path = '%s.%s' % (tfx_trainer_executor.Executor.__module__,
tfx_trainer_executor.Executor.__name__)

executor_class = self._GetExecutorClass()
executor_class_path = '%s.%s' % (executor_class.__module__,
executor_class.__name__)

return runner.start_aip_training(input_dict, output_dict, exec_properties,
executor_class_path, training_inputs,
job_id)


class Executor(GenericExecutor):
"""Start a trainer job on Google Cloud AI Platform using a default Trainer."""

def _GetExecutorClass(self):
return tfx_trainer_executor.Executor
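
As a side note, the `_GetExecutorClass` hook above only decides which TFX trainer executor class gets serialized into a dotted class path for the remote job. A minimal sketch of that derivation, assuming `tfx_trainer_executor` aliases `tfx.components.trainer.executor` as elsewhere in TFX:

```
# Illustrative only: how the class path handed to runner.start_aip_training()
# is built. Assumes tfx_trainer_executor refers to
# tfx.components.trainer.executor.
from tfx.components.trainer import executor as tfx_trainer_executor

executor_class = tfx_trainer_executor.GenericExecutor
executor_class_path = '%s.%s' % (executor_class.__module__,
                                 executor_class.__name__)
print(executor_class_path)  # tfx.components.trainer.executor.GenericExecutor
```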
32 changes: 22 additions & 10 deletions tfx/extensions/google_cloud_ai_platform/trainer/executor_test.py
@@ -19,6 +19,7 @@
from __future__ import print_function

import os

# Standard Imports
import mock
import tensorflow as tf
@@ -50,36 +51,47 @@ def setUp(self):
self._executor_class_path = '%s.%s' % (
tfx_trainer_executor.Executor.__module__,
tfx_trainer_executor.Executor.__name__)
self._generic_executor_class_path = '%s.%s' % (
tfx_trainer_executor.GenericExecutor.__module__,
tfx_trainer_executor.GenericExecutor.__name__)

self.addCleanup(mock.patch.stopall)
self.mock_runner = mock.patch(
'tfx.extensions.google_cloud_ai_platform.trainer.executor.runner'
).start()

@mock.patch(
'tfx.extensions.google_cloud_ai_platform.trainer.executor.runner'
)
def testDo(self, mock_runner):
def testDo(self):
executor = ai_platform_trainer_executor.Executor()
executor.Do(self._inputs, self._outputs, self._exec_properties)
mock_runner.start_aip_training.assert_called_with(
self.mock_runner.start_aip_training.assert_called_with(
self._inputs, self._outputs, self._exec_properties,
self._executor_class_path, {
'project': self._project_id,
'jobDir': self._job_dir,
}, None)

@mock.patch(
'tfx.extensions.google_cloud_ai_platform.trainer.executor.runner'
)
def testDoWithJobIdOverride(self, mock_runner):
def testDoWithJobIdOverride(self):
executor = ai_platform_trainer_executor.Executor()
job_id = 'overridden_job_id'
self._exec_properties['custom_config'][
ai_platform_trainer_executor.JOB_ID_KEY] = job_id
executor.Do(self._inputs, self._outputs, self._exec_properties)
mock_runner.start_aip_training.assert_called_with(
self.mock_runner.start_aip_training.assert_called_with(
self._inputs, self._outputs, self._exec_properties,
self._executor_class_path, {
'project': self._project_id,
'jobDir': self._job_dir,
}, job_id)

def testDoWithGenericExecutorClass(self):
executor = ai_platform_trainer_executor.GenericExecutor()
executor.Do(self._inputs, self._outputs, self._exec_properties)
self.mock_runner.start_aip_training.assert_called_with(
self._inputs, self._outputs, self._exec_properties,
self._generic_executor_class_path, {
'project': self._project_id,
'jobDir': self._job_dir,
}, None)

if __name__ == '__main__':
tf.test.main()
128 changes: 79 additions & 49 deletions tfx/orchestration/kubeflow/kubeflow_gcp_integration_test.py
@@ -196,55 +196,32 @@ def testModelValidatorOnDataflowRunner(self):
])
self._compile_and_run_pipeline(pipeline)

def testAIPlatformTrainerPipeline(self):
"""Trainer-only test pipeline on AI Platform Training."""
pipeline_name = 'kubeflow-aip-trainer-test-{}'.format(self._random_id())
pipeline = self._create_pipeline(
pipeline_name,
[
self.schema_importer,
self.transformed_examples_importer,
self.transform_graph_importer,
Trainer(
custom_executor_spec=executor_spec.ExecutorClassSpec(
ai_platform_trainer_executor.Executor),
module_file=self._trainer_module,
transformed_examples=self.transformed_examples_importer
.outputs['result'],
schema=self.schema_importer.outputs['result'],
transform_graph=self.transform_graph_importer.outputs['result'],
train_args=trainer_pb2.TrainArgs(num_steps=10),
eval_args=trainer_pb2.EvalArgs(num_steps=5),
custom_config={
# Test that distributed training behaves properly.
ai_platform_trainer_executor.TRAINING_ARGS_KEY: {
'project':
self._gcp_project_id,
'region':
self._gcp_region,
'jobDir':
os.path.join(
self._pipeline_root(pipeline_name), 'tmp'),
'masterConfig': {
'imageUri': self._container_image,
},
'scaleTier':
'CUSTOM',
'masterType':
'large_model',
'parameterServerType':
'standard',
'parameterServerCount':
1,
'workerType':
'standard',
'workerCount':
2,
}
})
])
self._compile_and_run_pipeline(pipeline)

def getCaipTrainingArgs(self, pipeline_name):
"""Training args for Google CAIP Training."""
return {
'project': self._gcp_project_id,
'region': self._gcp_region,
'jobDir': os.path.join(self._pipeline_root(pipeline_name), 'tmp'),
'masterConfig': {
'imageUri': self._container_image,
},
}

def getCaipTrainingArgsForDistributed(self, pipeline_name):
"""Training args to test that distributed training is behaves properly."""
args = self.getCaipTrainingArgs(pipeline_name)
args.update({
'scaleTier': 'CUSTOM',
'masterType': 'large_model',
'parameterServerType': 'standard',
'parameterServerCount': 1,
'workerType': 'standard',
'workerCount': 2,
})
return args

def assertNumberOfTrainerOutputIsOne(self, pipeline_name):
"""Make sure the number of trainer executions and output models."""
# There must be only one execution of Trainer.
trainer_output_base_dir = os.path.join(
self._pipeline_root(pipeline_name), 'Trainer', 'model')
@@ -263,6 +240,59 @@ def testAIPlatformTrainerPipeline(self):
path_utils.serving_model_dir(model_uri), 'export',
'chicago-taxi'))))

def testAIPlatformTrainerPipeline(self):
"""Trainer-only test pipeline on AI Platform Training."""
pipeline_name = 'kubeflow-aip-trainer-test-{}'.format(self._random_id())
pipeline = self._create_pipeline(pipeline_name, [
self.schema_importer, self.transformed_examples_importer,
self.transform_graph_importer,
Trainer(
custom_executor_spec=executor_spec.ExecutorClassSpec(
ai_platform_trainer_executor.Executor),
module_file=self._trainer_module,
transformed_examples=self.transformed_examples_importer
.outputs['result'],
schema=self.schema_importer.outputs['result'],
transform_graph=self.transform_graph_importer.outputs['result'],
train_args=trainer_pb2.TrainArgs(num_steps=10),
eval_args=trainer_pb2.EvalArgs(num_steps=5),
custom_config={
ai_platform_trainer_executor.TRAINING_ARGS_KEY:
self.getCaipTrainingArgsForDistributed(pipeline_name)
})
])
self._compile_and_run_pipeline(pipeline)
self.assertNumberOfTrainerOutputIsOne(pipeline_name)

def testAIPlatformGenericTrainerPipeline(self):
"""Trainer-only pipeline on AI Platform Training with GenericTrainer."""
pipeline_name = 'kubeflow-aip-generic-trainer-test-{}'.format(
self._random_id())
pipeline = self._create_pipeline(pipeline_name, [
self.schema_importer,
self.transformed_examples_importer,
self.transform_graph_importer,
Trainer(
custom_executor_spec=executor_spec.ExecutorClassSpec(
ai_platform_trainer_executor.GenericExecutor),
module_file=self._trainer_module,
transformed_examples=self.transformed_examples_importer
.outputs['result'],
schema=self.schema_importer.outputs['result'],
transform_graph=self.transform_graph_importer.outputs['result'],
train_args=trainer_pb2.TrainArgs(num_steps=10),
eval_args=trainer_pb2.EvalArgs(num_steps=5),
custom_config={
ai_platform_trainer_executor.TRAINING_ARGS_KEY:
self.getCaipTrainingArgs(pipeline_name)
})
])
self._compile_and_run_pipeline(pipeline)
self.assertNumberOfTrainerOutputIsOne(pipeline_name)
# TODO(b/150661783): Add tests using distributed training with a generic
# trainer.
# TODO(b/150576271): Add Trainer tests using Keras models.

# TODO(muchida): Identify more model types to ensure models trained in TF 2
# works with CAIP prediction service.
def testAIPlatformPusherPipeline(self):
