Skip to content

Commit

Permalink
output_required on asset decorator (dagster-io#9327)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria authored Aug 16, 2022
1 parent d61fc55 commit 623d240
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 0 deletions.
19 changes: 19 additions & 0 deletions docs/content/concepts/assets/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,25 @@ def my_configurable_asset(context):

Refer to the [Config schema documentation](/concepts/configuration/config-schema) for more configuration info and examples.

### Conditional materialization

In some cases, an asset may not need to be updated in storage each time the decorated function is executed. In this case you can use the `output_required` parameter along with `yield` syntax to invoke this behavior. If the `output_required` parameter is set to `False`, you may indicate to the Dagster framework that no data should be persisted to storage by not yielding an output from your computation function. If an output is not emitted during computation, no asset materialization event will be created, the I/O manager will not be invoked, downstream assets will not be materialized, and asset sensors monitoring the asset will not trigger.

```python file=/concepts/assets/conditional_materialization.py startafter=start_conditional endbefore=end_conditional
@asset(output_required=False)
def may_not_materialize():
# to simulate an asset that may not always materialize.
random.seed()
if random.randint(1, 10) < 5:
yield Output([1, 2, 3, 4])


@asset
def downstream(may_not_materialize):
# will not run when may_not_materialize doesn't materialize the asset
return may_not_materialize + [5]
```

---

## Viewing and materializing assets in Dagit
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import random

from dagster import Output, asset

# start_conditional


@asset(output_required=False)
def may_not_materialize():
# to simulate an asset that may not always materialize.
random.seed()
if random.randint(1, 10) < 5:
yield Output([1, 2, 3, 4])


@asset
def downstream(may_not_materialize):
# will not run when may_not_materialize doesn't materialize the asset
return may_not_materialize + [5]


# end_conditional
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import random

from dagster import AssetKey, Output, RunRequest, asset, asset_sensor, job, op


@asset(output_required=False)
def may_not_materialize(context):
random.seed()
rand_num = random.randint(1, 10)
context.log.info(
f"Random number is {rand_num}. Asset will {'not' if rand_num >=5 else ''} materialize."
)
if rand_num < 5:
yield Output([1, 2, 3])


@asset
def downstream_conditional(may_not_materialize):
return may_not_materialize + [4]


@op
def success_op(context):
context.log.info("success!")


@job
def success_job():
success_op()


@asset_sensor(asset_key=AssetKey("may_not_materialize"), job=success_job)
def may_not_materialize_sensor(context, asset_event): # pylint: disable=unused-argument
return RunRequest(run_key=None)


def get_conditional_assets_repo():
return [may_not_materialize, downstream_conditional, may_not_materialize_sensor, success_job]
6 changes: 6 additions & 0 deletions python_modules/dagster-test/dagster_test/toys/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dagster_test.toys.big_honkin_asset_graph import big_honkin_asset_group
from dagster_test.toys.branches import branch_failed_job, branch_job
from dagster_test.toys.composition import composition_job
from dagster_test.toys.conditional_assets import get_conditional_assets_repo
from dagster_test.toys.cross_repo_assets import (
downstream_asset_group1,
downstream_asset_group2,
Expand Down Expand Up @@ -152,3 +153,8 @@ def downstream_assets_repository2():
@repository
def graph_backed_asset_repository():
return [graph_backed_group]


@repository
def conditional_assets_repository():
return get_conditional_assets_repo()
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def asset(
partitions_def: Optional[PartitionsDefinition] = ...,
op_tags: Optional[Dict[str, Any]] = ...,
group_name: Optional[str] = ...,
output_required: bool = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
...

Expand All @@ -88,6 +89,7 @@ def asset(
partitions_def: Optional[PartitionsDefinition] = None,
op_tags: Optional[Dict[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -139,6 +141,9 @@ def asset(
(Experimental) A mapping of resource keys to resource definitions. These resources
will be initialized during execution, and can be accessed from the
context within the body of the function.
output_required (bool): Whether the decorated function will always materialize an asset.
Defaults to True. If False, the function can return None, which will not be materialized to
storage and will halt execution of downstream assets.
Examples:
Expand Down Expand Up @@ -178,6 +183,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
partitions_def=partitions_def,
op_tags=op_tags,
group_name=group_name,
output_required=output_required,
)(fn)

return inner
Expand All @@ -201,6 +207,7 @@ def __init__(
partitions_def: Optional[PartitionsDefinition] = None,
op_tags: Optional[Dict[str, Any]] = None,
group_name: Optional[str] = None,
output_required: bool = True,
):
self.name = name

Expand All @@ -222,6 +229,7 @@ def __init__(
self.op_tags = op_tags
self.resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))
self.group_name = group_name
self.output_required = output_required

def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__
Expand Down Expand Up @@ -253,6 +261,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
io_manager_key=io_manager_key,
dagster_type=self.dagster_type if self.dagster_type else NoValueSentinel,
description=self.description,
is_required=self.output_required,
)

op = _Op(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import pickle
import warnings
from tempfile import TemporaryDirectory

Expand Down Expand Up @@ -285,3 +286,51 @@ def foo_resource(init_context):
)
assert result.success
assert result.asset_materializations_for_node("the_asset")[0].metadata_entries == []


def test_conditional_materialize():
should_materialize = True

@asset(output_required=False)
def the_asset():
if should_materialize:
yield Output(5)

@asset
def downstream(the_asset):
if the_asset:
return the_asset + 1
else:
# if we get here then the_asset is None and downstream execution was not halted
assert False

# Verify that result of materialize was persisted
with TemporaryDirectory() as temp_dir:
with instance_for_test(temp_dir=temp_dir) as instance:
result = materialize([the_asset, downstream], instance=instance)
assert result.success

assert result.asset_materializations_for_node("the_asset")[0].metadata_entries[
0
].value == MetadataValue.path(os.path.join(temp_dir, "storage", "the_asset"))
with open(os.path.join(temp_dir, "storage", "the_asset"), "rb") as f:
assert pickle.load(f) == 5

assert result.asset_materializations_for_node("downstream")[0].metadata_entries[
0
].value == MetadataValue.path(os.path.join(temp_dir, "storage", "downstream"))
with open(os.path.join(temp_dir, "storage", "downstream"), "rb") as f:
assert pickle.load(f) == 6

should_materialize = False

result = materialize([the_asset, downstream], instance=instance)
assert result.success

assert len(result.asset_materializations_for_node("the_asset")) == 0
with open(os.path.join(temp_dir, "storage", "the_asset"), "rb") as f:
assert pickle.load(f) == 5

assert len(result.asset_materializations_for_node("downstream")) == 0
with open(os.path.join(temp_dir, "storage", "downstream"), "rb") as f:
assert pickle.load(f) == 6

0 comments on commit 623d240

Please sign in to comment.