graphql for pure asset backfills (dagster-io#11379)

branch-name: asset-backfill-graphql
### Summary & Motivation

Enables launching pure asset backfills via GraphQL: backfills that target a selection of assets directly, without going through a job's partition set.

Sits atop dagster-io#11378.
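
For context, here is a minimal sketch of what launching a pure asset backfill over the GraphQL API could look like. This is an illustration, not part of the commit: the endpoint URL, asset key, and partition names are hypothetical, and the mutation/selection-set shape is inferred from the `LAUNCH_PARTITION_BACKFILL_MUTATION` usage in the frontend diff below (note the absent `selector`).

```python
# Hypothetical sketch: launch a pure asset backfill with a plain HTTP request.
# Field names (backfillParams, assetSelection, partitionNames) come from this PR;
# the endpoint, asset key, and partitions are made up for illustration.
import requests

LAUNCH_PARTITION_BACKFILL = """
mutation LaunchPartitionBackfill($backfillParams: LaunchBackfillParams!) {
  launchPartitionBackfill(backfillParams: $backfillParams) {
    ... on LaunchBackfillSuccess { backfillId launchedRunIds }
    ... on PythonError { message }
  }
}
"""

variables = {
    "backfillParams": {
        # No "selector" (partition set): that is what makes this a pure asset
        # backfill rather than a job backfill.
        "assetSelection": [{"path": ["my_asset"]}],  # hypothetical asset key
        "partitionNames": ["2023-01-01", "2023-01-02"],  # hypothetical partitions
        "fromFailure": False,
        "tags": [],
    }
}

resp = requests.post(
    "http://localhost:3000/graphql",  # assumed local Dagit instance
    json={"query": LAUNCH_PARTITION_BACKFILL, "variables": variables},
    timeout=30,
)
print(resp.json())
```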

### How I Tested These Changes
sryza authored Jan 4, 2023
1 parent 72654df commit 4b91684
Showing 14 changed files with 396 additions and 118 deletions.
```diff
@@ -18,12 +18,13 @@ import {useHistory} from 'react-router-dom';
 import {showCustomAlert} from '../app/CustomAlertProvider';
 import {usePermissions} from '../app/Permissions';
 import {PythonErrorInfo} from '../app/PythonErrorInfo';
-import {displayNameForAssetKey} from '../asset-graph/Utils';
+import {displayNameForAssetKey, isHiddenAssetGroupJob} from '../asset-graph/Utils';
 import {PartitionHealthSummary} from '../assets/PartitionHealthSummary';
 import {AssetKey} from '../assets/types';
 import {
   ConfigPartitionSelectionQueryQuery,
   ConfigPartitionSelectionQueryQueryVariables,
+  LaunchBackfillParams,
   LaunchPartitionBackfillMutation,
   LaunchPartitionBackfillMutationVariables,
   PartitionDefinitionForLaunchAssetFragment,
@@ -219,20 +220,26 @@ const LaunchAssetChoosePartitionsDialogBody: React.FC<Props> = ({
         setOpen(false);
       }
     } else {
+      const selectorUnlessGraph:
+        | LaunchBackfillParams['selector']
+        | undefined = !isHiddenAssetGroupJob(assetJobName)
+        ? {
+            partitionSetName: partitionSet.name,
+            repositorySelector: {
+              repositoryLocationName: repoAddress.location,
+              repositoryName: repoAddress.name,
+            },
+          }
+        : undefined;
+
       const {data: launchBackfillData} = await client.mutate<
         LaunchPartitionBackfillMutation,
        LaunchPartitionBackfillMutationVariables
       >({
         mutation: LAUNCH_PARTITION_BACKFILL_MUTATION,
         variables: {
           backfillParams: {
-            selector: {
-              partitionSetName: partitionSet.name,
-              repositorySelector: {
-                repositoryLocationName: repoAddress.location,
-                repositoryName: repoAddress.name,
-              },
-            },
+            selector: selectorUnlessGraph,
             assetSelection: assets.map((a) => ({path: a.assetKey.path})),
             partitionNames: allSelected.map((k) => k.partitionKey),
             fromFailure: false,
```

(remaining lines of this file's diff not shown)
js_modules/dagit/packages/core/src/graphql/graphql.ts: 12 changes (6 additions, 6 deletions). Generated file; diff not rendered.

js_modules/dagit/packages/core/src/graphql/schema.graphql: 4 changes (2 additions, 2 deletions). Generated file; diff not rendered.

```diff
@@ -1,6 +1,9 @@
+from typing import TYPE_CHECKING, Any, List, Mapping, Union, cast
+
 import pendulum
 
 import dagster._check as check
+from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
 from dagster._core.errors import DagsterError
 from dagster._core.events import AssetKey
 from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
@@ -13,85 +16,124 @@
 BACKFILL_CHUNK_SIZE = 25
 
 
-@capture_error
-def create_and_launch_partition_backfill(graphene_info, backfill_params):
-    partition_set_selector = backfill_params.get("selector")
-    partition_set_name = partition_set_selector.get("partitionSetName")
-    repository_selector = RepositorySelector.from_graphql_input(
-        partition_set_selector.get("repositorySelector")
-    )
-    location = graphene_info.context.get_repository_location(repository_selector.location_name)
-    repository = location.get_repository(repository_selector.repository_name)
-    matches = [
-        partition_set
-        for partition_set in repository.get_external_partition_sets()
-        if partition_set.name == partition_set_selector.get("partitionSetName")
-    ]
-    if not matches:
-        return GraphenePartitionSetNotFoundError(partition_set_name)
-
-    check.invariant(
-        len(matches) == 1,
-        "Partition set names must be unique: found {num} matches for {partition_set_name}".format(
-            num=len(matches), partition_set_name=partition_set_name
-        ),
-    )
-
-    external_partition_set = next(iter(matches))
-
-    if backfill_params.get("allPartitions"):
-        result = graphene_info.context.get_external_partition_names(external_partition_set)
-        partition_names = result.partition_names
-    elif backfill_params.get("partitionNames"):
-        partition_names = backfill_params.get("partitionNames")
-    else:
-        raise DagsterError(
-            'Backfill requested without specifying either "allPartitions" or "partitionNames" '
-            "arguments"
-        )
+if TYPE_CHECKING:
+    from ...schema.backfill import GrapheneLaunchBackfillSuccess
+    from ...schema.errors import GraphenePartitionSetNotFoundError
+
+
+@capture_error
+def create_and_launch_partition_backfill(
+    graphene_info, backfill_params: Mapping[str, Any]
+) -> Union["GrapheneLaunchBackfillSuccess", "GraphenePartitionSetNotFoundError"]:
+    from ...schema.backfill import GrapheneLaunchBackfillSuccess
+    from ...schema.errors import GraphenePartitionSetNotFoundError
 
     backfill_id = make_new_backfill_id()
-    backfill = PartitionBackfill(
-        backfill_id=backfill_id,
-        partition_set_origin=external_partition_set.get_external_origin(),
-        status=BulkActionStatus.REQUESTED,
-        partition_names=partition_names,
-        from_failure=bool(backfill_params.get("fromFailure")),
-        reexecution_steps=backfill_params.get("reexecutionSteps"),
-        tags={t["key"]: t["value"] for t in backfill_params.get("tags", [])},
-        backfill_timestamp=pendulum.now("UTC").timestamp(),
-        asset_selection=[
-            AssetKey.from_graphql_input(asset_key)
-            for asset_key in backfill_params.get("assetSelection")
+
+    asset_selection = (
+        [
+            cast(AssetKey, AssetKey.from_graphql_input(asset_key))
+            for asset_key in backfill_params["assetSelection"]
         ]
         if backfill_params.get("assetSelection")
-        else None,
+        else None
     )
 
-    if backfill_params.get("forceSynchronousSubmission"):
-        # should only be used in a test situation
-        to_submit = [name for name in partition_names]
-        submitted_run_ids = []
-
-        while to_submit:
-            chunk = to_submit[:BACKFILL_CHUNK_SIZE]
-            to_submit = to_submit[BACKFILL_CHUNK_SIZE:]
-            submitted_run_ids.extend(
-                run_id
-                for run_id in submit_backfill_runs(
-                    graphene_info.context.instance,
-                    workspace=graphene_info.context,
-                    repo_location=location,
-                    backfill_job=backfill,
-                    partition_names=chunk,
-                )
-                if run_id != None
-            )
-        return GrapheneLaunchBackfillSuccess(
-            backfill_id=backfill_id, launched_run_ids=submitted_run_ids
-        )
+    tags = {t["key"]: t["value"] for t in backfill_params.get("tags", [])}
+
+    backfill_timestamp = pendulum.now("UTC").timestamp()
+
+    if backfill_params.get("selector") is not None:  # job backfill
+        partition_set_selector = backfill_params["selector"]
+        partition_set_name = partition_set_selector.get("partitionSetName")
+        repository_selector = RepositorySelector.from_graphql_input(
+            partition_set_selector.get("repositorySelector")
+        )
+        location = graphene_info.context.get_repository_location(repository_selector.location_name)
+        repository = location.get_repository(repository_selector.repository_name)
+        matches = [
+            partition_set
+            for partition_set in repository.get_external_partition_sets()
+            if partition_set.name == partition_set_selector.get("partitionSetName")
+        ]
+        if not matches:
+            return GraphenePartitionSetNotFoundError(partition_set_name)
+
+        check.invariant(
+            len(matches) == 1,
+            "Partition set names must be unique: found {num} matches for {partition_set_name}".format(
+                num=len(matches), partition_set_name=partition_set_name
+            ),
+        )
+        external_partition_set = next(iter(matches))
+
+        if backfill_params.get("allPartitions"):
+            result = graphene_info.context.get_external_partition_names(external_partition_set)
+            partition_names = result.partition_names
+        elif backfill_params.get("partitionNames"):
+            partition_names = backfill_params["partitionNames"]
+        else:
+            raise DagsterError(
+                'Backfill requested without specifying either "allPartitions" or "partitionNames" '
+                "arguments"
+            )
+
+        backfill = PartitionBackfill(
+            backfill_id=backfill_id,
+            partition_set_origin=external_partition_set.get_external_origin(),
+            status=BulkActionStatus.REQUESTED,
+            partition_names=partition_names,
+            from_failure=bool(backfill_params.get("fromFailure")),
+            reexecution_steps=backfill_params.get("reexecutionSteps"),
+            tags=tags,
+            backfill_timestamp=backfill_timestamp,
+            asset_selection=asset_selection,
+        )
+
+        if backfill_params.get("forceSynchronousSubmission"):
+            # should only be used in a test situation
+            to_submit = [name for name in partition_names]
+            submitted_run_ids: List[str] = []
+
+            while to_submit:
+                chunk = to_submit[:BACKFILL_CHUNK_SIZE]
+                to_submit = to_submit[BACKFILL_CHUNK_SIZE:]
+                submitted_run_ids.extend(
+                    run_id
+                    for run_id in submit_backfill_runs(
+                        graphene_info.context.instance,
+                        workspace=graphene_info.context,
+                        repo_location=location,
+                        backfill_job=backfill,
+                        partition_names=chunk,
+                    )
+                    if run_id is not None
+                )
+            return GrapheneLaunchBackfillSuccess(
+                backfill_id=backfill_id, launched_run_ids=submitted_run_ids
+            )
+    elif asset_selection is not None:  # pure asset backfill
+        if backfill_params.get("forceSynchronousSubmission"):
+            raise DagsterError(
+                "forceSynchronousSubmission is not supported for pure asset backfills"
+            )
+
+        if backfill_params.get("fromFailure"):
+            raise DagsterError("fromFailure is not supported for pure asset backfills")
+
+        if backfill_params.get("allPartitions"):
+            raise DagsterError("allPartitions is not supported for pure asset backfills")
+
+        backfill = PartitionBackfill.from_asset_partitions(
+            asset_graph=ExternalAssetGraph.from_workspace(graphene_info.context),
+            backfill_id=backfill_id,
+            tags=tags,
+            backfill_timestamp=backfill_timestamp,
+            asset_selection=asset_selection,
+            partition_names=backfill_params["partitionNames"],
+        )
+    else:
+        raise DagsterError(
+            "Backfill requested without specifying partition set selector or asset selection"
        )
 
     graphene_info.context.instance.add_backfill(backfill)
```

(remaining lines of this file's diff not shown)
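
To make the two code paths above concrete, here are sketches (not from the commit) of the `backfillParams` shapes that now route to each branch of `create_and_launch_partition_backfill`. The field names mirror the GraphQL inputs in the diff; the repository, partition set, asset, and partition values are hypothetical.

```python
# Job backfill: "selector" present, so the resolver resolves the partition set
# and constructs PartitionBackfill(...) directly.
job_backfill_params = {
    "selector": {
        "partitionSetName": "my_job_partition_set",  # hypothetical
        "repositorySelector": {
            "repositoryLocationName": "my_location",  # hypothetical
            "repositoryName": "my_repo",  # hypothetical
        },
    },
    "partitionNames": ["2023-01-01"],
}

# Pure asset backfill: no "selector", so the resolver builds the backfill from
# the asset graph via PartitionBackfill.from_asset_partitions(...).
asset_backfill_params = {
    "assetSelection": [{"path": ["my_asset"]}],  # hypothetical asset key
    "partitionNames": ["2023-01-01"],
}
```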