This tutorial describes how to integrate SQL-based transformations with Airbyte syncs using a specialized transformation tool: DBT.
It is a follow-up to the previous tutorial, Connecting EL with T using SQL.
Behind the scenes, the tool in charge of transformations is DBT (Data Build Tool).
Before generating the SQL files as we saw previously, Airbyte internally sets up a Docker image with DBT installed and creates a DBT project as described in the DBT documentation. The project is then executed with the DBT CLI.
In the future, we will work on improving this DBT integration further. For example, it would probably be easier to customize and import your own DBT project within the Airbyte pipeline or connect with DBT Cloud.
However, for now, let's see how to interact with the DBT tool.
Since the whole DBT project is properly configured, it is possible to invoke the DBT CLI from within the Docker image to trigger the transformation processing:
#!/usr/bin/env bash
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization debug --profiles-dir=. --project-dir=.
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization run --profiles-dir=. --project-dir=.
Example Output:
Running with dbt=0.18.1
dbt version: 0.18.1
python version: 3.7.9
python path: /usr/local/bin/python
os info: Linux-4.19.121-linuxkit-x86_64-with-debian-10.6
Using profiles.yml file at ./profiles.yml
Using dbt_project.yml file at /data/5/0/normalize/dbt_project.yml
Configuration:
profiles.yml file [OK found and valid]
dbt_project.yml file [OK found and valid]
Required dependencies:
- git [OK found]
Connection:
host: localhost
port: 3000
user: postgres
database: postgres
schema: quarantine
search_path: None
keepalives_idle: 0
sslmode: None
Connection test: OK connection ok
Running with dbt=0.18.1
Found 1 model, 0 tests, 0 snapshots, 0 analyses, 302 macros, 0 operations, 0 seed files, 1 source
14:37:10 | Concurrency: 32 threads (target='prod')
14:37:10 |
14:37:10 | 1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
14:37:11 | 1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 17911 in 0.33s]
14:37:11 |
14:37:11 | Finished running 1 table model in 0.50s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
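If you only want to rebuild a single model instead of the whole project, the same Docker entrypoint can be reused with DBT's model selection flag. Here is a minimal sketch, assuming the covid_epidemiology model name shown in the output above and the same NORMALIZE_WORKSPACE variable:
#!/usr/bin/env bash
# Re-run only the covid_epidemiology model through the same DBT Docker image.
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization run --models covid_epidemiology --profiles-dir=. --project-dir=.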
As seen in the tutorial on exploring the workspace folder, it is possible to browse the normalize folder and examine further logs if an error occurs.
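For instance, here is a minimal sketch of how you could list what DBT left behind in that folder from a throwaway container, assuming the airbyte_workspace volume and the NORMALIZE_WORKSPACE variable used in the commands above (the exact file layout may differ depending on your version):
#!/usr/bin/env bash
# Use a lightweight busybox container to list the files (models, logs, compiled artifacts) in the normalize folder.
docker run --rm -i -v airbyte_workspace:/data busybox find /data/$NORMALIZE_WORKSPACE/normalize -type f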
In particular, we can also take a look at the DBT models generated by Airbyte and export them to the local host filesystem:
#!/usr/bin/env bash
TUTORIAL_DIR="$(pwd)/tutorial/"
rm -rf $TUTORIAL_DIR/normalization-files
mkdir -p $TUTORIAL_DIR/normalization-files
docker cp airbyte-server:/tmp/workspace/$NORMALIZE_WORKSPACE/normalize/ $TUTORIAL_DIR/normalization-files
NORMALIZE_DIR=$TUTORIAL_DIR/normalization-files/normalize
cd $NORMALIZE_DIR
cat $NORMALIZE_DIR/models/generated/*.sql
Example Output:
with
covid_epidemiology_node as (
select
_airbyte_emitted_at,
{{ dbt_utils.current_timestamp_in_utc() }} as _airbyte_normalized_at,
cast({{ json_extract_scalar('_airbyte_data', ['date']) }} as {{ dbt_utils.type_string() }}) as date,
cast({{ json_extract_scalar('_airbyte_data', ['new_recovered']) }} as {{ dbt_utils.type_float() }}) as new_recovered,
cast({{ json_extract_scalar('_airbyte_data', ['new_tested']) }} as {{ dbt_utils.type_float() }}) as new_tested,
cast({{ json_extract_scalar('_airbyte_data', ['total_deceased']) }} as {{ dbt_utils.type_float() }}) as total_deceased,
cast({{ json_extract_scalar('_airbyte_data', ['new_deceased']) }} as {{ dbt_utils.type_float() }}) as new_deceased,
cast({{ json_extract_scalar('_airbyte_data', ['new_confirmed']) }} as {{ dbt_utils.type_float() }}) as new_confirmed,
cast({{ json_extract_scalar('_airbyte_data', ['total_confirmed']) }} as {{ dbt_utils.type_float() }}) as total_confirmed,
cast({{ json_extract_scalar('_airbyte_data', ['total_tested']) }} as {{ dbt_utils.type_float() }}) as total_tested,
cast({{ json_extract_scalar('_airbyte_data', ['total_recovered']) }} as {{ dbt_utils.type_float() }}) as total_recovered,
cast({{ json_extract_scalar('_airbyte_data', ['key']) }} as {{ dbt_utils.type_string() }}) as key
from {{ source('quarantine', 'covid_epidemiology_raw') }}
),
covid_epidemiology_with_id as (
select
*,
{{ dbt_utils.surrogate_key([
'date',
'new_recovered',
'new_tested',
'total_deceased',
'new_deceased',
'new_confirmed',
'total_confirmed',
'total_tested',
'total_recovered',
'key'
]) }} as _airbyte_covid_epidemiology_hashid
from covid_epidemiology_node
)
select * from covid_epidemiology_with_id
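Note that these models rely on Jinja macros (json_extract_scalar, dbt_utils.type_float, surrogate_key, etc.) that DBT resolves into destination-specific SQL at compile time. As a sketch, you could ask the same Docker image to compile the project without running it against the destination; the resulting plain SQL is written under the target/ folder of the normalize workspace and can be exported with the same docker cp command used above:
#!/usr/bin/env bash
# Compile the DBT models into plain SQL without materializing any tables in the destination.
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization compile --profiles-dir=. --project-dir=.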
If you have DBT installed on your machine, you can then view, edit, customize, and run the DBT models yourself, bypassing the normalization step that Airbyte would otherwise run for you!
#!/usr/bin/env bash
dbt deps --profiles-dir=$NORMALIZE_DIR --project-dir=$NORMALIZE_DIR
dbt run --profiles-dir=$NORMALIZE_DIR --project-dir=$NORMALIZE_DIR --full-refresh
Example Output:
Running with dbt=0.18.1
Installing https://github.com/fishtown-analytics/[email protected]
Installed from revision 0.6.2
Running with dbt=0.18.1
Found 1 model, 0 tests, 0 snapshots, 0 analyses, 302 macros, 0 operations, 0 seed files, 1 source
15:37:54 | Concurrency: 32 threads (target='prod')
15:37:54 |
15:37:55 | 1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
15:37:55 | 1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 17911 in 0.30s]
15:37:55 |
15:37:55 | Finished running 1 table model in 0.51s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
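As a sketch of what customizing could look like, you could drop an extra model next to the generated one and build it with the same local DBT commands. The file name and the aggregation below are purely hypothetical; only the ref to the generated covid_epidemiology model comes from the project exported above:
#!/usr/bin/env bash
# Hypothetical example: add a small downstream model on top of the generated covid_epidemiology model.
cat > $NORMALIZE_DIR/models/generated/covid_epidemiology_totals.sql <<'SQL'
select
    date,
    sum(new_confirmed) as new_confirmed,
    sum(new_deceased) as new_deceased,
    sum(new_recovered) as new_recovered
from {{ ref('covid_epidemiology') }}
group by date
SQL
# Build both the generated model and the new downstream one.
dbt run --profiles-dir=$NORMALIZE_DIR --project-dir=$NORMALIZE_DIR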
For the moment, it is not possible to make changes to the generated DBT models and commit them back so that Airbyte can use them in its next sync of a given source/destination combination. But we'll work on such an integration in the near future!
Our DBT project is currently composed of only one model per final table to replicate. We will probably refine this and grow the transformation steps further, adding UI elements to configure what should or should not be included in the normalization step.
In the meantime, if you have ideas on what to do with DBT from this point on, we would be glad to hear your feedback and ideas. Thank you!