Normalization: Upgrade MySQL to dbt 1.0.0 (airbytehq#11470)
alafanechere authored Jun 15, 2022
1 parent 7d9f8c9 commit e8146e5
Showing 24 changed files with 149 additions and 143 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

-LABEL io.airbyte.version=0.2.4
+LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/normalization
@@ -2,7 +2,7 @@
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
-# and underscores. A good package name should reflect your organization's
+# and underscores. A good package name should reflect your organization’s
# name or the intended use of these models
name: "airbyte_utils"
version: "1.0"
@@ -13,18 +13,18 @@ config-version: 2
profile: "normalize"

# These configurations specify where dbt should look for different types of files.
-# The `source-paths` config, for example, states that source models can be found
-# in the "models/" directory. You probably won't need to change these!
-source-paths: ["models"]
+# The `model-paths` config, for example, states that source models can be found
+# in the "models/" directory. You probably won’t need to change these!
+model-paths: ["models"]
docs-paths: ["docs"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
-data-paths: ["data"]
+seed-paths: ["data"]
macro-paths: ["macros"]

target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
-modules-path: "/dbt" # directory which will store external DBT dependencies
+packages-install-path: "/dbt" # directory which will store external DBT dependencies

clean-targets: # directories to be removed by `dbt clean`
- "build"
@@ -37,7 +37,7 @@ quoting:
schema: false
identifier: true

-# You can define configurations for models in the `source-paths` directory here.
+# You can define configurations for models in the `model-paths` directory here.
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
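For context: dbt 1.0.0 renamed several project configuration keys, and this template change tracks the rename: `source-paths` became `model-paths`, `data-paths` became `seed-paths`, and `modules-path` became `packages-install-path`. A minimal dbt 1.0-style project file using the renamed keys (an illustrative sketch, not the full Airbyte template):

name: "airbyte_utils"
version: "1.0"
config-version: 2
profile: "normalize"
model-paths: ["models"]        # formerly source-paths
seed-paths: ["data"]           # formerly data-paths
macro-paths: ["macros"]
target-path: "../build"
packages-install-path: "/dbt"  # formerly modules-path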
@@ -2,4 +2,4 @@

packages:
- git: "https://github.com/fishtown-analytics/dbt-utils.git"
-revision: 0.6.4
+revision: 0.8.2
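The dbt-utils pin moves to 0.8.2 because the 0.8.x line is the one compatible with dbt 1.0. The git URL still points at the old fishtown-analytics organization, which GitHub redirects to dbt-labs. For comparison, the equivalent hub-style pin would look like this (a sketch of the alternative syntax, not what this repo uses):

packages:
  - package: dbt-labs/dbt_utils
    version: [">=0.8.0", "<0.9.0"]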
@@ -12,6 +12,11 @@
concat({{ fields|join(', ') }})
{%- endmacro %}

+{% macro mysql__concat(fields) -%}
+{#-- MySQL doesn't support the '||' operator as concatenation by default --#}
+concat({{ fields|join(', ') }})
+{%- endmacro %}

{% macro sqlserver__concat(fields) -%}
{#-- CONCAT() in SQL SERVER accepts from 2 to 254 arguments, we use batches for the main concat, to overcome the limit. --#}
{% set concat_chunks = [] %}
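The new mysql__concat dispatch target is needed because stock MySQL does not treat `||` as string concatenation: unless the PIPES_AS_CONCAT SQL mode is enabled, `||` is logical OR and string operands are coerced to numbers. A quick illustration in a default-mode MySQL session:

-- '||' is logical OR by default: both strings coerce to 0, so this returns 0
SELECT 'foo' || 'bar';
-- CONCAT() is the portable spelling; this returns 'foobar'
SELECT CONCAT('foo', 'bar');
-- Opting in to PIPES_AS_CONCAT turns '||' into concatenation
SET SESSION sql_mode = CONCAT(@@sql_mode, ',PIPES_AS_CONCAT');
SELECT 'foo' || 'bar';  -- now returns 'foobar'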
@@ -120,6 +120,7 @@ function main() {
openssh "${PROJECT_DIR}/ssh.json"
trap 'closessh' EXIT

+set +e # allow script to continue running even if next commands fail to run properly
# We don't run dbt 1.0.x on all destinations (because their plugins don't support it yet)
# So we need to only pass `--event-buffer-size` if it's supported by DBT.
check_dbt_event_buffer_size
@@ -130,7 +131,6 @@ function main() {
dbt_additional_args=""
fi

-set +e # allow script to continue running even if next commands fail to run properly
# Run dbt to compile and execute the generated normalization models
dbt ${dbt_additional_args} run --profiles-dir "${PROJECT_DIR}" --project-dir "${PROJECT_DIR}"
DBT_EXIT_CODE=$?
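Moving `set +e` above the capability probe ensures a failing check_dbt_event_buffer_size call cannot abort the script before the fallback branch assigns empty args. The probe itself is outside this hunk; a hypothetical sketch of the kind of check it performs (function name from the script, body and flag value assumed):

check_dbt_event_buffer_size() {
  # Succeed only if this dbt build advertises the flag in its help output
  dbt --help 2>/dev/null | grep -q -- '--event-buffer-size'
}

if check_dbt_event_buffer_size; then
  dbt_additional_args="--event-buffer-size=10000"
else
  dbt_additional_args=""
fi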
@@ -419,7 +419,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
"""
Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs
"""
if normalization_image.startswith("airbyte/normalization-oracle") or normalization_image.startswith("airbyte/normalization-mysql"):
if normalization_image.startswith("airbyte/normalization-oracle"):
dbtAdditionalArgs = []
else:
dbtAdditionalArgs = ["--event-buffer-size=10000"]
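With MySQL normalization now on dbt 1.0, Oracle is the only image left running a dbt version that lacks --event-buffer-size, so the MySQL clause drops out of the check. The selection logic, restated as a standalone helper (a hypothetical refactor for clarity, not the actual code):

def extra_dbt_args(normalization_image: str) -> list:
    # Oracle still ships a pre-1.0 dbt without --event-buffer-size;
    # every other image, now including MySQL, accepts the flag.
    if normalization_image.startswith("airbyte/normalization-oracle"):
        return []
    return ["--event-buffer-size=10000"]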
@@ -1,25 +1,25 @@
name: airbyte_utils
version: "1.0"
version: '1.0'
config-version: 2
profile: normalize
-source-paths:
-- models
+model-paths:
+- models
docs-paths:
-- docs
+- docs
analysis-paths:
-- analysis
+- analysis
test-paths:
-- tests
-data-paths:
-- data
+- tests
+seed-paths:
+- data
macro-paths:
-- macros
+- macros
target-path: ../build
log-path: ../logs
-modules-path: /dbt
+packages-install-path: /dbt
clean-targets:
-- build
-- dbt_modules
+- build
+- dbt_modules
quoting:
database: true
schema: false
@@ -42,7 +42,7 @@ models:
+materialized: view
vars:
dbt_utils_dispatch_list:
-- airbyte_utils
+- airbyte_utils
json_column: _airbyte_data
models_to_source:
nested_stream_with_co_1g_into_long_names_ab1: test_normalization._airbyte_raw_nested_s__lting_into_long_names
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co___long_names_partition__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1 as (
+with __dbt__cte__nested_stream_with_co_2g_names_partition_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
@@ -24,10 +24,10 @@ from test_normalization.`nested_stream_with_co_1g_into_long_names_scd` as table_
where 1 = 1
and `partition` is not null

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -37,23 +37,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_nested_strea__nto_long_names_hashid as char), ''), '-', coalesce(cast(double_array_data as char), ''), '-', coalesce(cast(`DATA` as char), '')) as char)) as _airbyte_partition_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 tmp
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 tmp
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -64,7 +64,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_partition_hashid
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
where 1 = 1

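Most of the churn in the regenerated SQL below comes from dbt 1.0 lowercasing the prefix it uses when inlining ephemeral models as CTEs, from __dbt__CTE__ to __dbt__cte__. For a hypothetical ephemeral model my_model_ab1, downstream compiled SQL takes this shape:

with __dbt__cte__my_model_ab1 as (
    -- body of the ephemeral model, inlined by dbt at compile time
    select 1 as id
)
select * from __dbt__cte__my_model_ab1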
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co___names_partition_data__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_3es_partition_data_ab1 as (
+with __dbt__cte__nested_stream_with_co_3es_partition_data_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co___long_names_partition`
@@ -20,7 +20,7 @@ with numbers as (
select


-p0.generated_number * pow(2, 0)
+p0.generated_number * power(2, 0)


+ 1
@@ -68,10 +68,10 @@ left join joined on _airbyte_partition_hashid = joined._airbyte_hashid
where 1 = 1
and `DATA` is not null

-), __dbt__CTE__nested_stream_with_co_3es_partition_data_ab2 as (
+), __dbt__cte__nested_stream_with_co_3es_partition_data_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_3es_partition_data_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_3es_partition_data_ab1
select
_airbyte_partition_hashid,
cast(currency as char(1024)) as currency,
Expand All @@ -80,23 +80,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_3es_partition_data_ab1
+from __dbt__cte__nested_stream_with_co_3es_partition_data_ab1
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_3es_partition_data_ab3 as (
+), __dbt__cte__nested_stream_with_co_3es_partition_data_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_3es_partition_data_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_3es_partition_data_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_partition_hashid as char), ''), '-', coalesce(cast(currency as char), '')) as char)) as _airbyte_data_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_3es_partition_data_ab2 tmp
+from __dbt__cte__nested_stream_with_co_3es_partition_data_ab2 tmp
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_3es_partition_data_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_3es_partition_data_ab3
select
_airbyte_partition_hashid,
currency,
Expand All @@ -106,7 +106,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_data_hashid
-from __dbt__CTE__nested_stream_with_co_3es_partition_data_ab3
+from __dbt__cte__nested_stream_with_co_3es_partition_data_ab3
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from test_normalization.`nested_stream_with_co___long_names_partition`
where 1 = 1

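The other systematic change in the regenerated SQL is pow() becoming power() in the series arithmetic, a side effect of the dbt-utils upgrade. POWER() is the more portable spelling across warehouses, and MySQL accepts both as synonyms, so the results are identical:

SELECT pow(2, 0);    -- 1
SELECT power(2, 0);  -- 1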
@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co__ion_double_array_data__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_3double_array_data_ab1 as (
+with __dbt__cte__nested_stream_with_co_3double_array_data_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co___long_names_partition`
@@ -20,7 +20,7 @@ with numbers as (
select


-p0.generated_number * pow(2, 0)
+p0.generated_number * power(2, 0)


+ 1
@@ -68,10 +68,10 @@ left join joined on _airbyte_partition_hashid = joined._airbyte_hashid
where 1 = 1
and double_array_data is not null

-), __dbt__CTE__nested_stream_with_co_3double_array_data_ab2 as (
+), __dbt__cte__nested_stream_with_co_3double_array_data_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_3double_array_data_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_3double_array_data_ab1
select
_airbyte_partition_hashid,
cast(id as char(1024)) as id,
Expand All @@ -80,23 +80,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_3double_array_data_ab1
+from __dbt__cte__nested_stream_with_co_3double_array_data_ab1
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_3double_array_data_ab3 as (
+), __dbt__cte__nested_stream_with_co_3double_array_data_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_3double_array_data_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_3double_array_data_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_partition_hashid as char), ''), '-', coalesce(cast(id as char), '')) as char)) as _airbyte_double_array_data_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_3double_array_data_ab2 tmp
+from __dbt__cte__nested_stream_with_co_3double_array_data_ab2 tmp
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_3double_array_data_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_3double_array_data_ab3
select
_airbyte_partition_hashid,
id,
Expand All @@ -106,7 +106,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_double_array_data_hashid
-from __dbt__CTE__nested_stream_with_co_3double_array_data_ab3
+from __dbt__cte__nested_stream_with_co_3double_array_data_ab3
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from test_normalization.`nested_stream_with_co___long_names_partition`
where 1 = 1

@@ -4,7 +4,7 @@
test_normalization.`nested_stream_with_co___long_names_partition__dbt_tmp`
as (

-with __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1 as (
+with __dbt__cte__nested_stream_with_co_2g_names_partition_ab1 as (

-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
@@ -24,10 +24,10 @@ from test_normalization.`nested_stream_with_co_1g_into_long_names_scd` as table_
where 1 = 1
and `partition` is not null

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -37,23 +37,23 @@ select

CURRENT_TIMESTAMP
as _airbyte_normalized_at
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab1
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab1
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

-), __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3 as (
+), __dbt__cte__nested_stream_with_co_2g_names_partition_ab3 as (

-- SQL model to build a hash column based on the values of this record
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab2
select
md5(cast(concat(coalesce(cast(_airbyte_nested_strea__nto_long_names_hashid as char), ''), '-', coalesce(cast(double_array_data as char), ''), '-', coalesce(cast(`DATA` as char), '')) as char)) as _airbyte_partition_hashid,
tmp.*
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab2 tmp
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab2 tmp
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition
where 1 = 1

)-- Final base SQL model
--- depends_on: __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+-- depends_on: __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
select
_airbyte_nested_strea__nto_long_names_hashid,
double_array_data,
Expand All @@ -64,7 +64,7 @@ select
CURRENT_TIMESTAMP
as _airbyte_normalized_at,
_airbyte_partition_hashid
-from __dbt__CTE__nested_stream_with_co_2g_names_partition_ab3
+from __dbt__cte__nested_stream_with_co_2g_names_partition_ab3
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from test_normalization.`nested_stream_with_co_1g_into_long_names_scd`
where 1 = 1
