AssetGraphView, TemporalContext, and AssetSlice
### Summary & Motivation

With the addition of AMP, asset partitions, dynamic partitioning, and other related features, the complexity of our system has outstripped the ability of our abstractions to model it. A shallow indication of this is our repeated threading of current time/evaluation time, storage_id/cursors, and dynamic_partitions_store up and down our stack. Another is that we also have one-off "caching" classes like `CachingStaleStatusResolver`, `CachingInstanceQueryer`, `CachingDataTimeResolver`, and perhaps others I do not know about, that present a wide range of capabilities inconsistently. Superficially, it is annoying to have to thread time, storage_id, and dynamic_partitions_store around everywhere, and it is hard to know which class to use when interrogating the asset graph.

This belies a more profound problem: some of our capabilities respect current time and some do not; some respect storage_id and some do not. That means engineers do not know which reads are volatile with respect to time and underlying storage. It is also difficult to know what is safe to cache. It also means that as an engineer navigates the state of the asset graph, it is difficult to know which operations are cheap to compute and which are potentially extremely expensive.

This PR introduces `AssetGraphView`, `AssetSlice` and `TemporalContext` to address this issue.

Temporal Context: A `TemporalContext` represents an effective time, used for business logic, and a last_event_id, which is used to identify the state of the event log at some point in time. Put another way, the value of a TemporalContext represents a point in time and a snapshot of the event log.
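As a toy illustration of this idea (a simplified model, not the actual Dagster implementation — the event log and query helper here are hypothetical), a temporal context pins both a clock and an event-log cursor, so reads against the same context always see the same snapshot:

```python
from datetime import datetime
from typing import NamedTuple, Optional


class TemporalContext(NamedTuple):
    effective_dt: datetime
    last_event_id: Optional[int]


# Hypothetical in-memory event log with monotonically increasing event ids.
event_log = [
    {"event_id": 1, "asset": "raw_data", "type": "materialization"},
    {"event_id": 2, "asset": "derived", "type": "materialization"},
    {"event_id": 3, "asset": "raw_data", "type": "materialization"},
]


def events_visible(ctx: TemporalContext):
    """Return events visible at this context's snapshot of the log."""
    if ctx.last_event_id is None:
        # Volatile reads: immediately reflect any subsequent writes.
        return list(event_log)
    return [e for e in event_log if e["event_id"] <= ctx.last_event_id]


# Pinning last_event_id=2 hides event 3, even though it exists in storage.
ctx = TemporalContext(effective_dt=datetime(2024, 3, 11), last_event_id=2)
assert [e["event_id"] for e in events_visible(ctx)] == [1, 2]
```

The key property is that a pinned `last_event_id` makes reads repeatable, while `None` makes them volatile.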

Asset Graph View: A view of the asset graph and its state from the perspective of a specific temporal context. From the asset graph view you can access asset slices, the main API for navigating the view of an asset graph.

Asset Slice: Represents a set of partitions attached to a particular asset. Because `AssetSlice` contains a reference to `AssetGraphView`, it enables a more elegant traversal of an asset graph's partition space than before.

`AssetSlice` strives to be "partition-native". Very specifically, it deliberately does not have properties like `is_partitioned`. Instead, an unpartitioned asset is just represented as a slice with a single asset partition. Right now we do an inordinate (and unnecessary) amount of special casing for unpartitioned assets. This will take some adjustment but will result in much cleaner code and a cleaner mental model.
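The "partition-native" idea can be sketched with toy stand-in types (these are illustrative only, not the real `AssetKey`/`AssetKeyPartitionKey` classes): an unpartitioned asset is simply a set containing one asset partition whose partition key is `None`, so callers never need to branch on `is_partitioned`:

```python
from typing import AbstractSet, NamedTuple, Optional


class AssetKey(NamedTuple):  # toy stand-in for dagster's AssetKey
    path: str


class AssetPartition(NamedTuple):  # toy stand-in for AssetKeyPartitionKey
    asset_key: AssetKey
    partition_key: Optional[str]


def partition_count(asset_partitions: AbstractSet[AssetPartition]) -> int:
    # Works uniformly for partitioned and unpartitioned assets:
    # no is_partitioned special case needed.
    return len(asset_partitions)


partitioned = {
    AssetPartition(AssetKey("daily"), "2024-03-10"),
    AssetPartition(AssetKey("daily"), "2024-03-11"),
}
# An unpartitioned asset is one "asset partition" with partition_key=None.
unpartitioned = {AssetPartition(AssetKey("report"), None)}

assert partition_count(partitioned) == 2
assert partition_count(unpartitioned) == 1
```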

e.g.

Before:

```python
def parent_slice(
    context,
    asset_graph: AssetGraph,
    child_asset_subset: AssetSubset,
    parent_asset_key: AssetKey,
) -> AssetSubset:
    return asset_graph.get_parent_asset_subset(
        dynamic_partitions_store=context.instance_queryer,
        parent_asset_key=parent_asset_key,
        child_asset_subset=child_asset_subset,
        current_time=context.evaluation_dt,
    )

```

After:

```python
def parent_slice(child_slice: AssetSlice, parent_key: AssetKey) -> AssetSlice:
    return child_slice.compute_parent_slice(parent_key)
```

We also have naming conventions to indicate the performance characteristics of methods:

* Properties are guaranteed to be fast.
* Methods prefixed with `get_` do some work in-memory but are not hugely expensive.
* Methods prefixed with `compute_` do potentially expensive work, such as computing partition mappings and querying the instance.
* Methods using "materialize" indicate that they fully materialize partition sets. These can potentially be very expensive if the underlying partition set has an in-memory representation that involves large time windows, i.e. if the underlying `PartitionsSubset` in the `ValidAssetSubset` is a `TimeWindowPartitionsSubset`. Usage of these methods should be avoided if possible when you are potentially dealing with slices with large time-based partition windows.
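The tiers can be sketched with a hypothetical slice-like class (the class and its method bodies are invented for illustration; only the naming convention itself comes from this PR):

```python
from typing import AbstractSet, FrozenSet, Iterable


class ExampleSlice:
    def __init__(self, asset_key: str, partition_keys: Iterable[str]):
        self._asset_key = asset_key
        self._partition_keys = set(partition_keys)

    @property
    def asset_key(self) -> str:
        # Property: guaranteed fast, just returns stored state.
        return self._asset_key

    def get_partition_count(self) -> int:
        # get_: in-memory work with bounded cost.
        return len(self._partition_keys)

    def compute_parent_keys(self) -> AbstractSet[str]:
        # compute_: potentially expensive; imagine partition-mapping
        # logic or an instance query happening here.
        return {k + "/parent" for k in self._partition_keys}

    def materialize_partition_keys(self) -> FrozenSet[str]:
        # "materialize": fully enumerates the partition set, which can
        # be very expensive for large time-window-backed subsets.
        return frozenset(self._partition_keys)
```

A caller can then reason about cost from the name alone, e.g. `s.asset_key` is free, while `s.materialize_partition_keys()` may enumerate an arbitrarily large set.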

This PR also adds two type aliases:

```python
PartitionKey: TypeAlias = Optional[str]
AssetPartition: TypeAlias = AssetKeyPartitionKey
```

I consider the rename of `AssetKeyPartitionKey` to `AssetPartition` merely an acknowledgement of the current ground truth in the codebase, where nearly all local variables and method names refer to `asset_partition`, because `asset_key_partition_key` is self-evidently gross.

The `PartitionKey` alias is perhaps more controversial.
 

FAQ:

* Why add `AssetSlice` and not reuse `ValidAssetSubset`?
   * `AssetSlice` is different in that it contains a reference to the asset graph view, which makes it fundamentally different. It also allows for elegant traversal of the graph without having to thread a datetime and a reference to an instance or instance queryer everywhere, or having to convert between `ValidAssetSubset` and `AssetSubset`.
   * Second, I think the term "Subset" is extremely confusing, given that a "Subset" can actually refer to the _complete_ set of partitions of an asset.
   * Introducing a new name allows for disambiguation via a single word. Using `slice` and `subset` as local variables is very clear.
   * A slice also seeks to be "partition-native" and to treat partitioned and unpartitioned assets uniformly. I consider properties like `bool_value` and `subset_value` on `AssetSubset` to be quite gross, so this abstraction seeks to encapsulate that.

The plan here is to introduce this abstraction and then use it instead of directly using the various "resolver" and "queryer" classes throughout the codebase. This will allow capabilities such as AMP, the backfill daemon, and stale/outdated calculations in the web server to be more consistent and implemented with less code.

Two concrete items this will help with immediately:
* Consistent treatment of last_storage_id in AMP
* Canonicalization of "outdated" logic in AMP and "stale" logic in dagster-webserver under a new single concept, likely named "unsynced" (but subject to discussion).

Another objective is to allow user-defined AMP rules against a higher level API that is either this or something directly backed by this.

## How I Tested These Changes

Included unit tests
schrockn authored Mar 11, 2024
1 parent 2b0c777 commit 2a9e0f9
Showing 5 changed files with 420 additions and 11 deletions.
@@ -0,0 +1,297 @@
from datetime import datetime
from typing import TYPE_CHECKING, AbstractSet, Mapping, NamedTuple, NewType, Optional

from dagster import _check as check
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.events import AssetKey
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
    from dagster._core.definitions.base_asset_graph import BaseAssetGraph, BaseAssetNode
    from dagster._core.definitions.data_version import CachingStaleStatusResolver
    from dagster._core.definitions.definitions_class import Definitions
    from dagster._core.definitions.partition import PartitionsDefinition
    from dagster._core.instance import DagsterInstance
    from dagster._utils.caching_instance_queryer import CachingInstanceQueryer


class TemporalContext(NamedTuple):
    """TemporalContext represents an effective time, used for business logic, and a
    last_event_id, which is used to identify the state of the event log at some point in time.
    Put another way, the value of a TemporalContext represents a point in time and a snapshot
    of the event log.

    Effective time: This is the effective time of the computation in terms of business logic,
    and it impacts the behavior of partitioning and partition mapping. For example,
    the "last" partition window of a given partitions definition is computed with
    respect to the effective time.

    Last event id: Our event log has a monotonically increasing event id. This is used to
    cursor the event log. This event_id is also propagated to derived tables to indicate
    when that record is valid. This allows us to query the state of the event log
    at a given point in time.

    Note that the insertion time of the last_event_id is not the same as the effective time.

    A last_event_id of None indicates that reads will be volatile and will immediately
    reflect any subsequent writes.
    """

    effective_dt: datetime
    last_event_id: Optional[int]


# We reserve the right to impose constraints on the AssetSubset that we are going to use
# in AssetSlice internals. Adding a NewType enforces that we do that conversion
# in one spot (see _slice_from_subset).
_AssetSliceCompatibleSubset = NewType("_AssetSliceCompatibleSubset", ValidAssetSubset)


def _slice_from_subset(asset_graph_view: "AssetGraphView", subset: AssetSubset) -> "AssetSlice":
    valid_subset = subset.as_valid(
        asset_graph_view.asset_graph.get(subset.asset_key).partitions_def
    )
    return AssetSlice(asset_graph_view, _AssetSliceCompatibleSubset(valid_subset))


class AssetSlice:
    """An asset slice represents a set of partitions for a given asset key. It is
    tied to a particular instance of an AssetGraphView, and is read-only.

    With an asset slice you are able to traverse the set of partitions resident
    in an asset graph at any point in time.

    ```python
    asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date())
    some_asset_slice = asset_graph_view_t0.get_asset_slice(some_asset.key)
    for parent_slice in some_asset_slice.compute_parent_slices().values():
        ...  # do something with the parent slice
    ```

    AssetSlice is read-only and tied to a specific AssetGraphView. Therefore
    we can safely use cached methods and properties at will. However, different methods
    have different performance characteristics, so we have the following naming
    conventions:

    * Properties are guaranteed to be fast.
    * Methods prefixed with `get_` do some work in-memory but are not hugely expensive.
    * Methods prefixed with `compute_` do potentially expensive work, such as computing
      partition mappings and querying the instance. We also use this prefix to indicate
      that they fully materialize partition sets. These can potentially be very
      expensive if the underlying partition set has an in-memory representation that
      involves large time windows, i.e. if the underlying PartitionsSubset in the
      ValidAssetSubset is a TimeWindowPartitionsSubset. Usage of these methods should
      be avoided if possible when you are potentially dealing with slices with large
      time-based partition windows.
    """

    def __init__(
        self, asset_graph_view: "AssetGraphView", compatible_subset: _AssetSliceCompatibleSubset
    ):
        self._asset_graph_view = asset_graph_view
        self._compatible_subset = compatible_subset

    def convert_to_valid_asset_subset(self) -> ValidAssetSubset:
        return self._compatible_subset

    # only works for partitioned assets for now
    def compute_partition_keys(self) -> AbstractSet[str]:
        return {
            check.not_none(akpk.partition_key, "No None partition keys")
            for akpk in self._compatible_subset.asset_partitions
        }

    @property
    def asset_key(self) -> AssetKey:
        return self._compatible_subset.asset_key

    @property
    def parent_keys(self) -> AbstractSet[AssetKey]:
        return self._asset_graph_view.asset_graph.get(self.asset_key).parent_keys

    @property
    def child_keys(self) -> AbstractSet[AssetKey]:
        return self._asset_graph_view.asset_graph.get(self.asset_key).child_keys

    @property
    def _partitions_def(self) -> Optional["PartitionsDefinition"]:
        return self._asset_graph_view.asset_graph.get(self.asset_key).partitions_def

    @cached_method
    def compute_parent_slice(self, parent_asset_key: AssetKey) -> "AssetSlice":
        return self._asset_graph_view.compute_parent_asset_slice(parent_asset_key, self)

    @cached_method
    def compute_child_slice(self, child_asset_key: AssetKey) -> "AssetSlice":
        return self._asset_graph_view.compute_child_asset_slice(child_asset_key, self)

    @cached_method
    def compute_parent_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
        return {ak: self.compute_parent_slice(ak) for ak in self.parent_keys}

    @cached_method
    def compute_child_slices(self) -> Mapping[AssetKey, "AssetSlice"]:
        return {ak: self.compute_child_slice(ak) for ak in self.child_keys}

    def compute_intersection_with_partition_keys(
        self, partition_keys: AbstractSet[str]
    ) -> "AssetSlice":
        """Return a new AssetSlice with only the given partition keys if they are in the slice."""
        return self._asset_graph_view.compute_intersection_with_partition_keys(partition_keys, self)


class AssetGraphView:
    """The Asset Graph View. It is a view of the asset graph from the perspective of a specific
    temporal context.

    If the user wants to get a new view of the asset graph with a new effective date or last
    event id, they should create a new instance of an AssetGraphView. If they do not, they will
    get incorrect results because the AssetGraphView and its associated classes (like AssetSlice)
    cache results based on the effective date and last event id.

    ```python
    # in a test case
    asset_graph_view_t0 = AssetGraphView.for_test(defs, effective_dt=some_date())

    #
    # call materialize on an asset in defs
    #
    # must create a new AssetGraphView to get the correct results;
    # asset_graph_view_t0 will not reflect the new materialization
    asset_graph_view_t1 = AssetGraphView.for_test(defs, effective_dt=some_date())
    ```
    """

    @staticmethod
    def for_test(
        defs: "Definitions",
        instance: Optional["DagsterInstance"] = None,
        effective_dt: Optional[datetime] = None,
        last_event_id: Optional[int] = None,
    ):
        import pendulum

        from dagster._core.definitions.data_version import CachingStaleStatusResolver
        from dagster._core.instance import DagsterInstance

        stale_resolver = CachingStaleStatusResolver(
            instance=instance or DagsterInstance.ephemeral(),
            asset_graph=defs.get_asset_graph(),
        )
        check.invariant(stale_resolver.instance_queryer, "Ensure instance queryer is constructed")
        return AssetGraphView(
            stale_resolver=stale_resolver,
            temporal_context=TemporalContext(
                effective_dt=effective_dt or pendulum.now(),
                last_event_id=last_event_id,
            ),
        )

    def __init__(
        self,
        *,
        temporal_context: TemporalContext,
        stale_resolver: "CachingStaleStatusResolver",
    ):
        # Currently these properties have the ability to be lazily constructed.
        # We instead are going to try to retain the invariant that they are
        # constructed upfront so that initialization time is well-understood
        # and deterministic. If there are cheap operations that do not
        # require these instances, we should move them to a different abstraction.

        # ensure it is already constructed rather than created on demand
        check.invariant(stale_resolver._instance_queryer)  # noqa: SLF001
        # ensure it is already constructed rather than created on demand
        check.invariant(stale_resolver._asset_graph)  # noqa: SLF001

        # the stale resolver has a CachingInstanceQueryer which has a DagsterInstance,
        # so just passing the CachingStaleStatusResolver is enough
        self._stale_resolver = stale_resolver
        self._temporal_context = temporal_context

    @property
    def effective_dt(self) -> datetime:
        return self._temporal_context.effective_dt

    @property
    def last_event_id(self) -> Optional[int]:
        return self._temporal_context.last_event_id

    @property
    def asset_graph(self) -> "BaseAssetGraph[BaseAssetNode]":
        return self._stale_resolver.asset_graph

    @property
    def _queryer(self) -> "CachingInstanceQueryer":
        return self._stale_resolver.instance_queryer

    def _get_partitions_def(self, asset_key: "AssetKey") -> Optional["PartitionsDefinition"]:
        return self.asset_graph.get(asset_key).partitions_def

    def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
        # not compute_asset_slice because the dynamic partitions store
        # is just passed to AssetSubset.all, not invoked
        return _slice_from_subset(
            self,
            AssetSubset.all(
                asset_key=asset_key,
                partitions_def=self._get_partitions_def(asset_key),
                dynamic_partitions_store=self._queryer,
                current_time=self.effective_dt,
            ),
        )

    def compute_parent_asset_slice(
        self, parent_asset_key: AssetKey, asset_slice: AssetSlice
    ) -> AssetSlice:
        return _slice_from_subset(
            self,
            self.asset_graph.get_parent_asset_subset(
                dynamic_partitions_store=self._queryer,
                parent_asset_key=parent_asset_key,
                child_asset_subset=asset_slice.convert_to_valid_asset_subset(),
                current_time=self.effective_dt,
            ),
        )

    def compute_child_asset_slice(
        self, child_asset_key: "AssetKey", asset_slice: AssetSlice
    ) -> "AssetSlice":
        return _slice_from_subset(
            self,
            self.asset_graph.get_child_asset_subset(
                dynamic_partitions_store=self._queryer,
                child_asset_key=child_asset_key,
                current_time=self.effective_dt,
                parent_asset_subset=asset_slice.convert_to_valid_asset_subset(),
            ),
        )

    def compute_intersection_with_partition_keys(
        self, partition_keys: AbstractSet[str], asset_slice: AssetSlice
    ) -> "AssetSlice":
        """Return a new AssetSlice with only the given partition keys if they are in the slice."""
        partitions_def = check.not_none(
            self._get_partitions_def(asset_slice.asset_key), "Must have a partitions def"
        )
        for partition_key in partition_keys:
            if not partitions_def.has_partition_key(
                partition_key,
                current_time=self.effective_dt,
                dynamic_partitions_store=self._queryer,
            ):
                check.failed(
                    f"Partition key {partition_key} not in partitions def {partitions_def}"
                )

        return _slice_from_subset(
            self,
            asset_slice.convert_to_valid_asset_subset()
            & AssetSubset.from_partition_keys(
                asset_slice.asset_key, partitions_def, partition_keys
            ),
        )
30 changes: 19 additions & 11 deletions python_modules/dagster/dagster/_core/definitions/asset_subset.py
@@ -147,19 +147,27 @@ def from_asset_partitions_set(
         partitions_def: Optional[PartitionsDefinition],
         asset_partitions_set: AbstractSet[AssetKeyPartitionKey],
     ) -> "ValidAssetSubset":
-        if partitions_def is None:
-            return ValidAssetSubset(asset_key=asset_key, value=bool(asset_partitions_set))
-        else:
-            return ValidAssetSubset(
+        return (
+            ValidAssetSubset.from_partition_keys(
                 asset_key=asset_key,
-                value=partitions_def.subset_with_partition_keys(
-                    {
-                        ap.partition_key
-                        for ap in asset_partitions_set
-                        if ap.partition_key is not None
-                    }
-                ),
+                partitions_def=partitions_def,
+                partition_keys={
+                    ap.partition_key for ap in asset_partitions_set if ap.partition_key is not None
+                },
             )
+            if partitions_def
+            else ValidAssetSubset(asset_key=asset_key, value=bool(asset_partitions_set))
+        )
+
+    @staticmethod
+    def from_partition_keys(
+        asset_key: AssetKey,
+        partitions_def: PartitionsDefinition,
+        partition_keys: AbstractSet[str],
+    ) -> "ValidAssetSubset":
+        return ValidAssetSubset(
+            asset_key=asset_key, value=partitions_def.subset_with_partition_keys(partition_keys)
+        )
 
     def __contains__(self, item: AssetKeyPartitionKey) -> bool:
         if not self.is_partitioned: