Skip to content

Commit

Permalink
gql for pausing asset daemon (dagster-io#13560)
Browse files Browse the repository at this point in the history
Accidentally landed dagster-io#13551 in
to the upstream branch
  • Loading branch information
johannkm authored Apr 13, 2023
1 parent 1d32209 commit 1cf6c4f
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 6 deletions.
2 changes: 2 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.launcher.base import RunLauncher
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._daemon.asset_daemon import get_auto_materialize_paused
from dagster._daemon.types import DaemonStatus

from .errors import GraphenePythonError
Expand Down Expand Up @@ -95,6 +96,7 @@ class GrapheneInstance(graphene.ObjectType):
daemonHealth = graphene.NonNull(GrapheneDaemonHealth)
hasInfo = graphene.NonNull(graphene.Boolean)
hasCapturedLogManager = graphene.NonNull(graphene.Boolean)
autoMaterializePaused = graphene.NonNull(graphene.Boolean)

class Meta:
name = "Instance"
Expand Down Expand Up @@ -129,3 +131,6 @@ def resolve_daemonHealth(self, _graphene_info: ResolveInfo):

def resolve_hasCapturedLogManager(self, _graphene_info: ResolveInfo):
return isinstance(self._instance.compute_log_manager, CapturedLogManager)

def resolve_autoMaterializePaused(self, _graphene_info: ResolveInfo):
return get_auto_materialize_paused(self._instance)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster._core.definitions.events import AssetKey
from dagster._core.nux import get_has_seen_nux, set_nux_seen
from dagster._core.workspace.permissions import Permissions
from dagster._daemon.asset_daemon import set_auto_materialize_paused

from dagster_graphql.implementation.execution.backfill import (
cancel_partition_backfill,
Expand Down Expand Up @@ -694,6 +695,23 @@ def mutate(self, _graphene_info):
return get_has_seen_nux()


class GrapheneSetAutoMaterializePausedMutation(graphene.Mutation):
"""Toggle asset auto materializing on or off."""

Output = graphene.NonNull(graphene.Boolean)

class Meta:
name = "SetAutoMaterializedPausedMutation"

class Arguments:
paused = graphene.Argument(graphene.NonNull(graphene.Boolean))

@capture_error
def mutate(self, graphene_info, paused: bool):
set_auto_materialize_paused(graphene_info.context.instance, paused)
return paused


class GrapheneDagitMutation(graphene.ObjectType):
"""The root for all mutations to modify data in your Dagster instance."""

Expand Down Expand Up @@ -725,3 +743,4 @@ class Meta:
log_telemetry = GrapheneLogTelemetryMutation.Field()
set_nux_seen = GrapheneSetNuxSeenMutation.Field()
add_dynamic_partition = GrapheneAddDynamicPartitionMutation.Field()
setAutoMaterializePaused = GrapheneSetAutoMaterializePausedMutation.Field()
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dagster_graphql.test.utils import execute_dagster_graphql

from dagster_graphql_tests.graphql.graphql_context_test_suite import (
ExecutingGraphQLContextTestMatrix,
)

MUTATION = """
mutation SetAutoMaterializePausedMutation($paused: Boolean!) {
setAutoMaterializePaused(paused: $paused)
}
"""

QUERY = """
query GetAutoMaterializePausedQuery {
instance {
autoMaterializePaused
}
}
"""


class TestDaemonHealth(ExecutingGraphQLContextTestMatrix):
def test_get_individual_daemons(self, graphql_context):
results = execute_dagster_graphql(graphql_context, QUERY)
assert results.data == {
"instance": {
"autoMaterializePaused": False,
}
}

results = execute_dagster_graphql(graphql_context, MUTATION, variables={"paused": True})
assert results.data == {
"setAutoMaterializePaused": True,
}

results = execute_dagster_graphql(graphql_context, QUERY)
assert results.data == {
"instance": {
"autoMaterializePaused": True,
}
}

results = execute_dagster_graphql(graphql_context, MUTATION, variables={"paused": False})
assert results.data == {
"setAutoMaterializePaused": False,
}

results = execute_dagster_graphql(graphql_context, QUERY)
assert results.data == {
"instance": {
"autoMaterializePaused": False,
}
}
35 changes: 29 additions & 6 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,29 @@
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.mode import DEFAULT_MODE_NAME
from dagster._core.definitions.selector import PipelineSelector
from dagster._core.instance import DagsterInstance
from dagster._core.storage.pipeline_run import DagsterRunStatus
from dagster._core.storage.tags import CREATED_BY_TAG
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._daemon.daemon import DaemonIterator, IntervalDaemon

CURSOR_NAME = "ASSET_DAEMON_CURSOR"
CURSOR_KEY = "ASSET_DAEMON_CURSOR"
ASSET_DAEMON_PAUSED_KEY = "ASSET_DAEMON_PAUSED"


def get_auto_materialize_paused(instance: DagsterInstance) -> bool:
return (
instance.daemon_cursor_storage.get_cursor_values({ASSET_DAEMON_PAUSED_KEY}).get(
ASSET_DAEMON_PAUSED_KEY
)
== "true"
)


def set_auto_materialize_paused(instance: DagsterInstance, paused: bool):
instance.daemon_cursor_storage.set_cursor_values(
{ASSET_DAEMON_PAUSED_KEY: "true" if paused else "false"}
)


class AssetDaemon(IntervalDaemon):
Expand All @@ -27,9 +44,17 @@ def run_iteration(
workspace_process_context: IWorkspaceProcessContext,
) -> DaemonIterator:
instance = workspace_process_context.instance

persisted_info = instance.daemon_cursor_storage.get_cursor_values(
{CURSOR_KEY, ASSET_DAEMON_PAUSED_KEY}
)

if persisted_info.get(ASSET_DAEMON_PAUSED_KEY) == "true":
yield
return

workspace = workspace_process_context.create_request_context()
asset_graph = ExternalAssetGraph.from_workspace(workspace)

target_asset_keys = {
target_key
for target_key in asset_graph.non_source_asset_keys
Expand All @@ -40,9 +65,7 @@ def run_iteration(
yield
return

raw_cursor = instance.daemon_cursor_storage.get_cursor_values({CURSOR_NAME}).get(
CURSOR_NAME
)
raw_cursor = persisted_info.get(CURSOR_KEY)
cursor = (
AssetReconciliationCursor.from_serialized(raw_cursor, asset_graph)
if raw_cursor
Expand Down Expand Up @@ -120,4 +143,4 @@ def run_iteration(
)
instance.submit_run(run.run_id, workspace)

instance.daemon_cursor_storage.set_cursor_values({CURSOR_NAME: new_cursor.serialize()})
instance.daemon_cursor_storage.set_cursor_values({CURSOR_KEY: new_cursor.serialize()})

0 comments on commit 1cf6c4f

Please sign in to comment.