Skip to content

Commit

Permalink
Freshness metadata formatting fixes (dagster-io#21247)
Browse files Browse the repository at this point in the history
Makes some changes to better organize and display freshness metadata.
- No longer include the `dagster/` prefix in second-tier (nested) metadata keys, i.e. the keys inside `dagster/freshness_params`
- Disambiguate between top-level and nested metadata keys
- Better organize metadata keys in code
  • Loading branch information
dpeng817 authored Apr 19, 2024
1 parent d759885 commit fbf5c65
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
from ..run_request import RunRequest
from ..sensor_definition import DefaultSensorStatus, SensorDefinition, SensorEvaluationContext
from .utils import (
DEADLINE_CRON_METADATA_KEY,
DEADLINE_CRON_PARAM_KEY,
DEFAULT_FRESHNESS_TIMEZONE,
FRESHNESS_PARAMS_METADATA_KEY,
FRESHNESS_TIMEZONE_METADATA_KEY,
LOWER_BOUND_DELTA_METADATA_KEY,
LOWER_BOUND_DELTA_PARAM_KEY,
TIMEZONE_PARAM_KEY,
ensure_freshness_checks,
ensure_no_duplicate_asset_checks,
)
Expand Down Expand Up @@ -186,13 +186,13 @@ def get_metadata(check_spec: AssetCheckSpec) -> Mapping[str, Any]:


def get_freshness_cron(metadata: Mapping[str, Any]) -> Optional[str]:
return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(DEADLINE_CRON_METADATA_KEY)
return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(DEADLINE_CRON_PARAM_KEY)


def get_freshness_cron_timezone(metadata: Mapping[str, Any]) -> Optional[str]:
return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(FRESHNESS_TIMEZONE_METADATA_KEY)
return metadata[FRESHNESS_PARAMS_METADATA_KEY].get(TIMEZONE_PARAM_KEY)


def get_lower_bound_delta(metadata: Mapping[str, Any]) -> Optional[datetime.timedelta]:
float_delta: float = metadata[FRESHNESS_PARAMS_METADATA_KEY].get(LOWER_BOUND_DELTA_METADATA_KEY)
float_delta: float = metadata[FRESHNESS_PARAMS_METADATA_KEY].get(LOWER_BOUND_DELTA_PARAM_KEY)
return datetime.timedelta(seconds=float_delta) if float_delta else None
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
from ..assets import AssetsDefinition, SourceAsset
from ..events import AssetKey, CoercibleToAssetKey
from .utils import (
DEADLINE_CRON_METADATA_KEY,
DEADLINE_CRON_PARAM_KEY,
FRESHNESS_PARAMS_METADATA_KEY,
FRESHNESS_TIMEZONE_METADATA_KEY,
LAST_UPDATED_TIMESTAMP_METADATA_KEY,
LOWER_BOUND_DELTA_METADATA_KEY,
LOWER_BOUND_DELTA_PARAM_KEY,
OVERDUE_DEADLINE_TIMESTAMP_METADATA_KEY,
OVERDUE_SECONDS_METADATA_KEY,
TIMEZONE_PARAM_KEY,
asset_to_keys_iterable,
ensure_no_duplicate_assets,
get_description_for_freshness_check_result,
Expand All @@ -44,11 +44,11 @@ def build_freshness_multi_check(
lower_bound_delta: Optional[datetime.timedelta],
asset_property_enforcement_lambda: Optional[Callable[[AssetsDefinition], bool]],
) -> AssetChecksDefinition:
params_metadata: dict[str, Any] = {FRESHNESS_TIMEZONE_METADATA_KEY: timezone}
params_metadata: dict[str, Any] = {TIMEZONE_PARAM_KEY: timezone}
if deadline_cron:
params_metadata[DEADLINE_CRON_METADATA_KEY] = deadline_cron
params_metadata[DEADLINE_CRON_PARAM_KEY] = deadline_cron
if lower_bound_delta:
params_metadata[LOWER_BOUND_DELTA_METADATA_KEY] = lower_bound_delta.total_seconds()
params_metadata[LOWER_BOUND_DELTA_PARAM_KEY] = lower_bound_delta.total_seconds()

@multi_asset_check(
specs=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@
from ..assets import AssetsDefinition, SourceAsset
from ..events import AssetKey, CoercibleToAssetKey

# Constants
DEFAULT_FRESHNESS_SEVERITY = AssetCheckSeverity.WARN
DEFAULT_FRESHNESS_TIMEZONE = "UTC"
LOWER_BOUND_DELTA_METADATA_KEY = "dagster/lower_bound_delta"
DEADLINE_CRON_METADATA_KEY = "dagster/deadline_cron"
FRESHNESS_TIMEZONE_METADATA_KEY = "dagster/freshness_timezone"

# Top-level metadata keys
LAST_UPDATED_TIMESTAMP_METADATA_KEY = "dagster/last_updated_timestamp"
FRESHNESS_PARAMS_METADATA_KEY = "dagster/freshness_params"
OVERDUE_DEADLINE_TIMESTAMP_METADATA_KEY = "dagster/overdue_deadline_timestamp"
OVERDUE_SECONDS_METADATA_KEY = "dagster/overdue_seconds"

# dagster/freshness_params inner keys
LOWER_BOUND_DELTA_PARAM_KEY = "lower_bound_delta"
DEADLINE_CRON_PARAM_KEY = "deadline_cron"
TIMEZONE_PARAM_KEY = "timezone"


def ensure_no_duplicate_assets(
assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def my_asset():
assert next(iter(check.check_keys)).asset_key == my_asset.key
assert next(iter(check_specs)).metadata == {
"dagster/freshness_params": {
"dagster/lower_bound_delta": 600,
"dagster/freshness_timezone": "UTC",
"lower_bound_delta": 600,
"timezone": "UTC",
}
}

Expand All @@ -71,9 +71,9 @@ def other_asset():
assert isinstance(check, AssetChecksDefinition)
assert next(iter(check.check_specs)).metadata == {
"dagster/freshness_params": {
"dagster/lower_bound_delta": 600,
"dagster/deadline_cron": "0 0 * * *",
"dagster/freshness_timezone": "UTC",
"lower_bound_delta": 600,
"deadline_cron": "0 0 * * *",
"timezone": "UTC",
}
}

Expand Down Expand Up @@ -252,9 +252,9 @@ def my_asset():
metadata_match={
"dagster/freshness_params": JsonMetadataValue(
{
"dagster/deadline_cron": deadline_cron,
"dagster/freshness_timezone": timezone,
"dagster/lower_bound_delta": lower_bound_delta.total_seconds(),
"deadline_cron": deadline_cron,
"timezone": timezone,
"lower_bound_delta": lower_bound_delta.total_seconds(),
}
),
"dagster/overdue_deadline_timestamp": TimestampMetadataValue(
Expand Down Expand Up @@ -286,9 +286,9 @@ def my_asset():
metadata_match={
"dagster/freshness_params": JsonMetadataValue(
{
"dagster/deadline_cron": deadline_cron,
"dagster/freshness_timezone": timezone,
"dagster/lower_bound_delta": lower_bound_delta.total_seconds(),
"deadline_cron": deadline_cron,
"timezone": timezone,
"lower_bound_delta": lower_bound_delta.total_seconds(),
}
),
"dagster/last_updated_timestamp": TimestampMetadataValue(
Expand Down Expand Up @@ -349,8 +349,8 @@ def my_asset():
metadata_match={
"dagster/freshness_params": JsonMetadataValue(
{
"dagster/lower_bound_delta": 600,
"dagster/freshness_timezone": "UTC",
"lower_bound_delta": 600,
"timezone": "UTC",
}
),
# Indicates that no records exist.
Expand Down Expand Up @@ -383,8 +383,8 @@ def my_asset():
metadata_match={
"dagster/freshness_params": JsonMetadataValue(
{
"dagster/lower_bound_delta": 600,
"dagster/freshness_timezone": "UTC",
"lower_bound_delta": 600,
"timezone": "UTC",
}
),
# Since no freshness cron, deadline is the current time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def my_partitioned_asset():
assert next(iter(check.check_keys)).asset_key == my_partitioned_asset.key
assert next(iter(check.check_specs)).metadata == {
"dagster/freshness_params": {
"dagster/deadline_cron": "0 0 * * *",
"dagster/freshness_timezone": "UTC",
"deadline_cron": "0 0 * * *",
"timezone": "UTC",
}
}
assert (
Expand Down

0 comments on commit fbf5c65

Please sign in to comment.