Skip to content

Commit

Permalink
provide final bugfix for databrickslabs#136
Browse files Browse the repository at this point in the history
  • Loading branch information
renardeinside committed Jan 26, 2022
1 parent 8ff5773 commit 74e828e
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed
- Provided bugfix for emoji-based messages in certain shell environments
- Provided bugfix for cases when not all jobs are listed due to usage of Jobs API 2.1

----
> Unreleased changes must be tracked above this line.
Expand Down
15 changes: 4 additions & 11 deletions dbx/commands/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
get_deployment_config,
_preprocess_cluster_args, # noqa
)
from dbx.utils.job_listing import find_job_by_name
from dbx.utils.policy_parser import PolicyParser


Expand Down Expand Up @@ -450,20 +451,12 @@ def _create_jobs(jobs: List[Dict[str, Any]], api_client: ApiClient) -> Dict[str,
for job in jobs:
dbx_echo(f'Processing deployment for job: {job["name"]}')
jobs_service = JobsService(api_client)
all_jobs = jobs_service.list_jobs().get("jobs", [])
matching_jobs = [j for j in all_jobs if j["settings"]["name"] == job["name"]]
matching_job = find_job_by_name(jobs_service, job["name"])

if not matching_jobs:
if not matching_job:
job_id = _create_job(api_client, job)
else:

if len(matching_jobs) > 1:
raise Exception(
f"""There are more than one jobs with name {job["name"]}.
Please delete duplicated jobs first"""
)

job_id = matching_jobs[0]["job_id"]
job_id = matching_job["job_id"]
_update_job(jobs_service, job_id, job)

deployment_data[job["name"]] = job_id
Expand Down
12 changes: 3 additions & 9 deletions dbx/commands/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
InfoFile,
get_current_branch_name,
)
from dbx.utils.job_listing import find_job_by_name

TERMINAL_RUN_LIFECYCLE_STATES = ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"]
POSSIBLE_TASK_KEYS = ["notebook_task", "spark_jar_task", "spark_python_task", "spark_submit_task"]
Expand Down Expand Up @@ -313,18 +314,11 @@ def __init__(
def launch(self) -> Tuple[Dict[Any, Any], Optional[str]]:
dbx_echo("Launching job via run now API")
jobs_service = JobsService(self.api_client)
job_data = find_job_by_name(jobs_service, self.job)

all_jobs = jobs_service.list_jobs().get("jobs", [])

matching_jobs = [j for j in all_jobs if j["settings"]["name"] == self.job]

if not matching_jobs:
if not job_data:
raise Exception(f"Job with name {self.job} not found")

if len(matching_jobs) > 1:
raise Exception(f"Job with name {self.job} is duplicated. Please make job name unique.")

job_data = matching_jobs[0]
job_id = job_data["job_id"]

active_runs = jobs_service.list_runs(job_id, active_only=True).get("runs", [])
Expand Down
26 changes: 26 additions & 0 deletions dbx/utils/job_listing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import List, Dict, Any, Optional

from databricks_cli.sdk.service import JobsService


def list_all_jobs(js: JobsService) -> List[Dict[str, Any]]:
    """Return the job entries reported by the given Jobs service.

    Args:
        js: Jobs service client used to query the workspace.

    Returns:
        The list stored under the ``"jobs"`` key of the listing response,
        or an empty list when the response has no such key.
    """
    # Version 2.0 is requested on purpose: it is expected to list all jobs
    # in one response, without iterating over limit/offset.
    response = js.list_jobs(version="2.0")
    return response.get("jobs", [])


def find_job_by_name(js: JobsService, job_name: str) -> Optional[Dict[str, Any]]:
    """Look up a single job by its exact name.

    Args:
        js: Jobs service client used to list the jobs.
        job_name: Name to compare against each job's ``settings.name``.

    Returns:
        The matching job entry, or ``None`` when no job has that name.

    Raises:
        Exception: If more than one job carries the requested name.
    """
    # Use .get() so that an entry without "settings" or "name" is simply
    # skipped instead of aborting the whole lookup with a KeyError.
    matching_jobs = [
        job
        for job in list_all_jobs(js)
        if job.get("settings", {}).get("name") == job_name
    ]

    if len(matching_jobs) > 1:
        raise Exception(
            f"There are more than one jobs with name {job_name}.\n"
            "Please delete duplicated jobs first"
        )

    return matching_jobs[0] if matching_jobs else None

0 comments on commit 74e828e

Please sign in to comment.