Skip to content

Commit

Permalink
Separate validation logic out into a ValidatingAdmissionWebhook (#37)
Browse files Browse the repository at this point in the history
* webhook manifest

* minimally working validator

* dry run validation

* add test

* finish up validator and tests

* bugfix

* use httpstatus codes

* helm hooks

Co-authored-by: Aurick Qiao <[email protected]>
  • Loading branch information
aurickq and Aurick Qiao authored Oct 10, 2020
1 parent 69560ca commit d91a03e
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 34 deletions.
10 changes: 5 additions & 5 deletions helm/adaptdl-sched/templates/adaptdl-sched.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ spec:
spec:
shareProcessNamespace: true
serviceAccountName: adaptdl
{{- with .Values.tolerations }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- end }}
containers:
- name: main
image: {{ .Values.image.repository }}{{ empty .Values.image.digest | ternary ":" "@" }}{{ coalesce .Values.image.digest .Values.image.tag .Chart.AppVersion }}
Expand Down Expand Up @@ -57,7 +57,7 @@ spec:
containerPort: {{ .Values.supervisor.service.targetPort }}
protocol: TCP
imagePullPolicy: Always
{{- if .Values.image.pullSecrets }}
{{- with .Values.image.pullSecrets }}
imagePullSecrets:
{{ toYaml .Values.image.pullSecrets | indent 8 }}
{{- end -}}
{{ toYaml . | indent 8 }}
{{- end -}}
59 changes: 59 additions & 0 deletions helm/adaptdl-sched/templates/validator-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Deployment that runs the AdaptDL validating admission webhook server
# (the adaptdl_sched.validator Python module) as a single replica.
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-validator
labels:
app: adaptdl-validator
release: {{ .Release.Name }}
spec:
replicas: 1
selector:
matchLabels:
app: adaptdl-validator
release: {{ .Release.Name }}
template:
metadata:
labels:
app: adaptdl-validator
release: {{ .Release.Name }}
spec:
# The TLS certificate and key are read from the Secret named
# "<release>-validator" and mounted read-only into the container at /mnt.
volumes:
- name: tls
secret:
secretName: {{.Release.Name }}-validator
containers:
- name: server
image: {{ .Values.image.repository }}{{ empty .Values.image.digest | ternary ":" "@" }}{{ coalesce .Values.image.digest .Values.image.tag .Chart.AppVersion }}
imagePullPolicy: Always
command: ["python", "-m", "adaptdl_sched.validator"]
# Serve on all interfaces, port 8443, using the mounted cert/key files.
args:
- --host=0.0.0.0
- --port=8443
- --tls-crt=/mnt/tls.crt
- --tls-key=/mnt/tls.key
volumeMounts:
- name: tls
mountPath: /mnt
readOnly: true
ports:
- name: https
containerPort: 8443
# Liveness and readiness both probe /healthz over HTTPS on the named port.
livenessProbe:
httpGet:
path: /healthz
port: https
scheme: HTTPS
readinessProbe:
httpGet:
path: /healthz
port: https
scheme: HTTPS
{{- with .Values.image.pullSecrets }}
imagePullSecrets:
{{ toYaml . | indent 8 }}
{{- end -}}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

15 changes: 15 additions & 0 deletions helm/adaptdl-sched/templates/validator-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Service in front of the validator pods: exposes port 443 and forwards to
# the container port named "https" on pods selected by the app/release labels.
apiVersion: v1
kind: Service
metadata:
name: {{ .Release.Name }}-validator
labels:
app: adaptdl-validator
release: {{ .Release.Name }}
spec:
ports:
- port: 443
targetPort: https
name: https
selector:
app: adaptdl-validator
release: {{ .Release.Name }}
39 changes: 39 additions & 0 deletions helm/adaptdl-sched/templates/validator-webhook.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# TLS Secret holding a self-signed certificate generated at template render
# time. The certificate's common name is "<release>-validator", its DNS
# alternate names are "<release>-validator.<namespace>" and
# "<release>-validator.<namespace>.svc", and it is valid for 3650 days.
# Installed as a Helm pre-install hook; a previous hook resource is deleted
# before a new one is created.
apiVersion: v1
kind: Secret
type: kubernetes.io/tls
metadata:
name: {{ .Release.Name }}-validator
annotations:
"helm.sh/hook": "pre-install"
"helm.sh/hook-delete-policy": "before-hook-creation"
data:
{{- $altNames := list ( printf "%s-validator.%s" .Release.Name .Release.Namespace ) ( printf "%s-validator.%s.svc" .Release.Name .Release.Namespace ) -}}
{{- $cert := genSelfSignedCert ( printf "%s-validator" .Release.Name ) nil $altNames 3650 }}
tls.crt: {{ $cert.Cert | b64enc }}
tls.key: {{ $cert.Key | b64enc }}
---
# Registers the validator Service (path /validate) as a validating webhook
# for CREATE and UPDATE operations on adaptdl.petuum.com/v1 adaptdljobs.
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: {{ .Release.Name }}-validator
labels:
app: {{ .Release.Name }}-validator
annotations:
"helm.sh/hook": "pre-install"
"helm.sh/hook-delete-policy": "before-hook-creation"
webhooks:
- name: {{ .Release.Name }}-validator.{{ .Release.Namespace }}.svc.cluster.local
clientConfig:
# The CA bundle is the same self-signed certificate stored in the Secret
# defined earlier in this template.
caBundle: {{ $cert.Cert | b64enc }}
service:
name: {{ .Release.Name }}-validator
namespace: {{ .Release.Namespace }}
path: "/validate"
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["adaptdl.petuum.com"]
apiVersions: ["v1"]
resources: ["adaptdljobs"]
admissionReviewVersions:
- v1
sideEffects: None
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
portpicker==1.3.1
torch==1.5
torchtext==0.5.0
pytest-aiohttp==0.3.0
34 changes: 5 additions & 29 deletions sched/adaptdl_sched/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ async def _sync_job(self, namespace, job_name):
current_ts = datetime.now(timezone.utc)
job, pods = await self._get_job_and_pods(namespace, job_name)
if job is not None:
job = await self._validate_job(job, pods, current_ts)
job = await self._validate_pods(job, pods, current_ts)
if job is None: # Not Found, presumably was deleted.
await self._delete_pods(pods)
return
# Use ChainMap to record updates to the job status fields.
job["status"] = collections.ChainMap({}, job.get("status", {}))
# Get the current phase of the job, None if no phase was set.
allocation = job["status"].get("allocation", [])
# Use ChainMap to record updates to the job status fields.
job["status"] = collections.ChainMap({}, job["status"])
phase = job["status"]["phase"]
phase = job["status"].setdefault("phase", "Pending")
replicas = job["status"].get("replicas", 0)
preemptible = job["spec"].get("preemptible", True)
if (completion_status := self._detect_completion(pods, preemptible)):
Expand Down Expand Up @@ -206,34 +206,10 @@ async def _get_job_and_pods(self, namespace, name):
raise # Unexpected error.
return job, pods

async def _validate_job(self, job, pods, current_ts):
async def _validate_pods(self, job, pods, current_ts):
namespace = job["metadata"]["namespace"]
name = job["metadata"]["name"]
patch_status = {}
if not job.get("status"):
# Assuming empty status section means this is a newly created job.
JOB_SUBMISSION_COUNT.inc()
patch_status = {"phase": "Pending"}
# if maxReplicas is provided, it should be >= minReplicas
if job["spec"].get("maxReplicas", sys.maxsize) < \
job["spec"].get("minReplicas", 0):
patch_status["phase"] = "Failed"
patch_status["reason"] = "Invalid"
patch_status["message"] = "maxReplicas should be greater " \
"or equal to minReplicas in the " \
"job Spec"
# Validate pod template using a dry run.
template = {
"metadata": {"name": name, "namespace": namespace},
"template": job["spec"]["template"],
}
try:
await self._core_api.create_namespaced_pod_template(
namespace, template, dry_run="All")
except kubernetes.client.rest.ApiException as exc:
patch_status["phase"] = "Failed"
patch_status["reason"] = "Invalid"
patch_status["message"] = json.loads(exc.body).get("message")
# Validate pods for job.
group_list = []
replicas_list = []
Expand Down
133 changes: 133 additions & 0 deletions sched/adaptdl_sched/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright 2020 Petuum, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import logging
import ssl
import sys

import kubernetes_asyncio as kubernetes

from aiohttp import web
from http import HTTPStatus

# Module-level logger. Its level is set to INFO so that INFO records logged
# through it are not filtered out by this logger.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)


class Validator(object):
    """Validating admission webhook server for AdaptDL job objects.

    Serves two HTTP routes through aiohttp:

    * ``GET /healthz`` -- health check, always answers with an empty 200.
    * ``POST /validate`` -- receives an ``AdmissionReview`` request and
      answers with an ``AdmissionReview`` response that allows or denies it.
    """

    def __init__(self):
        """Create the Kubernetes API client and register the HTTP routes."""
        self._core_api = kubernetes.client.CoreV1Api()
        self._app = web.Application()
        self._app.add_routes([
            web.get('/healthz', self._handle_healthz),
            web.post('/validate', self._handle_validate),
        ])

    def get_app(self):
        """Return the underlying aiohttp application."""
        return self._app

    def run(self, host, port, ssl_context=None):
        """Serve the application on ``host``:``port`` until stopped.

        Args:
            host: Address to bind to.
            port: Port to listen on.
            ssl_context: Optional SSL context; when ``None`` it is passed
                through to aiohttp unchanged.
        """
        web.run_app(self.get_app(), host=host, port=port,
                    ssl_context=ssl_context)

    async def _handle_healthz(self, request):
        """Health check endpoint: reply with an empty default response."""
        return web.Response()

    async def _handle_validate(self, request):
        """Handle an AdmissionReview request and return the verdict.

        CREATE and UPDATE operations are validated; every other operation
        is allowed. The request ``uid`` is echoed back in the response.
        """
        request_json = await request.json()
        review = request_json["request"]
        operation = review["operation"]
        if operation == "CREATE":
            response = await self._validate_create(review)
        elif operation == "UPDATE":
            response = await self._validate_update(review)
        else:
            response = {"allowed": True}
        LOG.info("%s %s/%s: %s",
                 operation,
                 review["namespace"],
                 review.get("name", "<none>"),
                 response)
        response["uid"] = review["uid"]
        return web.json_response({
            "apiVersion": "admission.k8s.io/v1",
            "kind": "AdmissionReview",
            "response": response,
        })

    @staticmethod
    def _deny(reason, message):
        """Build a denial response with a 422 status, reason and message."""
        return {
            "allowed": False,
            "status": {
                "code": HTTPStatus.UNPROCESSABLE_ENTITY,
                "reason": reason,
                "message": message,
            }
        }

    @staticmethod
    def _api_error_message(exc):
        """Extract a human-readable message from a Kubernetes API exception.

        Prefers the ``message`` field of the JSON error body. If the body is
        missing, is not valid JSON, is not a JSON object, or carries no
        message, falls back to the exception's ``reason`` attribute and
        finally to ``str(exc)`` so the caller always gets some text instead
        of an unhandled parsing error.
        """
        try:
            message = json.loads(exc.body).get("message")
        except (TypeError, ValueError, AttributeError):
            message = None
        return message or getattr(exc, "reason", None) or str(exc)

    async def _validate_create(self, request):
        """Validate a CREATE request for a job.

        Two checks are performed:

        1. If ``spec.maxReplicas`` is provided it must be >=
           ``spec.minReplicas`` (which defaults to 0).
        2. ``spec.template`` must be accepted by a dry-run creation of a
           pod template in the request's namespace.

        Returns:
            ``{"allowed": True}`` when both checks pass, otherwise a denial
            dict with reason ``"Invalid"``.
        """
        job = request["object"]
        namespace = request["namespace"]
        spec = job["spec"]
        # Run the purely local check first so an obviously invalid spec is
        # rejected without a round-trip to the API server.
        if spec.get("maxReplicas", sys.maxsize) < spec.get("minReplicas", 0):
            return self._deny("Invalid",
                              ("spec.maxReplicas must be greater "
                               "than or equal to spec.minReplicas"))
        template = {
            "metadata": {"name": "spec.template", "namespace": namespace},
            "template": spec["template"],
        }
        try:
            # dry_run="All" asks the API server to validate without persisting.
            await self._core_api.create_namespaced_pod_template(
                namespace, template, dry_run="All")
        except kubernetes.client.rest.ApiException as exc:
            return self._deny("Invalid", self._api_error_message(exc))
        return {"allowed": True}

    async def _validate_update(self, request):
        """Validate an UPDATE request: the job spec must not change.

        Returns:
            ``{"allowed": True}`` when ``object.spec`` equals
            ``oldObject.spec``, otherwise a denial dict with reason
            ``"Forbidden"``.
        """
        if request["object"]["spec"] != request["oldObject"]["spec"]:
            return self._deny("Forbidden",
                              "updates to job spec are forbidden")
        return {"allowed": True}

if __name__ == "__main__":
    # Script entry point: parse CLI flags, optionally set up TLS, then serve.
    logging.basicConfig()

    # Parse arguments before touching the cluster so that --help and
    # argument errors work even when not running inside a pod.
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8080)
    parser.add_argument("--tls-crt", type=str)
    parser.add_argument("--tls-key", type=str)
    args = parser.parse_args()

    # Supplying only one of the two TLS files is almost certainly a
    # misconfiguration; fail loudly instead of silently serving plain HTTP.
    if bool(args.tls_crt) != bool(args.tls_key):
        parser.error("--tls-crt and --tls-key must be provided together")

    if args.tls_crt and args.tls_key:
        # Server-side context with the standard library's recommended
        # defaults (a bare ssl.SSLContext() is deprecated).
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(args.tls_crt, args.tls_key)
    else:
        ssl_context = None

    kubernetes.config.load_incluster_config()

    validator = Validator()
    validator.run(args.host, args.port, ssl_context=ssl_context)
Loading

0 comments on commit d91a03e

Please sign in to comment.