Skip to content

Commit

Permalink
🎉 Incremental Normalization (airbytehq#7162)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong authored Oct 29, 2021
1 parent 9c78351 commit 5fc50df
Show file tree
Hide file tree
Showing 62 changed files with 1,833 additions and 346 deletions.
3 changes: 2 additions & 1 deletion airbyte-integrations/bases/base-normalization/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ integration_tests/normalization_test_output/*/*/*.json
integration_tests/normalization_test_output/*/*/*.md
integration_tests/normalization_test_output/*/*/macros/
integration_tests/normalization_test_output/*/*/tests/
integration_tests/normalization_test_output/*/*/models/dbt_data_tests/
integration_tests/normalization_test_output/*/*/models/dbt_schema_tests/

integration_tests/normalization_test_output/*/*/modified_models/
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.56
LABEL io.airbyte.version=0.1.58
LABEL io.airbyte.name=airbyte/normalization
5 changes: 4 additions & 1 deletion airbyte-integrations/bases/base-normalization/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ or directly with pytest:
NORMALIZATION_TEST_TARGET=postgres pytest airbyte-integrations/bases/base-normalization/integration_tests

Note that these tests are connecting and processing data on top of real data warehouse destinations.
Therefore, valid credentials files are expected to be injected in the `secrets/` folder in order to run
Therefore, valid credentials files are expected to be injected in the `secrets/` folder in order to run
(not included in git repository).

This is usually automatically done by the CI thanks to the `tools/bin/ci_credentials.sh` script or you can
Expand Down Expand Up @@ -217,6 +217,9 @@ So, for each target destination, the steps run by the tests are:
7. Execute dbt cli command: `dbt tests` from the test workspace folder to run verifications and checks with dbt.
8. Optional checks (nothing for the moment)

Note that the tests are using the normalization code from the python files directly, so it is not necessary to rebuild the docker images
in between when iterating on the code base. However, dbt cli and destination connectors are invoked thanks to the dev docker images.

### Integration Test Checks:

#### dbt schema tests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
+materialized: incremental
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,22 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
# incremental is not enabled for MySql yet
#+materialized: incremental
+materialized: table
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,22 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
# incremental is not enabled for Oracle yet
#+materialized: incremental
+materialized: table
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

vars:
dbt_utils_dispatch_list: ['airbyte_utils']
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,21 @@ quoting:
# are materialized, and more!
models:
airbyte_utils:
+materialized: table
generated:
airbyte_ctes:
+tags: airbyte_internal_cte
+materialized: ephemeral
airbyte_views:
+tags: airbyte_internal_views
+materialized: view
airbyte_incremental:
+tags: incremental_tables
+materialized: incremental
+on_schema_change: sync_all_columns
airbyte_tables:
+tags: normalized_tables
+materialized: table
+materialized: table
airbyte_views:
+tags: airbyte_internal_views
+materialized: view

dispatch:
- macro_namespace: dbt_utils
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% macro mysql__current_timestamp() %}
CURRENT_TIMESTAMP
{% endmacro %}

{% macro oracle__current_timestamp() %}
CURRENT_TIMESTAMP
{% endmacro %}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{#
These macros control how incremental models are updated in Airbyte's normalization step
- get_max_normalized_cursor retrieve the value of the last normalized data
- incremental_clause controls the predicate to filter on new data to process incrementally
#}
{% macro incremental_clause(col_emitted_at) -%}
{{ adapter.dispatch('incremental_clause')(col_emitted_at) }}
{%- endmacro %}
{%- macro default__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} >= (select max({{ col_emitted_at }}) from {{ this }})
{% endif %}
{%- endmacro -%}
{# -- see https://on-systems.tech/113-beware-dbt-incremental-updates-against-snowflake-external-tables/ #}
{%- macro snowflake__incremental_clause(col_emitted_at) -%}
{% if is_incremental() %}
and {{ col_emitted_at }} >= cast('{{ get_max_normalized_cursor(col_emitted_at) }}' as {{ type_timestamp_with_timezone() }})
{% endif %}
{%- endmacro -%}
{% macro get_max_normalized_cursor(col_emitted_at) %}
{% if execute and is_incremental() %}
{% if env_var('INCREMENTAL_CURSOR', 'UNSET') == 'UNSET' %}
{% set query %}
select coalesce(max({{ col_emitted_at }}), cast('1970-01-01 00:00:00' as {{ type_timestamp_with_timezone() }})) from {{ this }}
{% endset %}
{% set max_cursor = run_query(query).columns[0][0] %}
{% do return(max_cursor) %}
{% else %}
{% do return(env_var('INCREMENTAL_CURSOR')) %}
{% endif %}
{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{#
This overrides the behavior of the macro `should_full_refresh` so full refresh are triggered if:
- the dbt cli is run with --full-refresh flag or the model is configured explicitly to full_refresh
- the column _airbyte_ab_id does not exists in the normalized tables and make sure it is well populated.
#}

{%- macro need_full_refresh(col_ab_id, target_table=this) -%}
{%- if not execute -%}
{{ return(false) }}
{%- endif -%}
{%- set found_column = [] %}
{%- set cols = adapter.get_columns_in_relation(target_table) -%}
{%- for col in cols -%}
{%- if col.column == col_ab_id -%}
{% do found_column.append(col.column) %}
{%- endif -%}
{%- endfor -%}
{%- if found_column -%}
{{ return(false) }}
{%- else -%}
{{ dbt_utils.log_info(target_table ~ "." ~ col_ab_id ~ " does not exist. The table needs to be rebuilt in full_refresh") }}
{{ return(true) }}
{%- endif -%}
{%- endmacro -%}

{%- macro should_full_refresh() -%}
{% set config_full_refresh = config.get('full_refresh') %}
{%- if config_full_refresh is none -%}
{% set config_full_refresh = flags.FULL_REFRESH %}
{%- endif -%}
{%- if not config_full_refresh -%}
{% set config_full_refresh = need_full_refresh(get_col_ab_id(), this) %}
{%- endif -%}
{% do return(config_full_refresh) %}
{%- endmacro -%}

{%- macro get_col_ab_id() -%}
{{ adapter.dispatch('get_col_ab_id')() }}
{%- endmacro -%}

{%- macro default__get_col_ab_id() -%}
_airbyte_ab_id
{%- endmacro -%}

{%- macro oracle__get_col_ab_id() -%}
"_AIRBYTE_AB_ID"
{%- endmacro -%}

{%- macro snowflake__get_col_ab_id() -%}
_AIRBYTE_AB_ID
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{#
Similar to the star macro here: https://github.com/dbt-labs/dbt-utils/blob/main/macros/sql/star.sql

This star_intersect macro takes an additional 'intersect' relation as argument.
Its behavior is to select columns from both 'intersect' and 'from' relations with the following rules:
- if the columns are existing in both 'from' and the 'intersect' relations, then the column from 'intersect' is used
- if it's not in the both relation, then only the column in the 'from' relation is used
#}
{% macro star_intersect(from, intersect, from_alias=False, intersect_alias=False, except=[]) -%}
{%- do dbt_utils._is_relation(from, 'star_intersect') -%}
{%- do dbt_utils._is_ephemeral(from, 'star_intersect') -%}
{%- do dbt_utils._is_relation(intersect, 'star_intersect') -%}
{%- do dbt_utils._is_ephemeral(intersect, 'star_intersect') -%}
{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
{%- if not execute -%}
{{ return('') }}
{% endif %}
{%- set include_cols = [] %}
{%- set cols = adapter.get_columns_in_relation(from) -%}
{%- set except = except | map("lower") | list %}
{%- for col in cols -%}
{%- if col.column|lower not in except -%}
{% do include_cols.append(col.column) %}
{%- endif %}
{%- endfor %}
{%- set include_intersect_cols = [] %}
{%- set intersect_cols = adapter.get_columns_in_relation(intersect) -%}
{%- for col in intersect_cols -%}
{%- if col.column|lower not in except -%}
{% do include_intersect_cols.append(col.column) %}
{%- endif %}
{%- endfor %}
{%- for col in include_cols %}
{%- if col in include_intersect_cols -%}
{%- if intersect_alias %}{{ intersect_alias }}.{% else %}{%- endif -%}{{ adapter.quote(col)|trim }}
{%- if not loop.last %},{{ '\n ' }}{% endif %}
{%- else %}
{%- if from_alias %}{{ from_alias }}.{% else %}{{ from }}.{%- endif -%}{{ adapter.quote(col)|trim }} as {{ adapter.quote(col)|trim }}
{%- if not loop.last %},{{ '\n ' }}{% endif %}
{%- endif %}
{%- endfor -%}
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import random
import re
import shutil
import socket
import string
import subprocess
Expand Down Expand Up @@ -298,21 +297,25 @@ def get_normalization_image(destination_type: DestinationType) -> str:
else:
return "airbyte/normalization:dev"

def dbt_run(self, destination_type: DestinationType, test_root_dir: str):
def dbt_check(self, destination_type: DestinationType, test_root_dir: str):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination
"""
normalization_image: str = self.get_normalization_image(destination_type)
# Perform sanity check on dbt project settings
assert self.run_check_dbt_command(normalization_image, "debug", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "deps", test_root_dir)
final_sql_files = os.path.join(test_root_dir, "final")
shutil.rmtree(final_sql_files, ignore_errors=True)

def dbt_run(self, destination_type: DestinationType, test_root_dir: str, force_full_refresh: bool = False):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination
"""
normalization_image: str = self.get_normalization_image(destination_type)
# Compile dbt models files into destination sql dialect, then run the transformation queries
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir)
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir, force_full_refresh)

@staticmethod
def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> bool:
def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool:
"""
Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs
"""
Expand All @@ -327,7 +330,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> b
"-v",
f"{cwd}/build:/build",
"-v",
f"{cwd}/final:/build/run/airbyte_utils/models/generated",
f"{cwd}/logs:/logs",
"-v",
"/tmp:/tmp",
"--network",
Expand All @@ -340,6 +343,9 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str) -> b
"--profiles-dir=/workspace",
"--project-dir=/workspace",
]
if force_full_refresh:
commands.append("--full-refresh")
command = f"{command} --full-refresh"
print("Executing: ", " ".join(commands))
print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}")
with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
Expand Down Expand Up @@ -424,6 +430,6 @@ def get_test_targets() -> List[str]:
"""
if os.getenv(NORMALIZATION_TEST_TARGET):
target_str = os.getenv(NORMALIZATION_TEST_TARGET)
return [d.value for d in {DestinationType.from_string(s) for s in target_str.split(",")}]
return [d.value for d in {DestinationType.from_string(s.strip()) for s in target_str.split(",")}]
else:
return [d.value for d in DestinationType]
Loading

0 comments on commit 5fc50df

Please sign in to comment.