forked from dagster-io/dagster
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[examples] Modern Data Stack + SDA Example (dagster-io#6862)
- Loading branch information
1 parent
6092f3e
commit 42d2d72
Showing
25 changed files
with
10,569 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
This is an example of how to use the Software-Defined Asset APIs alongside Modern Data Stack tools | ||
(specifically, Airbyte and dbt). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
|
||
target/ | ||
dbt_packages/ | ||
logs/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
Welcome to your new dbt project! | ||
|
||
### Using the starter project | ||
|
||
Try running the following commands: | ||
- dbt run | ||
- dbt test | ||
|
||
|
||
### Resources: | ||
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) | ||
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers | ||
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support | ||
- Find [dbt events](https://events.getdbt.com) near you | ||
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices |
Empty file.
23 changes: 23 additions & 0 deletions
23
examples/modern_data_stack_assets/mds_dbt/config/profiles.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
mds_dbt: | ||
target: prod | ||
outputs: | ||
prod: | ||
type: postgres | ||
host: localhost | ||
port: 5432 | ||
user: postgres | ||
pass: password | ||
dbname: postgres_replica | ||
schema: public | ||
threads: 2 | ||
keepalives_idle: 0 | ||
skip_airbyte: | ||
type: postgres | ||
host: localhost | ||
port: 5432 | ||
user: postgres | ||
pass: password | ||
dbname: postgres | ||
schema: public | ||
threads: 2 | ||
keepalives_idle: 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Name your project! Project names should contain only lowercase characters | ||
# and underscores. A good package name should reflect your organization's | ||
# name or the intended use of these models | ||
name: "mds_dbt" | ||
version: "1.0.0" | ||
config-version: 2 | ||
|
||
# This setting configures which "profile" dbt uses for this project. | ||
profile: "mds_dbt" | ||
|
||
# These configurations specify where dbt should look for different types of files. | ||
# The `model-paths` config, for example, states that models in this project can be | ||
# found in the "models/" directory. You probably won't need to change these! | ||
analysis-paths: ["analyses"] | ||
test-paths: ["tests"] | ||
macro-paths: ["macros"] | ||
snapshot-paths: ["snapshots"] | ||
|
||
target-path: "target" # directory which will store compiled SQL files | ||
clean-targets: # directories to be removed by `dbt clean` | ||
- "target" | ||
- "dbt_packages" | ||
|
||
models: | ||
mds_dbt: | ||
+materialized: "table" |
Empty file.
11 changes: 11 additions & 0 deletions
11
examples/modern_data_stack_assets/mds_dbt/models/daily_order_summary.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
select | ||
date_trunc('d', oc.order_time::timestamp) as order_date, | ||
sum(oc.order_value) as total_value, | ||
count(*) as num_orders | ||
from | ||
{{ ref("orders_cleaned") }} oc | ||
join | ||
{{ ref("users_augmented") }} ua | ||
on oc.user_id = ua.user_id | ||
where not ua.is_bot | ||
group by 1 order by 1 |
1 change: 1 addition & 0 deletions
1
examples/modern_data_stack_assets/mds_dbt/models/orders_cleaned.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
select * from {{ source('postgres_replica', 'orders') }} |
36 changes: 36 additions & 0 deletions
36
examples/modern_data_stack_assets/mds_dbt/models/schema.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
version: 2 | ||
|
||
models: | ||
- name: daily_order_summary | ||
description: "Daily metrics for orders placed on this platform." | ||
columns: | ||
- name: order_date | ||
description: "The UTC day for which these orders were aggregated." | ||
data_type: "date" | ||
- name: total_value | ||
description: "The total value of all orders placed on this day." | ||
data_type: "float" | ||
- name: num_orders | ||
description: "The total number of orders placed on this day." | ||
data_type: "int" | ||
- name: orders_cleaned | ||
description: "Filtered version of the raw orders data." | ||
columns: | ||
- name: "user_id" | ||
description: "Platform id of the user that placed this order." | ||
data_type: "int" | ||
- name: "order_time" | ||
description: "The timestamp (in UTC) that this order was placed." | ||
data_type: "timestamp" | ||
- name: "order_value" | ||
description: "The dollar amount that this order was placed for." | ||
data_type: "float" | ||
- name: users_augmented | ||
description: "Raw users data augmented with backend data." | ||
columns: | ||
- name: "user_id" | ||
description: "Platform id for this user." | ||
data_type: "int" | ||
- name: "is_spam" | ||
description: "True if this user has been marked as a fraudulent account." | ||
data_type: "bool" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
version: 2 | ||
|
||
sources: | ||
- name: postgres_replica | ||
database: postgres_replica | ||
schema: public | ||
tables: | ||
- name: users | ||
- name: orders |
1 change: 1 addition & 0 deletions
1
examples/modern_data_stack_assets/mds_dbt/models/users_augmented.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
select * from {{ source('postgres_replica', 'users') }} |
Empty file.
1 change: 1 addition & 0 deletions
1
examples/modern_data_stack_assets/modern_data_stack_assets/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .assets import analytics_assets |
53 changes: 53 additions & 0 deletions
53
examples/modern_data_stack_assets/modern_data_stack_assets/assets.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from typing import Any, Tuple | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from dagster_airbyte import airbyte_resource, build_airbyte_assets | ||
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project | ||
from scipy import optimize | ||
|
||
from dagster import AssetGroup, Output, asset | ||
|
||
from .constants import * | ||
from .pandas_io_manager import pandas_io_manager | ||
|
||
airbyte_assets = build_airbyte_assets( | ||
connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"] | ||
) | ||
|
||
dbt_assets = load_assets_from_dbt_project( | ||
project_dir=DBT_PROJECT_DIR, io_manager_key="pandas_io_manager" | ||
) | ||
|
||
|
||
@asset(compute_kind="python") | ||
def order_forecast_model(daily_order_summary: pd.DataFrame) -> Any: | ||
"""Model parameters that best fit the observed data""" | ||
df = daily_order_summary | ||
return tuple( | ||
optimize.curve_fit( | ||
f=model_func, xdata=df.order_date.astype(np.int64), ydata=df.num_orders, p0=[10, 100] | ||
)[0] | ||
) | ||
|
||
|
||
@asset(compute_kind="python", io_manager_key="pandas_io_manager") | ||
def predicted_orders( | ||
daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float] | ||
) -> pd.DataFrame: | ||
"""Predicted orders for the next 30 days based on the fit paramters""" | ||
a, b = order_forecast_model | ||
start_date = daily_order_summary.order_date.max() | ||
future_dates = pd.date_range(start=start_date, end=start_date + pd.DateOffset(days=30)) | ||
predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b) | ||
return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data}) | ||
|
||
|
||
analytics_assets = AssetGroup( | ||
airbyte_assets + dbt_assets + [order_forecast_model, predicted_orders], | ||
resource_defs={ | ||
"airbyte": airbyte_resource.configured(AIRBYTE_CONFIG), | ||
"dbt": dbt_cli_resource.configured(DBT_CONFIG), | ||
"pandas_io_manager": pandas_io_manager.configured(PG_CONFIG), | ||
}, | ||
).build_job("Assets") |
20 changes: 20 additions & 0 deletions
20
examples/modern_data_stack_assets/modern_data_stack_assets/constants.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import numpy as np | ||
from dagster_postgres.utils import get_conn_string | ||
|
||
from dagster.utils import file_relative_path | ||
|
||
|
||
def model_func(x, a, b): | ||
return a * np.exp(b * (x / 10**18 - 1.6095)) | ||
|
||
|
||
AIRBYTE_CONNECTION_ID = "your_airbyte_connection_id" | ||
AIRBYTE_CONFIG = {"host": "localhost", "port": "8000"} | ||
DBT_PROJECT_DIR = file_relative_path(__file__, "../mds_dbt") | ||
DBT_PROFILES_DIR = file_relative_path(__file__, "../mds_dbt/config") | ||
DBT_CONFIG = {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR} | ||
PG_CONFIG = { | ||
"con_string": get_conn_string( | ||
username="postgres", password="password", hostname="localhost", db_name="postgres_replica" | ||
) | ||
} |
44 changes: 44 additions & 0 deletions
44
examples/modern_data_stack_assets/modern_data_stack_assets/create_data.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import random | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from dagster_postgres.utils import get_conn_string | ||
|
||
START_DATE = pd.to_datetime("2021-01-01") | ||
END_DATE = pd.to_datetime("2022-01-01") | ||
|
||
N_USERS = 100 | ||
N_ORDERS = 10000 | ||
|
||
|
||
def random_dates(start, end): | ||
|
||
start_u = start.value // 10**9 | ||
end_u = end.value // 10**9 | ||
|
||
dist = np.random.standard_exponential(size=N_ORDERS) / 10 | ||
|
||
clipped_flipped_dist = 1 - dist[dist <= 1] | ||
|
||
return pd.to_datetime((clipped_flipped_dist * (end_u - start_u)) + start_u, unit="s") | ||
|
||
|
||
con_string = get_conn_string( | ||
username="postgres", password="password", hostname="localhost", db_name="postgres" | ||
) | ||
|
||
users = pd.DataFrame( | ||
{"user_id": range(N_USERS), "is_bot": [random.choice([True, False]) for _ in range(N_USERS)]} | ||
) | ||
|
||
users.to_sql("users", con=con_string, if_exists="replace") | ||
|
||
orders = pd.DataFrame( | ||
{ | ||
"user_id": [random.randint(0, N_USERS) for _ in range(N_ORDERS)], | ||
"order_time": random_dates(START_DATE, END_DATE), | ||
"order_value": np.random.normal(loc=100.0, scale=15.0, size=N_ORDERS), | ||
} | ||
) | ||
|
||
orders.to_sql("orders", con=con_string, if_exists="replace") |
26 changes: 26 additions & 0 deletions
26
examples/modern_data_stack_assets/modern_data_stack_assets/forecasting.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from typing import Any, Tuple | ||
|
||
import numpy as np | ||
import pandas as pd | ||
from scipy import optimize | ||
|
||
from .constants import model_func | ||
|
||
|
||
def order_forecast_model(daily_order_summary: pd.DataFrame) -> Tuple[Any, Any]: | ||
"""Model parameters that best fit the observed data""" | ||
df = daily_order_summary | ||
return optimize.curve_fit( | ||
f=model_func, xdata=df.order_date.astype(np.int64), ydata=df.num_orders, p0=[10, 100] | ||
)[0] | ||
|
||
|
||
def predicted_orders( | ||
daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float] | ||
) -> pd.DataFrame: | ||
"""Predicted orders for the next 30 days based on the fit paramters""" | ||
a, b = order_forecast_model | ||
start_date = daily_order_summary.order_date.max() | ||
future_dates = pd.date_range(start=start_date, end=start_date + pd.DateOffset(days=30)) | ||
predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b) | ||
return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data}) |
34 changes: 34 additions & 0 deletions
34
examples/modern_data_stack_assets/modern_data_stack_assets/pandas_io_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import pandas as pd | ||
|
||
from dagster import IOManager, check, io_manager | ||
|
||
|
||
class PandasIOManager(IOManager): | ||
"""Sample IOManager to handle loading the contents of tables as pandas DataFrames. | ||
Does not handle cases where data is written to different schemas for different outputs, and | ||
uses the name of the asset key as the table name. | ||
""" | ||
|
||
def __init__(self, con_string: str): | ||
self._con = con_string | ||
|
||
def handle_output(self, context, obj): | ||
if isinstance(obj, pd.DataFrame): | ||
# write df to table | ||
obj.to_sql(name=context.asset_key.path[-1], con=self._con, if_exists="replace") | ||
elif obj is None: | ||
# dbt has already written the data to this table | ||
pass | ||
else: | ||
raise check.CheckError(f"Unsupported object type {type(obj)} for PandasIOManager.") | ||
|
||
def load_input(self, context) -> pd.DataFrame: | ||
"""Load the contents of a table as a pandas DataFrame.""" | ||
model_name = context.upstream_output.asset_key.path[-1] | ||
return pd.read_sql(f"SELECT * FROM {model_name}", con=self._con) | ||
|
||
|
||
@io_manager(config_schema={"con_string": str}) | ||
def pandas_io_manager(context): | ||
return PandasIOManager(context.resource_config["con_string"]) |
3 changes: 3 additions & 0 deletions
3
examples/modern_data_stack_assets/modern_data_stack_assets_tests/test_repo_loads.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
def test_repo_loads(): | ||
# placeholder for future testing | ||
assert True |
Oops, something went wrong.