Skip to content

Commit

Permalink
Fix merge conflict with asset daemon sensor tests and cursor storage …
Browse files Browse the repository at this point in the history
…changes (dagster-io#19117)

Summary:
And that's why you always rebase before landing.

Test Plan: BK

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
gibsondan authored Jan 9, 2024
1 parent e80e34d commit 2ff7aaa
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def _run_iteration_impl(
InstigatorStatus.AUTOMATICALLY_RUNNING,
SensorInstigatorData(
min_interval=sensor.min_interval_seconds,
cursor=AssetDaemonCursor.empty().serialize(),
cursor=None,
last_sensor_start_timestamp=pendulum.now("UTC").timestamp(),
sensor_type=SensorType.AUTOMATION_POLICY,
),
Expand Down Expand Up @@ -534,7 +534,7 @@ def _process_auto_materialize_tick_generator(
check.not_none(auto_materialize_instigator_state).instigator_data,
).cursor

stored_cursor = (
stored_cursor: AssetDaemonCursor = (
LegacyAssetDaemonCursorWrapper.from_compressed(
compressed_cursor
).get_asset_daemon_cursor(asset_graph)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
from dagster._core.definitions.asset_daemon_context import (
AssetDaemonContext,
)
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
from dagster._core.definitions.asset_daemon_cursor import (
AssetDaemonCursor,
LegacyAssetDaemonCursorWrapper,
)
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.auto_materialize_rule import AutoMaterializeRule
from dagster._core.definitions.auto_materialize_rule_evaluation import (
Expand Down Expand Up @@ -73,10 +76,10 @@
)
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._daemon.asset_daemon import (
_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY,
_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
AssetDaemon,
_get_pre_sensor_auto_materialize_serialized_cursor,
get_current_evaluation_id,
)

Expand Down Expand Up @@ -447,25 +450,30 @@ def _evaluate_tick_daemon(
sensor.get_external_origin_id(), sensor.selector_id
)
)
raw_cursor = (
compressed_cursor = (
cast(
SensorInstigatorData,
check.not_none(auto_materialize_instigator_state).instigator_data,
).cursor
or AssetDaemonCursor.empty().serialize()
)
new_cursor = (
LegacyAssetDaemonCursorWrapper.from_compressed(
compressed_cursor
).get_asset_daemon_cursor(self.asset_graph)
if compressed_cursor
else AssetDaemonCursor.empty()
)
else:
raw_cursor = self.instance.daemon_cursor_storage.get_cursor_values(
{_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY}
).get(
_PRE_SENSOR_AUTO_MATERIALIZE_CURSOR_KEY,
AssetDaemonCursor.empty().serialize(),
raw_cursor = _get_pre_sensor_auto_materialize_serialized_cursor(self.instance)
new_cursor = (
AssetDaemonCursor.from_serialized(
raw_cursor,
self.asset_graph,
)
if raw_cursor
else AssetDaemonCursor.empty()
)

new_cursor = AssetDaemonCursor.from_serialized(
raw_cursor,
self.asset_graph,
)
new_run_requests = [
run_request(
list(run.asset_selection or []),
Expand Down

0 comments on commit 2ff7aaa

Please sign in to comment.