Merge pull request #610 from IBM/noop-gen

Add generator to noop pipeline.

roytman authored Sep 23, 2024
2 parents eba80df + abf814b commit c79e88a
Showing 11 changed files with 69 additions and 34 deletions.
4 changes: 3 additions & 1 deletion kfp/pipeline_generator/single-pipeline/README.md
@@ -1,4 +1,6 @@
 ## Steps to generate a new pipeline
-- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml)).
+- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](../../../transforms/universal/noop/kfp_ray/pipeline_definitions.yaml)).
+- execute `make -C ../../../transforms workflow-venv` from this directory
+- execute `source ../../../transforms/venv/bin/activate`
 - execute `./run.sh --config_file <pipeline_definitions_file_path> --output_dir_file <destination directory>`, where `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is the directory where the new pipeline file will be generated.
18 changes: 6 additions & 12 deletions kfp/pipeline_generator/single-pipeline/pipeline_generator.py
@@ -1,4 +1,4 @@
-PRE_COMMIT = "../pre-commit-config.yaml"
-
 PIPELINE_TEMPLATE_FILE = "simple_pipeline.py"
 
 INPUT_PARAMETERS = "input_parameters"
@@ -13,13 +13,15 @@
 if __name__ == "__main__":
     import argparse
 
+    import os
     import yaml
     from jinja2 import Environment, FileSystemLoader
 
-    environment = Environment(loader=FileSystemLoader("templates/"))
+    script_dir = os.path.dirname(os.path.abspath(__file__))
+    environment = Environment(loader=FileSystemLoader(f"{script_dir}/templates/"))
     template = environment.get_template(PIPELINE_TEMPLATE_FILE)
 
+    #pre_commit_config = f"{script_dir}/../pre-commit-config.yaml"
     parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
     parser.add_argument("-c", "--config_file", type=str, default="")
     parser.add_argument("-od", "--output_dir_file", type=str, default="")
@@ -50,16 +52,8 @@
         image_pull_secret=common_input_params_values["image_pull_secret"],
         multi_s3=pipeline_parameters["multi_s3"],
     )
 
-    output_file = f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py"
+    output_file = f"{args.output_dir_file}{pipeline_parameters[NAME]}_wf.py"
     with open(output_file, mode="w", encoding="utf-8") as message:
         message.write(content)
         print(f"... wrote {output_file}")
-
-    import sys
-
-    from pre_commit.main import main
-
-    print(f"Pipeline ${output_file} auto generation completed")
-    args = ["run", "--file", f"{output_file}", "-c", PRE_COMMIT]
-    sys.exit(main(args))
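The key change in this generator is resolving the `templates/` directory relative to the script's own location instead of the caller's working directory, so `run.sh` can invoke it from anywhere. A minimal standalone sketch of the pattern, assuming a hypothetical `greeting.txt.j2` template exists next to the script:

```python
import os

from jinja2 import Environment, FileSystemLoader

# Resolve the directory containing this script, not the caller's CWD.
script_dir = os.path.dirname(os.path.abspath(__file__))

# Template lookup now works no matter where the script is invoked from
# (e.g. via make from a transform's own directory).
environment = Environment(loader=FileSystemLoader(f"{script_dir}/templates/"))
template = environment.get_template("greeting.txt.j2")  # hypothetical template
print(template.render(pipeline_name="noop"))
```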
9 changes: 4 additions & 5 deletions kfp/pipeline_generator/single-pipeline/run.sh
@@ -57,8 +57,7 @@ fi
 ROOT_DIR=${PWD}
 
 mkdir -p ${ROOT_DIR}/${DIST_DIR}/
-python3 -m venv venv
-source venv/bin/activate
-pip install pre-commit
-pip install jinja2
-python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
+
+script_dir="$(dirname "$(readlink -f "$0")")"
+echo $PYTHONPATH
+python3 ${script_dir}/pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
@@ -9,12 +9,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 import os
 
 import kfp.compiler as compiler
 import kfp.components as comp
 import kfp.dsl as dsl
+
 from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
@@ -101,19 +101,21 @@ def compute_exec_params_func(
 )
 def {{ pipeline_name }}(
     # Ray cluster
-    ray_name: str = "{{ pipeline_name }}-kfp-ray",
+    ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster
     # Add image_pull_secret and image_pull_policy to ray workers if needed
     ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
     ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
     server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
     # data access
-    {% if multi_s3 == False %}
+    {%- if multi_s3 == False %}
     data_s3_config: str = "{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}",
-    {% else %}
+    {%- else %}
     data_s3_config: str = ["{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}"],
-    {% endif %}
+    {%- endif %}
     data_s3_access_secret: str = "{{ s3_access_secret }}",
     data_max_files: int = -1,
     data_num_samples: int = -1,
+    data_checkpointing: bool = False,
     # orchestrator
     runtime_actor_options: dict = {'num_cpus': 0.8},
     runtime_pipeline_id: str = "pipeline_id",
@@ -209,4 +211,4 @@ def {{ pipeline_name }}(
 
 if __name__ == "__main__":
     # Compiling the pipeline
-    compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml"))
\ No newline at end of file
+    compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml"))
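The template change above replaces `{% if %}` tags with `{%- if %}`, turning on Jinja's whitespace control: without the dash, each block tag line survives as a blank, indented line inside the generated function signature. A small standalone illustration of the difference (not code from this repo):

```python
from jinja2 import Template

plain = Template("x = [\n{% if short %}\n    1,\n{% endif %}\n]")
trimmed = Template("x = [\n{%- if short %}\n    1,\n{%- endif %}\n]")

# The plain tags leave empty lines where {% if %} and {% endif %} stood.
print(plain.render(short=True))
# The "-" strips the whitespace preceding each tag, yielding clean output:
# x = [
#     1,
# ]
print(trimmed.render(short=True))
```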
2 changes: 2 additions & 0 deletions kfp/pipeline_generator/superpipeline/README.md
@@ -1,6 +1,8 @@
 ## Steps to generate a new super pipeline in KFP v1.
 - The super pipeline allows you to execute several transforms within a single pipeline. For more details, refer to [multi_transform_pipeline.md](../../doc/multi_transform_pipeline.md).
 - Create a `super_pipeline_definitions.yaml` file for the required task. You can refer to the example [super_pipeline_definitions.yaml](./super_pipeline_definitions.yaml).
+- execute `make -C ../../../transforms workflow-venv` from this directory
+- execute `source ../../../transforms/venv/bin/activate`
 - Execute `./run.sh --config_file <super_pipeline_definitions.yaml> --output_dir_file <destination_directory>`. Here, `super_pipeline_definitions.yaml` is the super pipeline definition file that you created above, and `destination_directory` is the directory where the new super pipeline file will be generated.


5 changes: 1 addition & 4 deletions kfp/pipeline_generator/superpipeline/run.sh
@@ -57,8 +57,5 @@ fi
 ROOT_DIR=${PWD}
 
 mkdir -p ${ROOT_DIR}/${DIST_DIR}/
-python3 -m venv venv
-source venv/bin/activate
-pip install pre-commit
-pip install jinja2
+
 python3 super_pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
3 changes: 3 additions & 0 deletions transforms/.make.workflows
@@ -57,6 +57,9 @@ ${WORKFLOW_VENV_ACTIVATE}: ${REPOROOT}/.make.versions ${REPOROOT}/kfp/kfp_ray_co
 	pip install -e $(REPOROOT)/kfp/kfp_support_lib/shared_workflow_support; \
 	pip install -e $(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB); \
 	$(MAKE) -C ${REPOROOT}/kfp/kfp_ray_components set-versions
+	pip install jinja2
+	pip install pyyaml
+	pip install pre-commit
 	@# Help: Create the virtual environment common to all workflows
 
 .PHONY: .workflows.upload-pipeline
4 changes: 4 additions & 0 deletions transforms/universal/noop/kfp_ray/Makefile
@@ -49,3 +49,7 @@ workflow-upload: workflow-build
 	@for file in $(YAML_WF); do \
 		$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
 	done
+
+.PHONY: workflow-generate
+workflow-generate: workflow-venv
+	. ${WORKFLOW_VENV_ACTIVATE} && ../../../../kfp/pipeline_generator/single-pipeline/run.sh -c `pwd`/pipeline_definitions.yaml -od .
7 changes: 7 additions & 0 deletions transforms/universal/noop/kfp_ray/README.md
@@ -7,6 +7,13 @@ This project allows execution of the [noop Ray transform](../ray) as a
 
 The detailed pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md)
 
+## Pipeline file generation
+To generate the pipeline Python file, run
+```shell
+make workflow-generate
+```
+This uses [pipeline_definitions.yaml](pipeline_definitions.yaml) and the [pipeline generator](../../../../kfp/pipeline_generator/single-pipeline/) to generate the pipeline's Python file.
+
 ## Compilation
 
 In order to compile pipeline definitions run
13 changes: 7 additions & 6 deletions transforms/universal/noop/kfp_ray/noop_wf.py
@@ -14,6 +14,7 @@
 import kfp.compiler as compiler
 import kfp.components as comp
 import kfp.dsl as dsl
+
 from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
@@ -92,7 +93,7 @@ def compute_exec_params_func(
 
 @dsl.pipeline(
     name=TASK_NAME + "-ray-pipeline",
-    description="Pipeline for noop",
+    description="Pipeline for noop task",
 )
 def noop(
     # Ray cluster
@@ -103,7 +104,7 @@ def noop(
     server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
     # data access
     data_s3_config: str = "{'input_folder': 'test/noop/input/', 'output_folder': 'test/noop/output/'}",
-    data_s3_access_secret: str = "s3-minio",
+    data_s3_access_secret: str = "s3-secret",
     data_max_files: int = -1,
     data_num_samples: int = -1,
     data_checkpointing: bool = False,
@@ -117,7 +118,7 @@ def noop(
     additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
 ):
     """
-    Pipeline to execute NOOP transform
+    Pipeline to execute noop transform
     :param ray_name: name of the Ray cluster
     :param ray_head_options: head node options, containing the following:
         cpu - number of cpus
@@ -167,6 +168,7 @@ def noop(
         runtime_code_location=runtime_code_location,
         noop_sleep_sec=noop_sleep_sec,
     )
+
     ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
     # start Ray cluster
     ray_cluster = create_ray_op(
@@ -179,12 +181,12 @@
     )
     ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
     ray_cluster.after(compute_exec_params)
 
     # Execute job
     execute_job = execute_ray_jobs_op(
         ray_name=ray_name,
         run_id=run_id,
         additional_params=additional_params,
-        # note that the parameters below are specific for NOOP transform
         exec_params=compute_exec_params.output,
         exec_script_name=EXEC_SCRIPT_NAME,
         server_url=server_url,
@@ -193,7 +195,6 @@
     ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
     execute_job.after(ray_cluster)
 
-
 if __name__ == "__main__":
     # Compiling the pipeline
-    compiler.Compiler().compile(noop, __file__.replace(".py", ".yaml"))
\ No newline at end of file
+    compiler.Compiler().compile(noop, __file__.replace(".py", ".yaml"))
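Running the generated `noop_wf.py` directly compiles it to `noop_wf.yaml` (see the `__main__` block above). As a hedged sketch of what happens next, the compiled package could be submitted to a KFP endpoint along these lines; the host URL, run name, and argument override below are placeholders, not values from this repo:

```python
import kfp

# Placeholder endpoint; point this at your KFP API server.
client = kfp.Client(host="http://localhost:8080")

# Submit the compiled pipeline package, overriding one pipeline parameter.
result = client.create_run_from_pipeline_package(
    "noop_wf.yaml",
    arguments={"noop_sleep_sec": 30},
    run_name="noop-test-run",  # hypothetical run name
)
print(f"Started run {result.run_id}")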
24 changes: 24 additions & 0 deletions transforms/universal/noop/kfp_ray/pipeline_definitions.yaml
@@ -0,0 +1,24 @@
+pipeline_parameters:
+  name: "noop"
+  description: "Pipeline for noop task"
+  script_name: "noop_transform_ray.py"
+  prefix: ""
+  multi_s3: False
+  compute_func_name: ""
+  compute_func_import: ""
+  component_spec_path: ""
+
+pipeline_common_input_parameters_values:
+  kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
+  transform_image: "quay.io/dataprep1/data-prep-kit/noop-ray:latest"
+  s3_access_secret: "s3-secret"
+  image_pull_secret: ""
+  input_folder: "test/noop/input/"
+  output_folder: "test/noop/output/"
+
+pipeline_transform_input_parameters:
+  pipeline_arguments:
+    - name: "noop_sleep_sec"
+      type: "int"
+      value: 10
+      description: "noop sleep time"

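To make the structure above concrete, here is a minimal sketch of how a script might read this definitions file. The section and key names come from the YAML itself; the loading code is illustrative, not the repository's exact implementation:

```python
import yaml

with open("pipeline_definitions.yaml", encoding="utf-8") as f:
    definitions = yaml.safe_load(f)

# The three top-level sections shown above.
pipeline_params = definitions["pipeline_parameters"]
common_values = definitions["pipeline_common_input_parameters_values"]
transform_args = definitions["pipeline_transform_input_parameters"]["pipeline_arguments"]

print(pipeline_params["name"])            # -> noop
print(common_values["s3_access_secret"])  # -> s3-secret
print(transform_args[0]["value"])         # -> 10
```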