Merge pull request #233 from Mohammad-nassar10/super-pipe-gene
Super pipeline generator
roytman authored Jul 31, 2024
2 parents 7935365 + 87dbcd1 commit 4e3e8e9
Showing 12 changed files with 492 additions and 1 deletion.
File renamed without changes.
@@ -1,4 +1,4 @@
PRE_COMMIT = "./pre-commit-config.yaml"
PRE_COMMIT = "../pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "simple_pipeline.py"

INPUT_PARAMETERS = "input_parameters"
Expand Down
File renamed without changes.
7 changes: 7 additions & 0 deletions kfp/pipeline_generator/superpipeline/README.md
@@ -0,0 +1,7 @@
## Steps to generate a new super pipeline in KFP v1.
- The super pipeline allows you to execute several transforms within a single pipeline. For more details, refer to [multi_transform_pipeline.md](../../doc/multi_transform_pipeline.md).
- Create a `super_pipeline_definitions.yaml` file for the required task. You can refer to the example [super_pipeline_definitions.yaml](./super_pipeline_definitions.yaml).
- Execute `./run.sh --config_file <super_pipeline_definitions.yaml> --output_dir_file <destination_directory>`. Here, `super_pipeline_definitions.yaml` is the super pipeline definition file you created above, and `destination_directory` is the directory where the new super pipeline file will be generated (see the example invocation below).


*__NOTE__*: The `component_spec_path` is the path to the `kfp_ray_components` folder; it depends on where the generated workflow is compiled.
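For example, a minimal invocation (a sketch; `generated` is a hypothetical output directory, and `super_pipeline_definitions.yaml` is assumed to be in the current directory):

./run.sh --config_file super_pipeline_definitions.yaml --output_dir_file generated

With the example definitions file, the generated workflow is written to `generated/sample-super-kubeflow-pipeline_wf.py` (the file is named `<super_pipeline_metadata.name>_wf.py`).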
@@ -0,0 +1,93 @@
# NOTE: This file is auto generated by Pipeline Generator.

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


# Path to the KFP component specification files
component_spec_path = "../../../../../kfp/kfp_ray_components/"
# For every sub-workflow we need a separate component that knows about this sub-workflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_ededup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id-ray:latest"
ededup_image = "quay.io/dataprep1/data-prep-kit/ededup-ray:latest"

# Pipeline to invoke execution on remote resource
@dsl.pipeline(
name="sample-super-kubeflow-pipeline",
description="Pipeline to show how to run combine several transformer pipelines",
)
def super_pipeline(
p1_orch_doc_id_name: str = "doc_id_wf",
p1_orch_ededup_name: str = "ededup_wf",
p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_data_s3_access_secret: str = "s3-secret",
# doc_id step parameters
p3_name: str = "doc_id",
p3_skip: bool = False,
p3_doc_id_doc_column: str = "contents",
p3_doc_id_hash_column: str = "hash_column",
p3_doc_id_int_column: str = "int_id_column",
p3_overriding_params: str = '{"ray_worker_options": {"image": "'
+ doc_id_image
+ '"}, "ray_head_options": {"image": "'
+ doc_id_image
+ '"}}',
# ededup step parameters
p4_name: str = "ededup",
p4_skip: bool = False,
p4_ededup_doc_column: str = "contents",
p4_ededup_hash_cpu: float = 0.5,
p4_ededup_n_samples: int = 10,
p4_overriding_params: str = '{"ray_worker_options": {"image": "'
+ ededup_image
+ '"}, "ray_head_options": {"image": "'
+ ededup_image
+ '"}}',
):

# get all arguments
args = locals()
orch_host = "http://ml-pipeline:8888"

def _set_component(op: dsl.BaseOp, displayed_name: str, prev_op: dsl.BaseOp = None):
# set the sub-component UI name
op.set_display_name(displayed_name)

# Add pod labels
op.add_pod_label("app", "ml-pipeline").add_pod_label("component", "data-science-pipelines")
# No caching
op.execution_options.caching_strategy.max_cache_staleness = "P0D"
# image pull policy
op.set_image_pull_policy("Always")
# Set the timeout for each task to one week (in seconds)
op.set_timeout(ONE_WEEK_SEC)
if prev_op is not None:
op.after(prev_op)

doc_id = run_doc_id_op(
name=p1_orch_doc_id_name, prefix="p3_", params=args, host=orch_host, input_folder=p2_pipeline_input_parent_path
)
_set_component(doc_id, "doc_id")
ededup = run_ededup_op(
name=p1_orch_ededup_name, prefix="p4_", params=args, host=orch_host, input_folder=doc_id.output
)
_set_component(ededup, "ededup", doc_id)

# Configure the pipeline-level timeout to one week (in seconds)
dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(super_pipeline, __file__.replace(".py", ".yaml"))
64 changes: 64 additions & 0 deletions kfp/pipeline_generator/superpipeline/run.sh
@@ -0,0 +1,64 @@
#!/bin/bash

POSITIONAL_ARGS=()

while [[ $# -gt 0 ]]; do
case $1 in
-c|--config_file)
DEF_FILE="$2"
if [[ "$2" = -* ]]
then
echo "ERROR: config_file value not provided."
exit 1
fi
shift # past argument
shift # past value
;;
-od|--output_dir_file)
DIST_DIR="$2"
if [[ "$2" = -* ]]
then
echo "ERROR: output_dir_file value not provided."
exit 1
fi
shift # past argument
shift # past value
;;
-h|--help)
echo "-c/--config_file(required): file path to config_file(pipeline_definition.yaml)."
echo "-od/--output_dir_file(required): output folder path to store generated pipeline."
exit 1
;;
-*|--*)
echo "Unknown option $1"
exit 1
;;
*)
POSITIONAL_ARGS+=("$1") # save positional arg
shift # past argument
;;
esac
done


if [ -z ${DEF_FILE+x} ]
then
echo "ERROR: config_file is not defined."
exit 1
fi

if [ -z ${DIST_DIR+x} ]
then
echo "ERROR: output_dir_file is not defined."
exit 1
fi


ROOT_DIR=${PWD}

mkdir -p ${ROOT_DIR}/${DIST_DIR}/
python3 -m venv venv
source venv/bin/activate
pip install pre-commit
pip install jinja2
python3 super_pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
@@ -0,0 +1,70 @@
super_pipeline_metadata:
name: "sample-super-kubeflow-pipeline"
description: "Pipeline to show how to run combine several transformer pipelines"

super_pipeline_tasks:
- name: "doc_id"
pipeline_name: "doc_id_wf"
image: "quay.io/dataprep1/data-prep-kit/doc_id-ray:latest`"
- name: "ededup"
pipeline_name: "ededup_wf"
image: "quay.io/dataprep1/data-prep-kit/ededup-ray:latest"

super_pipeline_common_parameters:
- name: "input_parent_path"
type: "str"
value: "test/doc_id/input/"
description: ""
- name: "output_parent_path"
type: "str"
value: "test/super/output/"
description: ""
- name: "parent_path_suffix"
type: "str"
value: ""
description: ""
- name: "data_s3_access_secret"
type: "str"
value: "s3-secret"
description: ""


doc_id_step_parameters:
step_parameters:
- name: "skip"
type: "bool"
value: False
description: ""
- name: "doc_id_doc_column"
type: "str"
value: "contents"
description: ""
- name: "doc_id_hash_column"
type: "str"
value: "hash_column"
description: ""
- name: "doc_id_int_column"
type: "str"
value: "int_id_column"
description: ""



ededup_step_parameters:
step_parameters:
- name: "skip"
type: "bool"
value: False
description: ""
- name: "ededup_doc_column"
type: "str"
value: "contents"
description: ""
- name: "ededup_hash_cpu"
type: "float"
value: 0.5
description: ""
- name: "ededup_n_samples"
type: "int"
value: 10
description: ""
124 changes: 124 additions & 0 deletions kfp/pipeline_generator/superpipeline/super_pipeline_generator.py
@@ -0,0 +1,124 @@
import yaml


PRE_COMMIT = "../pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "template_superpipeline.py"

PIPELINE_TASKS = "super_pipeline_tasks"
COMMON_INPUT_PARAMETERS = "super_pipeline_common_parameters"
PIPELINE_METADATA = "super_pipeline_metadata"

INPUT_PARAMETERS = "input_parameters"
PIPELINE_PARAMETERS = "pipeline_parameters"
STEP_PARAMETERS = "step_parameters"
PIPELINE_COMMON_INPUT_PARAMETERS_VALUES = "pipeline_common_input_parameters_values"
PIPELINE_TRANSFORM_INPUT_PARAMETERS = "pipeline_transform_input_parameters"

NAME = "name"
TYPE = "type"
VALUE = "value"
DESCRIPTION = "description"


# Render the given parameter definitions as pipeline-function arguments, prefixing each name with `prefix`.
def get_generic_params(params, prefix="") -> str:
ret_str = ""
if params is None:
return ret_str
for param in params:
ret_str += f"\n {prefix}{param[NAME]}: {param[TYPE]} = "
if param[TYPE] == "str":
ret_str += f'"{param[VALUE]}"'
else:
ret_str += f"{param[VALUE]}"
ret_str += f", {param.get(DESCRIPTION, '')}"
return ret_str

if __name__ == "__main__":
import argparse

from pre_commit.main import main
from jinja2 import Environment, FileSystemLoader

environment = Environment(loader=FileSystemLoader("templates/"))
template = environment.get_template(PIPELINE_TEMPLATE_FILE)

parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
parser.add_argument("-c", "--config_file", type=str, default="")
parser.add_argument("-od", "--output_dir_file", type=str, default="")

args = parser.parse_args()
# open configuration file
with open(args.config_file, "r") as file:
pipeline_definitions = yaml.safe_load(file)

tasks_steps_params = []
pipeline_metadata = pipeline_definitions[PIPELINE_METADATA]
pipeline_tasks = pipeline_definitions[PIPELINE_TASKS]
common_input_params = pipeline_definitions[COMMON_INPUT_PARAMETERS]

component_spec_path = pipeline_metadata.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../../kfp/kfp_ray_components/"

for task in pipeline_tasks:
task_name = task["name"]
task_pipeline_name = task["pipeline_name"]
task_image = task["image"]

tasks_steps_params.append(pipeline_definitions[task_name+"_step_parameters"])

# get the sub tasks input parameters
sub_workflows_parameters = ""
index = 0
for task_step_params in tasks_steps_params:
task_params = task_step_params.get(STEP_PARAMETERS)
task_name = pipeline_tasks[index]["name"]
sub_workflows_parameters += "\n\t# " + task_name + " step parameters"
sub_workflows_parameters += "\n\tp" + str(index + 3) + "_name: str = \"" + task_name + "\","
sub_workflows_parameters += get_generic_params(task_params, "p" + str(index + 3) + "_")
override_params = "\n\tp" + str(index + 3) + "_overriding_params: str = '{\"ray_worker_options\": {\"image\": \"' + " + task_name + "_image + '\"}, "
override_params += "\"ray_head_options\": {\"image\": \"' + " + task_name + "_image + '\"}}',"
sub_workflows_parameters += override_params

index += 1

sub_workflows_operations = ""
# build the op for the first sub workflow
task_name = pipeline_tasks[0]["name"]
task_op = " " + task_name + " = run_" + task_name + "_op(name=p1_orch_" + task_name + "_name, prefix=\"p3_\", params=args, host=orch_host, input_folder=p2_pipeline_input_parent_path)"
task_op += "\n _set_component(" + task_name + ', "' + task_name + '")'

sub_workflows_operations += task_op
i = 1
prefix_index = 4
for task in pipeline_tasks[1:]:
task_name = task["name"]
task_op = "\n " + task_name + " = run_" + task_name + "_op(name=p1_orch_" + task_name + "_name, prefix=\"p" + str(prefix_index) + '_", params=args, host=orch_host, input_folder=' + pipeline_tasks[i-1]["name"] + ".output)"
task_op += "\n _set_component(" + task_name + ', "' + task_name + '", ' + pipeline_tasks[i-1]["name"] + ")"
sub_workflows_operations += task_op
prefix_index += 1
i += 1

content = template.render(
superpipeline_name=pipeline_metadata[NAME],
superpipeline_description=pipeline_metadata[DESCRIPTION],
sub_workflows_components=pipeline_definitions[PIPELINE_TASKS],
component_spec_path=component_spec_path,
p1_parameters=pipeline_definitions[PIPELINE_TASKS],
add_p2_parameters=common_input_params,
sub_workflows_parameters=sub_workflows_parameters,
sub_workflows_operations=sub_workflows_operations,
)
output_file = f"{args.output_dir_file}/{pipeline_metadata[NAME]}_wf.py"
with open(output_file, mode="w", encoding="utf-8") as message:
message.write(content)
print(f"... wrote {output_file}")

import sys

print(f"Pipeline ${output_file} auto generation completed")
# format the pipeline python file
args = ["run", "--file", f"{output_file}", "-c", PRE_COMMIT]
sys.exit(main(args))
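For reference, the generator can also be invoked directly, bypassing `run.sh` (a sketch, assuming `jinja2` and `pre-commit` are already installed and the paths are illustrative):

# Equivalent to the last steps of run.sh, without creating a virtual environment.
pip install jinja2 pre-commit
python3 super_pipeline_generator.py -c super_pipeline_definitions.yaml -od ./generated/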