diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py index 90b4343123ec0..462746329d43a 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/sensor.py @@ -29,11 +29,11 @@ from ..run_request import RunRequest from ..sensor_definition import DefaultSensorStatus, SensorDefinition, SensorEvaluationContext from .utils import ( - DEADLINE_CRON_METADATA_KEY, + DEADLINE_CRON_PARAM_KEY, DEFAULT_FRESHNESS_TIMEZONE, FRESHNESS_PARAMS_METADATA_KEY, - FRESHNESS_TIMEZONE_METADATA_KEY, - LOWER_BOUND_DELTA_METADATA_KEY, + LOWER_BOUND_DELTA_PARAM_KEY, + TIMEZONE_PARAM_KEY, ensure_freshness_checks, ensure_no_duplicate_asset_checks, ) @@ -186,13 +186,13 @@ def get_metadata(check_spec: AssetCheckSpec) -> Mapping[str, Any]: def get_freshness_cron(metadata: Mapping[str, Any]) -> Optional[str]: - return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(DEADLINE_CRON_METADATA_KEY) + return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(DEADLINE_CRON_PARAM_KEY) def get_freshness_cron_timezone(metadata: Mapping[str, Any]) -> Optional[str]: - return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(FRESHNESS_TIMEZONE_METADATA_KEY) + return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(TIMEZONE_PARAM_KEY) def get_lower_bound_delta(metadata: Mapping[str, Any]) -> Optional[datetime.timedelta]: - float_delta: float = metadata[FRESHNESS_PARAMS_METADATA_KEY].get(LOWER_BOUND_DELTA_METADATA_KEY) + float_delta: float = metadata[FRESHNESS_PARAMS_METADATA_KEY].get(LOWER_BOUND_DELTA_PARAM_KEY) return datetime.timedelta(seconds=float_delta) if float_delta else None diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/shared_builder.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/shared_builder.py index a6b183dbfbdc1..796ff3deb7591 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_checks/shared_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/shared_builder.py @@ -20,13 +20,13 @@ from ..assets import AssetsDefinition, SourceAsset from ..events import AssetKey, CoercibleToAssetKey from .utils import ( - DEADLINE_CRON_METADATA_KEY, + DEADLINE_CRON_PARAM_KEY, FRESHNESS_PARAMS_METADATA_KEY, - FRESHNESS_TIMEZONE_METADATA_KEY, LAST_UPDATED_TIMESTAMP_METADATA_KEY, - LOWER_BOUND_DELTA_METADATA_KEY, + LOWER_BOUND_DELTA_PARAM_KEY, OVERDUE_DEADLINE_TIMESTAMP_METADATA_KEY, OVERDUE_SECONDS_METADATA_KEY, + TIMEZONE_PARAM_KEY, asset_to_keys_iterable, ensure_no_duplicate_assets, get_description_for_freshness_check_result, @@ -44,11 +44,11 @@ def build_freshness_multi_check( lower_bound_delta: Optional[datetime.timedelta], asset_property_enforcement_lambda: Optional[Callable[[AssetsDefinition], bool]], ) -> AssetChecksDefinition: - params_metadata: dict[str, Any] = {FRESHNESS_TIMEZONE_METADATA_KEY: timezone} + params_metadata: dict[str, Any] = {TIMEZONE_PARAM_KEY: timezone} if deadline_cron: - params_metadata[DEADLINE_CRON_METADATA_KEY] = deadline_cron + params_metadata[DEADLINE_CRON_PARAM_KEY] = deadline_cron if lower_bound_delta: - params_metadata[LOWER_BOUND_DELTA_METADATA_KEY] = lower_bound_delta.total_seconds() + params_metadata[LOWER_BOUND_DELTA_PARAM_KEY] = lower_bound_delta.total_seconds() @multi_asset_check( specs=[ diff --git a/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py b/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py index 323851a9d0316..57a02af817616 100644 --- a/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py +++ b/python_modules/dagster/dagster/_core/definitions/freshness_checks/utils.py @@ -13,16 +13,21 @@ from ..assets import AssetsDefinition, SourceAsset from ..events import AssetKey, CoercibleToAssetKey +# Constants DEFAULT_FRESHNESS_SEVERITY = AssetCheckSeverity.WARN DEFAULT_FRESHNESS_TIMEZONE = "UTC" -LOWER_BOUND_DELTA_METADATA_KEY = "dagster/lower_bound_delta" -DEADLINE_CRON_METADATA_KEY = "dagster/deadline_cron" -FRESHNESS_TIMEZONE_METADATA_KEY = "dagster/freshness_timezone" + +# Top-level metadata keys LAST_UPDATED_TIMESTAMP_METADATA_KEY = "dagster/last_updated_timestamp" FRESHNESS_PARAMS_METADATA_KEY = "dagster/freshness_params" OVERDUE_DEADLINE_TIMESTAMP_METADATA_KEY = "dagster/overdue_deadline_timestamp" OVERDUE_SECONDS_METADATA_KEY = "dagster/overdue_seconds" +# dagster/freshness_params inner keys +LOWER_BOUND_DELTA_PARAM_KEY = "lower_bound_delta" +DEADLINE_CRON_PARAM_KEY = "deadline_cron" +TIMEZONE_PARAM_KEY = "timezone" + def ensure_no_duplicate_assets( assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_last_update_freshness.py b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_last_update_freshness.py index c22e7b5185748..fa6f91f0feacb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_last_update_freshness.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_last_update_freshness.py @@ -48,8 +48,8 @@ def my_asset(): assert next(iter(check.check_keys)).asset_key == my_asset.key assert next(iter(check_specs)).metadata == { "dagster/freshness_params": { - "dagster/lower_bound_delta": 600, - "dagster/freshness_timezone": "UTC", + "lower_bound_delta": 600, + "timezone": "UTC", } } @@ -71,9 +71,9 @@ def other_asset(): assert isinstance(check, AssetChecksDefinition) assert next(iter(check.check_specs)).metadata == { "dagster/freshness_params": { - "dagster/lower_bound_delta": 600, - "dagster/deadline_cron": "0 0 * * *", - "dagster/freshness_timezone": "UTC", + "lower_bound_delta": 600, + "deadline_cron": "0 0 * * *", + "timezone": "UTC", } } @@ -252,9 +252,9 @@ def my_asset(): metadata_match={ "dagster/freshness_params": JsonMetadataValue( { - "dagster/deadline_cron": deadline_cron, - "dagster/freshness_timezone": timezone, - "dagster/lower_bound_delta": lower_bound_delta.total_seconds(), + "deadline_cron": deadline_cron, + "timezone": timezone, + "lower_bound_delta": lower_bound_delta.total_seconds(), } ), "dagster/overdue_deadline_timestamp": TimestampMetadataValue( @@ -286,9 +286,9 @@ def my_asset(): metadata_match={ "dagster/freshness_params": JsonMetadataValue( { - "dagster/deadline_cron": deadline_cron, - "dagster/freshness_timezone": timezone, - "dagster/lower_bound_delta": lower_bound_delta.total_seconds(), + "deadline_cron": deadline_cron, + "timezone": timezone, + "lower_bound_delta": lower_bound_delta.total_seconds(), } ), "dagster/last_updated_timestamp": TimestampMetadataValue( @@ -349,8 +349,8 @@ def my_asset(): metadata_match={ "dagster/freshness_params": JsonMetadataValue( { - "dagster/lower_bound_delta": 600, - "dagster/freshness_timezone": "UTC", + "lower_bound_delta": 600, + "timezone": "UTC", } ), # Indicates that no records exist. @@ -383,8 +383,8 @@ def my_asset(): metadata_match={ "dagster/freshness_params": JsonMetadataValue( { - "dagster/lower_bound_delta": 600, - "dagster/freshness_timezone": "UTC", + "lower_bound_delta": 600, + "timezone": "UTC", } ), # Since no freshness cron, deadline is the current time. diff --git a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py index a75e813585810..da056c4847462 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/freshness_checks_tests/test_time_partition_freshness.py @@ -54,8 +54,8 @@ def my_partitioned_asset(): assert next(iter(check.check_keys)).asset_key == my_partitioned_asset.key assert next(iter(check.check_specs)).metadata == { "dagster/freshness_params": { - "dagster/deadline_cron": "0 0 * * *", - "dagster/freshness_timezone": "UTC", + "deadline_cron": "0 0 * * *", + "timezone": "UTC", } } assert (