---
title: Integrate Apache Airflow with Timescale Cloud
excerpt: Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. Integrate Apache Airflow with Timescale Cloud and create a data pipeline
products: [cloud, mst, self_hosted]
keywords: [connect, integrate, apache, airflow]
---
import IntegrationPrereqs from "versionContent/_partials/_integration-prereqs.mdx";

# Integrate Apache Airflow with $CLOUD_LONG

Apache Airflow® is a platform created by the community to programmatically author, schedule, and monitor workflows.

A DAG (Directed Acyclic Graph) is the core concept of Airflow. It collects Tasks together, organized with dependencies and relationships that describe how they should run. You declare a DAG in a Python file in the `$AIRFLOW_HOME/dags` folder of your Airflow instance.
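For example, a minimal DAG with a single task might look like the following sketch, assuming a recent Airflow 2.x. The `hello_dag` name, schedule, and no-op task are illustrative only; they are not part of this integration:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# A minimal DAG: one no-op task, scheduled to run once a day.
with DAG("hello_dag", start_date=datetime(2025, 1, 1), schedule="@daily") as dag:
    EmptyOperator(task_id="noop")
```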

This page shows you how to use a Python connector in a DAG to integrate Apache Airflow with a $SERVICE_LONG.

## Prerequisites

<IntegrationPrereqs />

This example DAG uses the `company` table you create in [Optimize time-series data in hypertables](https://docs.timescale.com/getting-started/latest/try-key-features-timescale-products/#optimize-time-series-data-in-hypertables).

## Install Python connectivity libraries

To install the Python libraries required to connect to $CLOUD_LONG:

1. **Enable PostgreSQL connections between Airflow and $CLOUD_LONG**

   ```bash
   pip install psycopg2-binary
   ```

2. **Enable PostgreSQL connection types in the Airflow UI**

   ```bash
   pip install apache-airflow-providers-postgres
   ```
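Before you restart Airflow, you can optionally confirm that both libraries import cleanly. This is a quick sanity check, not part of the official setup:

```python
# Both imports must succeed for the Postgres connection type to
# appear in the Airflow UI.
import psycopg2
from airflow.providers.postgres.hooks.postgres import PostgresHook

print(psycopg2.__version__)
print(PostgresHook.conn_type)  # prints "postgres"
```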

## Create a connection between Airflow and your $SERVICE_LONG

In your Airflow instance, securely connect to your $SERVICE_LONG:

1. **Run Airflow**

   On your development machine, run the following command:

   ```bash
   airflow standalone
   ```

   The username and password for the Airflow UI are displayed in the `standalone | Login with username` line in the output.

2. **Add a connection from Airflow to your $SERVICE_LONG**

   1. In your browser, navigate to `localhost:8080`, then select **Admin** > **Connections**.
   2. Click **+** (**Add a new record**), then use your connection info to fill in the form. The **Connection Type** is `Postgres`.
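Optionally, test the new connection from code by looking it up through a `PostgresHook`. This is a sketch: `timescale_conn` is a hypothetical Connection ID, so substitute the one you entered in the form:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# "timescale_conn" is a placeholder; use your own Connection ID.
hook = PostgresHook(postgres_conn_id="timescale_conn")

# get_first() runs the query and returns the first row of the result,
# proving that Airflow can reach your service.
print(hook.get_first("SELECT version();"))
```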

## Exchange data between Airflow and your $SERVICE_LONG

To exchange data between Airflow and your $SERVICE_LONG:

1. **Create and execute a DAG**

   To insert data in your $SERVICE_LONG from Airflow:

   1. In `$AIRFLOW_HOME/dags/timescale_dag.py`, add the following code:

      ```python
      from datetime import datetime

      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from airflow.providers.postgres.hooks.postgres import PostgresHook

      def insert_data_to_timescale():
          # Use the Connection ID you created in the Airflow UI.
          hook = PostgresHook(postgres_conn_id='the ID of the connection you created')
          conn = hook.get_conn()
          cursor = conn.cursor()
          # This could be any query. This example inserts data into the table
          # you create in:
          # https://docs.timescale.com/getting-started/latest/try-key-features-timescale-products/#optimize-time-series-data-in-hypertables
          cursor.execute(
              "INSERT INTO company (symbol, name) VALUES (%s, %s)",
              ('NEW/Asset', 'New Asset Name'),
          )
          conn.commit()
          cursor.close()
          conn.close()

      default_args = {
          'owner': 'airflow',
          'start_date': datetime(2023, 1, 1),
          'retries': 1,
      }

      dag = DAG('timescale_dag', default_args=default_args, schedule='@daily')

      insert_task = PythonOperator(
          task_id='insert_data',
          python_callable=insert_data_to_timescale,
          dag=dag,
      )
      ```

      This DAG uses the `company` table created in Create regular PostgreSQL tables for relational data.
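      Optionally, you can smoke-test the DAG without the scheduler. This sketch assumes Airflow 2.5 or later, where `DAG.test()` is available; append it to the bottom of `timescale_dag.py`, then run the file with `python timescale_dag.py`:

      ```python
      # Trigger one manual DAG run in-process, without the scheduler or UI.
      if __name__ == "__main__":
          dag.test()
      ```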

   2. In your browser, refresh the Airflow UI.

   3. In **Search DAGS**, type `timescale_dag` and press **ENTER**.

   4. Press the play icon and trigger the DAG.

2. **Verify that the data appears in $CLOUD_LONG**

   1. In Timescale Console, navigate to your service and click **SQL editor**.

   2. Run a query to view your data. For example: `SELECT symbol, name FROM company;`.

      You see the new rows inserted in the table.

You have successfully integrated Apache Airflow with $CLOUD_LONG and created a data pipeline.