Skip to content

Commit

Permalink
Merge pull request #627 from revit13/taint
Browse files Browse the repository at this point in the history
Add nodes toleration to Ray pods
  • Loading branch information
roytman authored Sep 27, 2024
2 parents 49ebd51 + daca127 commit 9c09652
Show file tree
Hide file tree
Showing 29 changed files with 79 additions and 27 deletions.
2 changes: 2 additions & 0 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The parameters used here are as follows:
* memory - memory
* image - image to use
* image_pull_secret - image pull secret
* tolerations - (optional) tolerations for the ray pods
* ray_worker_options: worker node options (only one worker pool is used here), containing the following:
* replicas - number of replicas to create
* max_replicas - max number of replicas
Expand All @@ -119,6 +120,7 @@ The parameters used here are as follows:
* memory - memory
* image - image to use
* image_pull_secret - image pull secret
* tolerations - (optional) tolerations for the ray pods
* server_url - server url
* additional_params: additional (support) parameters, containing the following:
* wait_interval - wait interval for the API server, in seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
ClusterSpec,
HeadNodeSpec,
RayJobRequest,
Template,
WorkerNodeSpec,
environment_variables_decoder,
template_decoder,
volume_decoder,
)
from ray.job_submission import JobStatus
Expand Down Expand Up @@ -121,41 +121,37 @@ def create_ray_cluster(
"""
# start with templates
# head_node
cpus = head_node.get("cpu", 1)
memory = head_node.get("memory", 1)
gpus = head_node.get("gpu", 0)
accelerator = head_node.get("gpu_accelerator", None)
dct = {}
dct["cpu"] = head_node.get("cpu", 1)
dct["memory"] = head_node.get("memory", 1)
dct["gpu"] = head_node.get("gpu", 0)
dct["gpu_accelerator"] = head_node.get("gpu_accelerator", None)
head_node_template_name = f"{name}-head-template"
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=head_node_template_name)
head_template = Template(
name=head_node_template_name,
namespace=namespace,
cpu=cpus,
memory=memory,
gpu=gpus,
gpu_accelerator=accelerator,
)
dct["name"] = head_node_template_name
dct["namespace"] = namespace
if "tolerations" in head_node:
dct["tolerations"] = head_node.get("tolerations")
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=dct["name"])
head_template = template_decoder(dct)
status, error = self.api_server_client.create_compute_template(head_template)
if status != 200:
return status, error
worker_template_names = [""] * len(worker_nodes)
index = 0
# For every worker group
for worker_node in worker_nodes:
cpus = worker_node.get("cpu", 1)
memory = worker_node.get("memory", 1)
gpus = worker_node.get("gpu", 0)
accelerator = worker_node.get("gpu_accelerator", None)
dct = {}
dct["cpu"] = worker_node.get("cpu", 1)
dct["memory"] = worker_node.get("memory", 1)
dct["gpu"] = worker_node.get("gpu", 0)
dct["gpu_accelerator"] = worker_node.get("gpu_accelerator", None)
worker_node_template_name = f"{name}-worker-template-{index}"
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=worker_node_template_name)
worker_template = Template(
name=worker_node_template_name,
namespace=namespace,
cpu=cpus,
memory=memory,
gpu=gpus,
gpu_accelerator=accelerator,
)
dct["name"] = worker_node_template_name
dct["namespace"] = namespace
if "tolerations" in worker_node:
dct["tolerations"] = worker_node.get("tolerations")
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=dct["name"])
worker_template = template_decoder(dct)
status, error = self.api_server_client.create_compute_template(worker_template)
if status != 200:
return status, error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def {{ pipeline_name }}(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -143,6 +144,7 @@ def {{ pipeline_name }}(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def code2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -146,6 +147,7 @@ def code2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def code_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -143,6 +144,7 @@ def code_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def header_cleanser(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -140,6 +141,7 @@ def header_cleanser(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/malware/kfp_ray/malware_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def malware(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -133,6 +134,7 @@ def malware(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/proglang_select/kfp_ray/proglang_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def lang_select(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def lang_select(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def repo_level_order(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -163,6 +164,7 @@ def repo_level_order(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -139,6 +140,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -140,6 +141,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/doc_quality/kfp_ray/doc_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -145,6 +146,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/lang_id/kfp_ray/lang_id_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -146,6 +147,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -133,6 +134,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -137,6 +138,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def pii_redactor(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -131,6 +132,7 @@ def pii_redactor(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/text_encoder/kfp_ray/text_encoder_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -137,6 +138,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/universal/doc_id/kfp_ray/doc_id_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def doc_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -147,6 +148,7 @@ def doc_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Loading

0 comments on commit 9c09652

Please sign in to comment.