This tutorial describes how to integrate SQL-based transformations with Airbyte syncs using a specialized transformation tool: DBT.
It is a follow-up to the previous tutorial, Connecting EL with T using SQL.
The tool in charge of the transformations behind the scenes is DBT (Data Build Tool).
Before generating the SQL files as we've seen previously, Airbyte internally sets up a Docker image where DBT is installed and creates a DBT project as described in the DBT documentation. That project is then run through the DBT CLI.
In the future, we will work on improving this DBT integration further... For example, it would probably be easier to customize and import your own DBT project within the Airbyte pipeline or connect with the DBT Cloud.
However, for now, let's see how to interact with the DBT tool.
Since the whole DBT project is properly configured, it is possible to invoke the CLI from within the docker image to trigger transformation processing:
#!/usr/bin/env bash

docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization debug --profiles-dir=. --project-dir=.
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization run --profiles-dir=. --project-dir=.
Example Output:
Running with dbt=0.18.1
dbt version: 0.18.1
python version: 3.7.9
python path: /usr/local/bin/python
os info: Linux-4.19.121-linuxkit-x86_64-with-debian-10.6
Using profiles.yml file at ./profiles.yml
Using dbt_project.yml file at /data/5/0/normalize/dbt_project.yml

Configuration:
  profiles.yml file [OK found and valid]
  dbt_project.yml file [OK found and valid]

Required dependencies:
 - git [OK found]

Connection:
  host: localhost
  port: 3000
  user: postgres
  database: postgres
  schema: quarantine
  search_path: None
  keepalives_idle: 0
  sslmode: None
  Connection test: OK connection ok

Running with dbt=0.18.1
Found 1 model, 0 tests, 0 snapshots, 0 analyses, 302 macros, 0 operations, 0 seed files, 1 source

14:37:10 | Concurrency: 32 threads (target='prod')
14:37:10 |
14:37:10 | 1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
14:37:11 | 1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 17911 in 0.33s]
14:37:11 |
14:37:11 | Finished running 1 table model in 0.50s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
As seen in the tutorial on exploring the workspace folder, it is possible to browse the normalize folder and examine further logs if an error occurs.
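For example, you can list the generated project files and read the DBT logs directly through the shared Docker volume. This is a minimal sketch following the same volume-mounting pattern as the workspace tutorial; the exact workspace path and the logs/dbt.log location are assumptions that may differ in your setup:

#!/usr/bin/env bash

# Illustrative sketch: browse the generated DBT project through the shared
# airbyte_workspace volume (NORMALIZE_WORKSPACE, e.g. "5/0", depends on your sync attempt).
docker run --rm -i -v airbyte_workspace:/data busybox ls /data/$NORMALIZE_WORKSPACE/normalize

# DBT usually writes its run logs to logs/dbt.log inside the project directory.
docker run --rm -i -v airbyte_workspace:/data busybox cat /data/$NORMALIZE_WORKSPACE/normalize/logs/dbt.log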
In particular, we can also take a look at the DBT models generated by Airbyte and export them to the local host filesystem:
#!/usr/bin/env bash

TUTORIAL_DIR="$(pwd)/tutorial/"
rm -rf $TUTORIAL_DIR/normalization-files
mkdir -p $TUTORIAL_DIR/normalization-files

docker cp airbyte-server:/tmp/workspace/$NORMALIZE_WORKSPACE/normalize/ $TUTORIAL_DIR/normalization-files

NORMALIZE_DIR=$TUTORIAL_DIR/normalization-files/normalize
cd $NORMALIZE_DIR
cat $NORMALIZE_DIR/models/generated/*.sql
Example Output:
with
covid_epidemiology_node as (
  select
    _airbyte_emitted_at,
    {{ dbt_utils.current_timestamp_in_utc() }} as _airbyte_normalized_at,
    cast({{ json_extract_scalar('_airbyte_data', ['date']) }} as {{ dbt_utils.type_string() }}) as date,
    cast({{ json_extract_scalar('_airbyte_data', ['new_recovered']) }} as {{ dbt_utils.type_float() }}) as new_recovered,
    cast({{ json_extract_scalar('_airbyte_data', ['new_tested']) }} as {{ dbt_utils.type_float() }}) as new_tested,
    cast({{ json_extract_scalar('_airbyte_data', ['total_deceased']) }} as {{ dbt_utils.type_float() }}) as total_deceased,
    cast({{ json_extract_scalar('_airbyte_data', ['new_deceased']) }} as {{ dbt_utils.type_float() }}) as new_deceased,
    cast({{ json_extract_scalar('_airbyte_data', ['new_confirmed']) }} as {{ dbt_utils.type_float() }}) as new_confirmed,
    cast({{ json_extract_scalar('_airbyte_data', ['total_confirmed']) }} as {{ dbt_utils.type_float() }}) as total_confirmed,
    cast({{ json_extract_scalar('_airbyte_data', ['total_tested']) }} as {{ dbt_utils.type_float() }}) as total_tested,
    cast({{ json_extract_scalar('_airbyte_data', ['total_recovered']) }} as {{ dbt_utils.type_float() }}) as total_recovered,
    cast({{ json_extract_scalar('_airbyte_data', ['key']) }} as {{ dbt_utils.type_string() }}) as key
  from {{ source('quarantine', 'covid_epidemiology_raw') }}
),
covid_epidemiology_with_id as (
  select
    *,
    {{ dbt_utils.surrogate_key(['date','new_recovered','new_tested','total_deceased','new_deceased','new_confirmed','total_confirmed','total_tested','total_recovered','key']) }} as _airbyte_covid_epidemiology_hashid
  from covid_epidemiology_node
)
select * from covid_epidemiology_with_id
If you have DBT installed on your machine, you can then view, edit, customize, and run these DBT models yourself, bypassing the normalization step generated by Airbyte!
#!/usr/bin/env bash

dbt deps --profiles-dir=$NORMALIZE_DIR --project-dir=$NORMALIZE_DIR
dbt run --profiles-dir=$NORMALIZE_DIR --project-dir=$NORMALIZE_DIR --full-refresh
Example Output:
Running with dbt=0.18.1
Installing https://github.com/fishtown-analytics/dbt-utils@0.6.2
Installed from revision 0.6.2

Running with dbt=0.18.1
Found 1 model, 0 tests, 0 snapshots, 0 analyses, 302 macros, 0 operations, 0 seed files, 1 source

15:37:54 | Concurrency: 32 threads (target='prod')
15:37:54 |
15:37:55 | 1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
15:37:55 | 1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 17911 in 0.30s]
15:37:55 |
15:37:55 | Finished running 1 table model in 0.51s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
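Since the exported project is a regular DBT project, you could even add or tweak models before running DBT again. Here is a minimal sketch, assuming you keep the exported folder from the previous step; the model name covid_epidemiology_recent and its date filter are purely hypothetical examples, not something Airbyte generates:

#!/usr/bin/env bash

# Illustrative sketch: drop a hypothetical custom model next to the generated ones...
cat > $NORMALIZE_DIR/models/generated/covid_epidemiology_recent.sql <<'SQL'
-- Hypothetical example: keep only recent rows from the generated model
select *
from {{ ref('covid_epidemiology') }}
where cast(date as date) >= '2020-06-01'
SQL

# ...then run only that model against the same destination
dbt run --profiles-dir=$NORMALIZE_DIR --project-dir=$NORMALIZE_DIR --models covid_epidemiology_recent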
For the moment, it is not possible to make changes to the generated DBT models and commit them back so that Airbyte uses them in its next sync of a given source/destination combination. But we'll work on such an integration in the near future!
Our DBT projects are currently composed of only one model per final table to replicate. We will probably refine this and grow the transformation steps further, adding UI elements to configure what should or should not be included in the normalization step.
However, if you have ideas on what to do with DBT from this point on, we would be glad to hear your feedback and ideas. Thank you!