Building a Python Destination

Summary

This article provides a checklist for creating a Python destination. Each step in the checklist links to a more detailed explanation below.

Requirements

You need Docker and Python installed, in the versions listed in the tech stack section. Any Python version from 3.7 through 3.9 should work, but this tutorial was tested with 3.7.

Checklist

Creating a destination

  • Step 1: Create the destination using the template generator

  • Step 2: Set up the virtual environment

  • Step 3: Implement spec to define the configuration required to run the connector

  • Step 4: Implement check to provide a way to validate configurations provided to the connector

  • Step 5: Implement write to write data to the destination

  • Step 6: Set up Acceptance Tests

  • Step 7: Write unit tests or integration tests

  • Step 8: Update the docs (in docs/integrations/destinations/<destination-name>.md)

If you need help with any step of the process, feel free to submit a PR with your progress and any questions you have, or ask us on Slack. You can also refer to the KvDB Python destination implementation if you want to see an example of a working destination.

Explaining Each Step

Step 1: Create the destination using the template

Airbyte provides a code generator which bootstraps the scaffolding for our connector.

$ cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
$ ./generate.sh

Select the Python Destination template and then input the name of your connector. We'll refer to the destination as destination-<name> in this tutorial, but you should replace <name> with the actual name you used for your connector, e.g., redis or google-sheets.

Step 2: Set up the dev environment

Set up your Python virtual environment:

cd airbyte-integrations/connectors/destination-<name>
# Create a virtual environment in the .venv directory
python -m venv .venv
# activate the virtualenv
source .venv/bin/activate
# Install with the "tests" extra which provides test requirements
pip install '.[tests]'

This step sets up the initial Python environment. All subsequent python or pip commands assume you have activated your virtual environment.

If you want your IDE to autocomplete and resolve dependencies properly, point it at the Python binary in airbyte-integrations/connectors/destination-<name>/.venv/bin/python. Also, any time you change the dependencies in setup.py, re-run the install command from this step (pip install '.[tests]') so that all dependencies declared in setup.py are installed into the virtual environment.

Let's quickly get a few housekeeping items out of the way.

Dependencies

Python dependencies for your destination should be declared in airbyte-integrations/connectors/destination-<name>/setup.py in the install_requires field. You might notice that a couple of Airbyte dependencies are already declared there (mainly the Airbyte CDK and potentially some testing libraries or helpers). Keep those as they will be useful during development.

You may notice that there is a requirements.txt in your destination's directory as well. Do not touch this. It is autogenerated and used to install local Airbyte dependencies which are not published to PyPI. All your dependencies should be declared in setup.py.

Iterating on your implementation

Pretty much all it takes to create a destination is to implement the Destination interface. Let's briefly recap the three methods implemented by a Destination:

  1. spec: declares the user-provided credentials or configuration needed to run the connector

  2. check: tests whether the user-provided configuration can be used to connect to the underlying data destination with the correct write permissions

  3. write: writes data to the underlying destination by reading a configuration, a stream of records from stdin, and a configured catalog describing the schema of the data and how it should be written to the destination

The destination interface is described in detail in the Airbyte Specification reference.

The generated files fill in a lot of information for you and have docstrings describing what you need to do to implement each method. The next few steps are just implementing that interface.

All logging should be done through the self.logger object available in the Destination class. Otherwise, logs will not be shown properly in the Airbyte UI.

Everyone develops differently, but here are three ways that we recommend iterating on a destination. Consider using whichever one matches your style.

Run the destination using Python

You'll notice that your destination's directory contains a Python file called main.py. This file is the entrypoint for the connector:

# from airbyte-integrations/connectors/destination-<name>
python main.py spec
python main.py check --config secrets/config.json
# messages.jsonl should contain AirbyteMessages (described in the Airbyte spec)
cat messages.jsonl | python main.py write --config secrets/config.json --catalog sample_files/configured_catalog.json

The nice thing about this approach is that you can iterate completely within Python. The downside is that you are not quite running your destination as it will actually be run by Airbyte. Specifically, you're not running it from within the Docker container that will house it.
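If you don't have a messages.jsonl file yet, the sketch below shows one way to produce one. It assumes the message shapes from the Airbyte spec (a RECORD message carrying stream, data and emitted_at, and a STATE message carrying an opaque data payload). The helper names, the users stream and the sample rows are made up for illustration and are not part of the generated connector.

```python
import json
import sys
import time
from typing import Any, Dict, Iterable, List


def build_sample_messages(stream: str, rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Wrap each row in a RECORD message and finish with one STATE message."""
    emitted_at = int(time.time() * 1000)  # epoch milliseconds
    messages: List[Dict[str, Any]] = [
        {"type": "RECORD", "record": {"stream": stream, "data": row, "emitted_at": emitted_at}}
        for row in rows
    ]
    # The state payload is opaque to the destination; this one is invented for the example.
    messages.append({"type": "STATE", "state": {"data": {"rows_emitted": len(messages)}}})
    return messages


def to_jsonl(messages: Iterable[Dict[str, Any]]) -> str:
    """Serialize messages as one JSON document per line."""
    return "".join(json.dumps(message) + "\n" for message in messages)


if __name__ == "__main__":
    # Hypothetical sample data; the stream name should match your configured catalog.
    sample_rows = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
    sys.stdout.write(to_jsonl(build_sample_messages("users", sample_rows)))
```

Saved under a name of your choosing, the script's output can be redirected into messages.jsonl and then piped into the write command shown above.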

Run using Docker

If you want to run your destination exactly as it will be run by Airbyte (i.e. within a Docker container), you can use the following commands from the connector module directory (airbyte-integrations/connectors/destination-<name>):

# First build the container
docker build . -t airbyte/destination-<name>:dev
# Then use the following commands to run it
docker run --rm airbyte/destination-<name>:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-<name>:dev check --config /secrets/config.json
# -i keeps stdin open so the piped messages reach the container
cat messages.jsonl | docker run --rm -i -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/destination-<name>:dev write --config /secrets/config.json --catalog /sample_files/configured_catalog.json

Note: Each time you make a change to your implementation, you need to re-build the connector image with docker build . -t airbyte/destination-<name>:dev. This ensures the new Python code is added into the Docker container.

The nice thing about this approach is that you are running your destination exactly as it will be run by Airbyte. The tradeoff is that iteration is slightly slower, because you need to re-build the connector between each change.

TDD using standard tests

Note: These tests aren't yet available for Python connectors but will be very soon. Until then, you should use custom unit or integration tests for TDD.

Airbyte provides a standard test suite that is run against every destination. The objective of these tests is to provide some "free" tests that can sanity check that the basic functionality of the destination works. One approach to developing your connector is to simply run the tests between each change and use the feedback from them to guide your development.

If you want to try out this approach, check out Step 6 which describes what you need to do to set up the standard tests for your destination.

The nice thing about this approach is that you are running your destination exactly as Airbyte will run it in the CI. The downside is that the tests do not run very quickly.

Step 3: Implement spec

Each destination contains a specification written in JsonSchema that describes the inputs it requires and accepts. Defining the specification is a good place to start development. To do this, find the spec file generated in airbyte-integrations/connectors/destination-<name>/destination_<name>/spec.json. Edit it and you should be done with this step. The generated connector will take care of reading this file and converting it to the correct output.

Some notes about fields in the output spec:

  • supportsNormalization is a boolean which indicates if this connector supports basic normalization via DBT. If true, supportsDBT must also be true.

  • supportsDBT is a boolean which indicates whether this destination is compatible with DBT. If set to true, the user can define custom DBT transformations that run on this destination after each successful sync. This must be true if supportsNormalization is set to true.

  • supported_destination_sync_modes: An array of strings declaring the sync modes supported by this connector. The available options are:

    • overwrite: The connector can be configured to wipe any existing data in a stream before writing new data

    • append: The connector can be configured to append new data to existing data

    • append_dedup: The connector can be configured to deduplicate (i.e., UPSERT) data in the destination based on the new data and primary keys

  • supportsIncremental: Whether the connector supports any append sync mode. Must be set to true if append or append_dedup is included in supported_destination_sync_modes.
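The constraints between these fields are easy to get wrong when editing the spec by hand. As a rough sketch, the check below encodes only the two rules stated in the list above; find_spec_problems is a hypothetical helper, not something the Airbyte CDK provides, and the example spec is invented.

```python
from typing import Any, Dict, List


def find_spec_problems(spec: Dict[str, Any]) -> List[str]:
    """Return human-readable violations of the field rules described above."""
    problems: List[str] = []

    # Rule 1: normalization runs through dbt, so it requires dbt support.
    if spec.get("supportsNormalization") and not spec.get("supportsDBT"):
        problems.append("supportsNormalization is true but supportsDBT is not")

    # Rule 2: any append-style sync mode requires supportsIncremental.
    modes = spec.get("supported_destination_sync_modes", [])
    has_append_mode = any(str(mode).startswith("append") for mode in modes)
    if has_append_mode and not spec.get("supportsIncremental"):
        problems.append("an append sync mode is listed but supportsIncremental is not true")

    return problems


# Hypothetical spec for illustration; a real one also documents each config property.
example_spec = {
    "supportsIncremental": True,
    "supportsNormalization": False,
    "supportsDBT": False,
    "supported_destination_sync_modes": ["overwrite", "append"],
    "connectionSpecification": {
        "type": "object",
        "required": ["api_key"],
        "properties": {"api_key": {"type": "string", "airbyte_secret": True}},
    },
}
```

A helper like this could be called from a unit test with the parsed contents of your spec.json.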

Once you've edited the file, see the spec operation in action:

python main.py spec

Step 4: Implement check

The check operation accepts a JSON object conforming to the spec.json. In other words, if the spec.json says that the destination requires a username and password, the config object might be { "username": "airbyte", "password": "password123" }. It returns a JSON object that reports, given the credentials in the config, whether we were able to connect to the destination.

While developing, we recommend storing any credentials in secrets/config.json. Any secrets directory in the Airbyte repo is gitignored by default.

Implement the check method in the generated file destination_<name>/destination.py. Here's an example implementation from the KvDB destination.
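The KvDB code is not reproduced here. As a minimal, library-free sketch of the same idea, the function below validates the config and then runs a caller-supplied probe that attempts a small write. The names check_connection, required_keys and probe are assumptions made for this example. In a real connector, the check method would return the CDK's AirbyteConnectionStatus model rather than a plain dict.

```python
from typing import Any, Callable, Dict, Iterable, Optional


def _status(status: str, message: Optional[str] = None) -> Dict[str, Any]:
    """Build a dict shaped like a CONNECTION_STATUS message."""
    connection_status: Dict[str, Any] = {"status": status}
    if message is not None:
        connection_status["message"] = message
    return {"type": "CONNECTION_STATUS", "connectionStatus": connection_status}


def check_connection(
    config: Dict[str, Any],
    required_keys: Iterable[str],
    probe: Callable[[Dict[str, Any]], None],
) -> Dict[str, Any]:
    """Report whether the config is complete and the probe write succeeds."""
    missing = sorted(key for key in required_keys if key not in config)
    if missing:
        return _status("FAILED", "missing config keys: " + ", ".join(missing))
    try:
        # The probe should exercise write permissions, e.g. write and delete a test key.
        probe(config)
    except Exception as exc:  # report any failure to the user instead of crashing
        return _status("FAILED", "could not write to destination: " + repr(exc))
    return _status("SUCCEEDED")
```

Catching every exception is deliberate in this sketch: check is meant to report a failed status with a useful message rather than raise.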

Verify that the method is working by placing your config in secrets/config.json then running:

python main.py check --config secrets/config.json

Step 5: Implement write

The write operation is the main workhorse of a destination connector: it reads input data from the source and writes it to the underlying destination. It takes as input the config file used to run the connector as well as the configured catalog: the file used to describe the schema of the incoming data and how it should be written to the destination. Its "output" is two things:

  1. Data written to the underlying destination

  2. AirbyteMessages of type AirbyteStateMessage, written to stdout to indicate which records have been written so far during a sync. It's important to output these messages when possible in order to avoid re-extracting messages from the source. See the write operation protocol reference for more information.

To implement the write Airbyte operation, implement the write method in your generated destination.py file. Here is an example implementation from the KvDB destination connector.
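The core of most write implementations is a loop that buffers records and only emits a state message once everything received before it has been persisted. The sketch below illustrates that ordering with plain dicts and a caller-supplied flush function. Both are assumptions made for this example: the generated write method receives the config, the configured catalog and an iterable of AirbyteMessage objects, and yields AirbyteMessage objects instead.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List


def write_messages(
    input_messages: Iterable[Dict[str, Any]],
    flush: Callable[[List[Dict[str, Any]]], None],
) -> Iterator[Dict[str, Any]]:
    """Buffer RECORD messages and yield each STATE message only after flushing."""
    buffer: List[Dict[str, Any]] = []
    for message in input_messages:
        if message["type"] == "RECORD":
            buffer.append(message["record"])
        elif message["type"] == "STATE":
            # Persist everything received so far before acknowledging the state.
            if buffer:
                flush(buffer)
                buffer = []
            yield message
        # Other message types are ignored in this sketch.
    # Flush records that arrived after the last state message.
    if buffer:
        flush(buffer)
```

Yielding the state only after the flush is what makes it safe for Airbyte to treat that state as a checkpoint: if the sync fails later, records up to that point do not need to be extracted again.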

Step 6: Set up Acceptance Tests

Coming soon. These tests are not yet available for Python destinations but will be very soon. For now please skip this step and rely on copious amounts of integration and unit testing.

Step 7: Write unit tests and/or integration tests

The Acceptance Tests are meant to cover the basic functionality of a destination. Think of them as the bare minimum required for us to add a destination to Airbyte. You should probably add some unit testing or custom integration testing in case you need to test additional functionality of your destination.

Add unit tests in the unit_tests/ directory and integration tests in the integration_tests/ directory. Run the integration tests, for example, with:

python -m pytest -s -vv integration_tests/

See the KvDB integration tests for an example of tests you can implement.
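For unit tests, small pure helpers are the easiest place to start. The example below is only a sketch: chunk is a hypothetical batching helper that a destination might use before writing, and the test functions follow the naming convention pytest discovers. A file like this would live in unit_tests/ and import the helper from your destination package instead of defining it inline.

```python
from typing import Any, Iterable, Iterator, List


def chunk(records: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield lists of at most `size` records, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[Any] = []
    for record in records:
        batch.append(record)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def test_chunk_splits_evenly():
    assert list(chunk(range(4), 2)) == [[0, 1], [2, 3]]


def test_chunk_keeps_remainder():
    assert list(chunk(range(5), 2)) == [[0, 1], [2, 3], [4]]


def test_chunk_rejects_non_positive_size():
    try:
        list(chunk([1], 0))
    except ValueError:
        pass
    else:
        raise AssertionError("expected ValueError for size 0")
```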

Step 8: Update the docs

Each connector has its own documentation page. By convention, that page should have the following path: docs/integrations/destinations/<destination-name>.md. For the documentation to get packaged with the docs, make sure to add a link to it in docs/SUMMARY.md. You can pattern match doing that from existing connectors.

Wrapping up

Well done on making it this far! If you'd like your connector to ship with Airbyte by default, create a PR against the Airbyte repo and we'll work with you to get it across the finish line.