Skip to content

Commit

Permalink
Allow grpc servers to be shut down over graphql
Browse files Browse the repository at this point in the history
Summary: For servers you run themselves, this gives you a way to signal over graphql that they should be shut down (and in k8s or docker-compose with the right configuration, spun back up)

Test Plan: BK (adding coverage now)

Reviewers: alangenfeld, johann, sashank, max

Reviewed By: johann

Differential Revision: https://dagster.phacility.com/D8508
  • Loading branch information
gibsondan committed Jun 24, 2021
1 parent 4bbdc40 commit 0f8b285
Show file tree
Hide file tree
Showing 20 changed files with 278 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ install_dev_python_modules_verbose:
python scripts/install_dev_python_modules.py

graphql:
cd js_modules/dagit/; make generate-types
cd js_modules/dagit/; make generate-graphql

sanity_check:
#NOTE: fails on nonPOSIX-compliant shells (e.g. CMD, powershell)
Expand Down
26 changes: 26 additions & 0 deletions docs/content/concepts/dagit/graphql-client.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ Note that all GraphQL methods on the API are not yet available in Python - the `
object="DagsterGraphQLClient"
method="reload_repository_location"
/>
- <PyObject
module="dagster_graphql"
object="DagsterGraphQLClient"
method="shutdown_repository_location"
/>

## Using the GraphQL Client

Expand Down Expand Up @@ -136,6 +141,27 @@ except DagsterGraphQLClientError as exc:
raise exc
```

### Shutting down a Repository Location Server

If you're running your own gRPC server, we generally recommend updating your repository code by building a new Docker image with a new tag and redeploying your server using that new image, but sometimes you may want to restart your server without changing the image (for example, if your pipeline definitions are generated programatically from a database, and you want to restart the server and re-generate your repositories even though the underlying Python code hasn't changed). In these situations, `reload_repository_location` is insufficient, since it refreshes Dagit's information about the repositories but doesn't actually restart the server or reload the repository definition.

One way to cause your server to restart and your repositories to be reloaded is to run your server in an environment like Kubernetes that automatically restarts services when they fail (or docker-compose with `restart: always` set on the service), and then use the `shutdown_repository_location` function on the GraphQL client to shut down the server. The server will then be restarted by your environment, which will be automatically detected by Dagit.

Example usage:

```python file=/concepts/dagit/graphql/client_example.py startafter=start_shutdown_repo_location_marker endbefore=end_shutdown_repo_location_marker
from dagster_graphql import (
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
)

shutdown_info: ShutdownRepositoryLocationInfo = client.shutdown_repository_location(REPO_NAME)
if shutdown_info.status == ShutdownRepositoryLocationStatus.SUCCESS:
do_something_on_success()
else:
raise Exception(f"Repository location shutdown failed: {shutdown_info.message}")
```

#### Repository Location and Repository Inference

Note that specifying the repository location name and repository name are not always necessary; the GraphQL client will infer the repository name and repository location name if the pipeline name is unique.
Expand Down
1 change: 1 addition & 0 deletions examples/deploy_docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ services:
dockerfile: ./Dockerfile_pipelines
container_name: docker_example_pipelines
image: docker_example_pipelines_image
restart: always
environment:
DAGSTER_POSTGRES_USER: "postgres_user"
DAGSTER_POSTGRES_PASSWORD: "postgres_password"
Expand Down
1 change: 1 addition & 0 deletions examples/deploy_docker/from_source/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ services:
dockerfile: ./Dockerfile_pipelines
container_name: docker_example_pipelines
image: docker_example_pipelines_image
restart: always
environment:
DAGSTER_POSTGRES_USER: "postgres_user"
DAGSTER_POSTGRES_PASSWORD: "postgres_password"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,16 @@ def do_something_with_exc(some_exception): # pylint: disable=W0613
f"{reload_info.failure_type} error: {reload_info.message}"
)
# end_reload_repo_location_marker

# start_shutdown_repo_location_marker
from dagster_graphql import (
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
)

shutdown_info: ShutdownRepositoryLocationInfo = client.shutdown_repository_location(REPO_NAME)
if shutdown_info.status == ShutdownRepositoryLocationStatus.SUCCESS:
do_something_on_success()
else:
raise Exception(f"Repository location shutdown failed: {shutdown_info.message}")
# end_shutdown_repo_location_marker

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,7 @@ type Mutation {
deletePipelineRun(runId: String!): DeletePipelineRunResult!
reloadRepositoryLocation(repositoryLocationName: String!): ReloadRepositoryLocationMutationResult!
reloadWorkspace: ReloadWorkspaceMutationResult!
shutdownRepositoryLocation(repositoryLocationName: String!): ShutdownRepositoryLocationMutationResult!
wipeAssets(assetKeys: [AssetKeyInput!]!): AssetWipeMutationResult!
launchPartitionBackfill(backfillParams: LaunchBackfillParams!): LaunchBackfillResult!
resumePartitionBackfill(backfillId: String!): ResumeBackfillResult!
Expand Down Expand Up @@ -1501,6 +1502,12 @@ type RepositoryLocationNotFound implements Error {

union ReloadWorkspaceMutationResult = Workspace | ReadOnlyError | PythonError

union ShutdownRepositoryLocationMutationResult = ShutdownRepositoryLocationSuccess | RepositoryLocationNotFound | ReadOnlyError | PythonError

type ShutdownRepositoryLocationSuccess {
repositoryLocationName: String!
}

union AssetWipeMutationResult = AssetNotFoundError | ReadOnlyError | PythonError | AssetWipeSuccess

type AssetWipeSuccess {
Expand Down Expand Up @@ -1730,6 +1737,10 @@ type ReloadWorkspaceMutation {
Output: ReloadWorkspaceMutationResult!
}

type ShutdownRepositoryLocationMutation {
Output: ShutdownRepositoryLocationMutationResult!
}

type TerminatePipelineExecutionMutation {
Output: TerminatePipelineExecutionResult!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def check():
"Missing some query history (sub)directories:"
f"\n\t{missing_query_history_subdirs}"
+ f"\n\t at {legacy_query_info.directory}"
+ "\n\t Please run `dagster-graphql-client query snapshot` on the command line"
+ "\n\t Please run `dagster-graphql-client query snapshot` on the command line "
+ "or manually resolve these issues"
)
for query_name in query_directories_present:
Expand Down Expand Up @@ -108,7 +108,9 @@ def snapshot():
with open(os.path.join(query_dir, most_recent_query_filename), "r") as f:
most_recent_query = f.read()

if most_recent_query is not None and not are_queries_compatible(
# Create a new snapshot if it's the first one or the query is not compatible
# with the most recent one
if most_recent_query is None or not are_queries_compatible(
current_query_body, most_recent_query
):
query_filename = serialize_to_query_filename(
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
InvalidOutputErrorInfo,
ReloadRepositoryLocationInfo,
ReloadRepositoryLocationStatus,
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
)
from .version import __version__

Expand All @@ -17,4 +19,6 @@
"InvalidOutputErrorInfo",
"ReloadRepositoryLocationInfo",
"ReloadRepositoryLocationStatus",
"ShutdownRepositoryLocationInfo",
"ShutdownRepositoryLocationStatus",
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
InvalidOutputErrorInfo,
ReloadRepositoryLocationInfo,
ReloadRepositoryLocationStatus,
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
)

__all__ = [
"DagsterGraphQLClient",
"DagsterGraphQLClientError",
"ReloadRepositoryLocationInfo",
"ReloadRepositoryLocationStatus",
"ShutdownRepositoryLocationInfo",
"ShutdownRepositoryLocationStatus",
]
40 changes: 40 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
CLIENT_SUBMIT_PIPELINE_RUN_MUTATION,
GET_PIPELINE_RUN_STATUS_QUERY,
RELOAD_REPOSITORY_LOCATION_MUTATION,
SHUTDOWN_REPOSITORY_LOCATION_MUTATION,
)
from .utils import (
DagsterGraphQLClientError,
InvalidOutputErrorInfo,
PipelineInfo,
ReloadRepositoryLocationInfo,
ReloadRepositoryLocationStatus,
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
)


Expand Down Expand Up @@ -296,3 +299,40 @@ def reload_repository_location(
failure_type=query_result_type,
message=query_result["message"],
)

def shutdown_repository_location(
self, repository_location_name: str
) -> ShutdownRepositoryLocationInfo:
"""Shuts down the server that is serving metadata for the provided repository location.
This is primarily useful when you want the server to be restarted by the compute environment
in which it is running (for example, in Kubernetes, the pod in which the server is running
will automatically restart when the server is shut down, and the repository metadata will
be reloaded)
Args:
repository_location_name (str): The name of the repository location
Returns:
ShutdownRepositoryLocationInfo: Object with information about the result of the reload request
"""
check.str_param(repository_location_name, "repository_location_name")

res_data: Dict[str, Dict[str, Any]] = self._execute(
SHUTDOWN_REPOSITORY_LOCATION_MUTATION,
{"repositoryLocationName": repository_location_name},
)

query_result: Dict[str, Any] = res_data["shutdownRepositoryLocation"]
query_result_type: str = query_result["__typename"]
if query_result_type == "ShutdownRepositoryLocationSuccess":
return ShutdownRepositoryLocationInfo(status=ShutdownRepositoryLocationStatus.SUCCESS)
elif (
query_result_type == "RepositoryLocationNotFound" or query_result_type == "PythonError"
):
return ShutdownRepositoryLocationInfo(
status=ShutdownRepositoryLocationStatus.FAILURE,
message=query_result["message"],
)
else:
raise Exception(f"Unexpected query result type {query_result_type}")
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,17 @@
}
}
"""

SHUTDOWN_REPOSITORY_LOCATION_MUTATION = """
mutation ($repositoryLocationName: String!) {
shutdownRepositoryLocation(repositoryLocationName: $repositoryLocationName) {
__typename
... on PythonError {
message
}
... on RepositoryLocationNotFound {
message
}
}
}
"""
19 changes: 19 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ class ReloadRepositoryLocationStatus(Enum):
FAILURE = "FAILURE"


class ShutdownRepositoryLocationStatus(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"


class ReloadRepositoryLocationInfo(NamedTuple):
"""This class gives information about the result of reloading
a Dagster repository location with a GraphQL mutation.
Expand All @@ -37,6 +42,20 @@ class ReloadRepositoryLocationInfo(NamedTuple):
message: Optional[str] = None


class ShutdownRepositoryLocationInfo(NamedTuple):
"""This class gives information about the result of shutting down the server for
a Dagster repository location using a GraphQL mutation.
Args:
status (ShutdownRepositoryLocationStatus) Whether the shutdown succeeded or failed.
message (Optional[str], optional): the failure message/reason if
`status == ShutdownRepositoryLocationStatus.FAILURE`. Defaults to None.
"""

status: ShutdownRepositoryLocationStatus
message: Optional[str] = None


class PipelineInfo(NamedTuple):
repository_location_name: str
repository_name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ def types():
GrapheneReloadRepositoryLocationMutationResult,
GrapheneReloadWorkspaceMutation,
GrapheneReloadWorkspaceMutationResult,
GrapheneShutdownRepositoryLocationMutation,
GrapheneShutdownRepositoryLocationMutationResult,
GrapheneTerminatePipelineExecutionFailure,
GrapheneTerminatePipelineExecutionMutation,
GrapheneTerminatePipelineExecutionResult,
Expand All @@ -36,6 +38,8 @@ def types():
GrapheneReloadRepositoryLocationMutationResult,
GrapheneReloadWorkspaceMutation,
GrapheneReloadWorkspaceMutationResult,
GrapheneShutdownRepositoryLocationMutation,
GrapheneShutdownRepositoryLocationMutationResult,
GrapheneTerminatePipelineExecutionFailure,
GrapheneTerminatePipelineExecutionMutation,
GrapheneTerminatePipelineExecutionResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,24 @@ class Meta:
name = "ReloadRepositoryLocationMutationResult"


class GrapheneShutdownRepositoryLocationSuccess(graphene.ObjectType):
repositoryLocationName = graphene.NonNull(graphene.String)

class Meta:
name = "ShutdownRepositoryLocationSuccess"


class GrapheneShutdownRepositoryLocationMutationResult(graphene.Union):
class Meta:
types = (
GrapheneShutdownRepositoryLocationSuccess,
GrapheneRepositoryLocationNotFound,
GrapheneReadOnlyError,
GraphenePythonError,
)
name = "ShutdownRepositoryLocationMutationResult"


class GrapheneReloadRepositoryLocationMutation(graphene.Mutation):
Output = graphene.NonNull(GrapheneReloadRepositoryLocationMutationResult)

Expand All @@ -343,9 +361,7 @@ class Meta:
def mutate(self, graphene_info, **kwargs):
location_name = kwargs["repositoryLocationName"]

if not graphene_info.context.has_repository_location(
location_name
) and not graphene_info.context.has_repository_location_error(location_name):
if not graphene_info.context.has_repository_location_name(location_name):
return GrapheneRepositoryLocationNotFound(location_name)

if not graphene_info.context.is_reload_supported(location_name):
Expand All @@ -360,6 +376,30 @@ def mutate(self, graphene_info, **kwargs):
return GrapheneWorkspaceLocationEntry(new_context.workspace_snapshot[location_name])


class GrapheneShutdownRepositoryLocationMutation(graphene.Mutation):
Output = graphene.NonNull(GrapheneShutdownRepositoryLocationMutationResult)

class Arguments:
repositoryLocationName = graphene.NonNull(graphene.String)

class Meta:
name = "ShutdownRepositoryLocationMutation"

@capture_error
@check_read_only
def mutate(self, graphene_info, **kwargs):
location_name = kwargs["repositoryLocationName"]

if not graphene_info.context.has_repository_location_name(location_name):
return GrapheneRepositoryLocationNotFound(location_name)

if not graphene_info.context.is_shutdown_supported(location_name):
raise Exception(f"Location {location_name} does not support shutting down via GraphQL")

graphene_info.context.shutdown_repository_location(location_name)
return GrapheneShutdownRepositoryLocationSuccess(repositoryLocationName=location_name)


class GrapheneReloadWorkspaceMutationResult(graphene.Union):
class Meta:
types = (
Expand Down Expand Up @@ -431,6 +471,7 @@ class GrapheneMutation(graphene.ObjectType):
delete_pipeline_run = GrapheneDeleteRunMutation.Field()
reload_repository_location = GrapheneReloadRepositoryLocationMutation.Field()
reload_workspace = GrapheneReloadWorkspaceMutation.Field()
shutdown_repository_location = GrapheneShutdownRepositoryLocationMutation.Field()
wipe_assets = GrapheneAssetWipeMutation.Field()
launch_partition_backfill = GrapheneLaunchBackfillMutation.Field()
resume_partition_backfill = GrapheneResumeBackfillMutation.Field()
Expand Down
Loading

0 comments on commit 0f8b285

Please sign in to comment.