[dagster-airflow] [docs] migration guide updates/considerations (dagster-io#12198)

Co-authored-by: Erin Cochran <[email protected]>
Ramshackle-Jamathon and erinkcochran87 authored Feb 9, 2023
1 parent 7150223 commit 33709f6
Showing 4 changed files with 156 additions and 56 deletions.
189 changes: 147 additions & 42 deletions docs/content/integrations/airflow/migrating-to-dagster.mdx
---
title: "Migrating Airflow to Dagster | Dagster Docs"
description: "Learn how to do a lift-and-shift migration of Airflow to Dagster."
---

# Migrating Airflow to Dagster

Dagster can convert your Airflow DAGs into Dagster jobs, enabling a lift-and-shift migration from Airflow without any rewriting.

This guide will walk you through the steps of performing this migration.

---

## Prerequisites

To complete the migration, you'll need:

- **To perform some upfront analysis**. Refer to the [next section](#before-you-begin) for more detail.

- **To know the following about your Airflow setup**:

- What operator types are used in the DAGs you're migrating
- What Airflow connections your DAGs depend on
- What Airflow variables you've set
- What Airflow secrets backend you use
- Where the permissions that your DAGs depend on are defined

- **If using Dagster Cloud**, an existing [Dagster Cloud](/dagster-cloud) account. While your migrated Airflow DAGs will work with Dagster Open Source, this guide includes setup specific to Dagster Cloud.

**If you just signed up for a Cloud account**, follow the steps in the [Dagster Cloud Getting Started guide](/dagster-cloud/getting-started) before proceeding.

### Before you begin

You may be coming to this document/library in a skeptical frame of mind, having previously been burned by projects that claim to have 100% foolproof, automated migration tools. We want to assure you that we are _not_ making that claim.

The `dagster-airflow` migration library should be viewed as a powerful _accelerant_ of migration, rather than a guarantee of completely _automated_ migration. The amount of work required is proportional to the complexity of your installation. Smaller, less complex implementations can be trivial to migrate, making the process appear virtually automatic; larger, more complicated, or heavily customized implementations require more investment.

For larger installations, teams that have already adopted DevOps practices and/or standardized their usage of Airflow will have a smoother transition.

Some concrete examples:

- If you rely on the Airflow UI to set production connections or variables, your workflow will need to change, as you will no longer have the Airflow UI at the end of this migration. If you instead rely on infrastructure-as-code patterns to set connections or variables, migration is more straightforward.
- If you have standardized on a small set or even a single operator type (e.g. the `KubernetesPodOperator`), migration will be easier. If you use a large number of operator types with a wide range of infrastructure requirements, migration will be more work.
- If you dynamically generate your Airflow DAGs from a higher-level API or DSL (e.g. YAML), migration will be more straightforward than if all of your stakeholders create Airflow DAGs directly.

Even in cases that require some infrastructure investment, `dagster-airflow` dramatically eases migration, typically by orders of magnitude. The cost can be borne by a single infrastructure-oriented engineer or a small team, which dramatically reduces coordination costs. You do not have to move all of your stakeholders over to new APIs simultaneously. In our experience, practitioners welcome the change because of the immediate improvements in tooling, stability, and development speed.

---

## Limitations

Before kicking off a migration, note that there are some limitations to converting Airflow DAGs into Dagster jobs, assets, and schedules. The following features are currently unsupported:

- `retry-from-failure` in Dagster
- Using `prev_execution_date` in Airflow-templated DAGs
- `SubDAGOperators`
- Airflow Datasets
- Airflow Pools

Most of the current limitations are [due to a locally-scoped, ephemeral Airflow database being created for each run](https://github.com/dagster-io/dagster/issues/12209). If your DAGs implicitly rely on Airflow database state across `DagRuns`, then the current migration tooling won't work for you.

Note that we are working on extending support for these features. If interested in them, let us know in the `#dagster-airflow` channel of the Dagster Slack.

---

## Step 1: Prepare your project for a new Dagster Python module

While there are many ways to structure an Airflow git repository, this guide assumes you're using a repository structure with a single `./dags` DagBag directory containing all of your DAGs.

In the root of your repository, create a `dagster_migration.py` file.
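For example, assuming the `./dags` layout described above, creating the file can be as simple as the following sketch (create the file however you prefer):

```bash
# Run from the root of your Airflow repository, alongside the ./dags directory
touch dagster_migration.py
```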

---

## Step 2: Install Dagster Python packages alongside Airflow

<Note>
This step may require working through a number of version pins. Specifically,
installing Airflow 1.x.x versions may be challenging due to (usually) outdated
constraint files.
<br />
<br />
Don't get discouraged if you run into problems! Reach out to the Dagster Slack
for help.
</Note>

In this step, you'll install the `dagster`, `dagster-airflow`, and `dagit` Python packages alongside Airflow. **We strongly recommend using a virtualenv.**

To install everything, run:

```bash
pip install dagster dagster-airflow dagit
```

We also suggest verifying that you're installing the correct versions of your Airflow dependencies. Verifying the dependency versions will likely save you from debugging tricky errors later.

To check dependency versions, open your Airflow provider's UI and locate the version numbers. When finished, continue to the next step.
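For example, here's a sketch of pinning Airflow to the version running in your deployment using Airflow's published constraint files. The version numbers are placeholders - substitute the Airflow and Python versions you found above:

```bash
# Placeholders: replace 2.5.1 (Airflow) and 3.9 (Python) with your deployment's versions
pip install "apache-airflow==2.5.1" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.9.txt"

# Confirm the installed versions match what's running in production
pip freeze | grep -i airflow
```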

---

## Step 3: Convert DAGs into Dagster Definitions

In this step, you'll start writing Python!

In the `dagster_migration.py` file you created in [Step 1](#step-1-prepare-your-project-for-a-new-dagster-python-module), use <PyObject module="dagster_airflow" object="make_dagster_definitions_from_airflow_dags_path" /> and pass in the file path of your Airflow DagBag. Dagster will load the DagBag and convert all DAGs into Dagster jobs and schedules.

```python file=/integrations/airflow/migrate_repo.py
import os

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
)

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
)
```

Under the hood, Dagster runs the exact same operator code that you were running in Airflow. You'll be able to view your normal Airflow `stdout`/`stderr` logs as compute logs in Dagit.
---

## Step 4: Verify the DAGs are loading

In this step, you'll spin up Dagit, Dagster's web-based UI, and verify that your migrated DAGs are loading. **Note**: Unless your migrated DAGs are free of dependencies on Airflow configuration state and permissions, they're unlikely to execute correctly at this point. That's okay - we'll fix it in a bit. Starting Dagit is the first step in our development loop, allowing you to make a local change, view it in Dagit, and debug any errors.

1. Run the following to start Dagit:

```bash
dagster dev -f ./dagster_migration.py
```

2. In your browser, navigate to <http://localhost:3000>. You should see a list of Dagster jobs that correspond to the DAGs in your Airflow DagBag.

3. Run one of the simpler jobs, ideally one where you're familiar with the business logic. Note that it's likely to fail due to a configuration or permissions issue.

4. Use the logs to identify the cause of the failure and make configuration changes to fix it.

Repeat these steps as needed until the jobs run successfully.
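If you'd rather check what loaded without opening the UI, the `dagster` CLI can list the converted jobs (assuming the `dagster_migration.py` file from Step 1):

```bash
# List the Dagster jobs created from your Airflow DagBag
dagster job list -f ./dagster_migration.py
```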

### Containerized operator considerations

A variety of Airflow operator types are used to launch compute in external execution environments, such as Kubernetes or Amazon ECS. When getting things working locally, we recommend executing those containers locally unless it's unrealistic or impossible to emulate the cloud environment. For example, if you use the `KubernetesPodOperator`, you'll likely need a local Kubernetes cluster running; in that case, we recommend Docker's built-in Kubernetes environment. You'll also need to be able to pull the container images needed for execution to your local machine.
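For example, here's a sketch of preparing a local environment for `KubernetesPodOperator` tasks using Docker's built-in Kubernetes. The registry and image name are placeholders:

```bash
# Point kubectl at Docker Desktop's built-in Kubernetes cluster (enable it in Docker Desktop settings)
kubectl config use-context docker-desktop

# Pre-pull the image(s) your operators launch so local runs don't stall on image pulls
docker pull my-registry.example.com/my-task-image:v1  # placeholder image
```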

If local execution is impossible, we recommend using Branch Deployments in Dagster Cloud, which is a well-supported workflow for cloud-native development.

---

## Step 5: Transfer your Airflow configuration

To port your Airflow configuration, we recommend using [environment variables](/guides/dagster/using-environment-variables-and-secrets) as much as possible. Specifically, we recommend using a `.env` file containing Airflow variables and/or a secrets backend configuration in the root of your project.
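For example, here's a hypothetical `.env` sketch - the values and the secrets backend are placeholders, but the `AIRFLOW_VAR_*` and `AIRFLOW__SECRETS__*` naming conventions are standard Airflow behavior:

```bash
# .env (example values only)
# Airflow reads variables from environment variables named AIRFLOW_VAR_<VARIABLE_NAME>
AIRFLOW_VAR_TARGET_DATASET=analytics_prod

# Secrets backend configuration, if your DAGs rely on one
AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections"}'
```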

You'll also need to configure the [Airflow connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html) that your DAGs depend on. To accomplish this, use the `connections` parameter instead of URI-encoded environment variables.

```python file=/integrations/airflow/migrate_repo_connections.py
import os

from airflow.models import Connection
from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
    connections=[
        Connection(conn_id="http_default", conn_type="uri", host="https://google.com")
    ],
)
```

If you're running on Dagster Cloud and any of your connection fields are sensitive, you can securely pass them in using [environment variables](/dagster-cloud/developing-testing/environment-variables-and-secrets).
Iterate as needed until all configuration is correctly ported to your local environment.

---

## Step 6: Move to production

<Note>
This step is applicable to Dagster Cloud. If you're deploying to your own
infrastructure, refer to the <a href="/deployment">Deployment guides</a> for more info.
<br />
<br />
Additionally, we recommend waiting to move to production until your Airflow DAGs
execute successfully in your local environment.
</Note>

In this step, you'll set up your project for use with Dagster Cloud.

---
<!-- Once you have your dagster cloud organization setup you can either choose to adapt your existing airflow repository to use the Dagster Cloud CI/CD deployment or copy over your code to the quickstart repository you made as part of the Dagster Cloud onboarding. -->

1. Complete the steps in the [Dagster Cloud Getting Started guide](/dagster-cloud/getting-started), if you haven't already. Proceed to the next step when your account is set up and you have the `dagster-cloud` CLI installed.

2. In the root of your project, create or modify the `dagster_cloud.yaml` file with the following code:

```yaml
locations:
  - location_name: dagster_migration
    code_source:
      python_file: dagster_migration.py
```

3. Push your code and let Dagster Cloud's CI/CD deploy your migrated DAGs to the cloud.

---

## Step 7: Migrate permissions to Dagster

Your Airflow instance likely had specific IAM or Kubernetes permissions that allowed it to successfully run your Airflow DAGs. To run the migrated Dagster jobs, you'll need to duplicate these permissions for Dagster.

- **We recommend using [Airflow connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html) or [environment variables](/dagster-cloud/developing-testing/environment-variables-and-secrets)** to define permissions whenever possible.

- **If you're unable to use Airflow connections or environment variables,** you can attach permissions directly to the infrastructure where you're deploying Dagster.

- **If your Airflow DAGs used [`KubernetesPodOperators`](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html)**, it's possible that you loaded a `kube_config` file or used the `in_cluster` config. When migrating, we recommend switching to [using connections with a `kube_config` JSON blob](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/connections/kubernetes.html) to make things easier.
7 changes: 3 additions & 4 deletions docs/content/integrations/airflow/reference.mdx
To load all Airflow DAGs in a file path into a Dagster repository:
```python
import os

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
)

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
)
```

/integrations/airflow/migrate_repo.py
```python
import os

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
)

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
)
```
/integrations/airflow/migrate_repo_connections.py
```python
import os

from airflow.models import Connection
from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
    connections=[
        Connection(conn_id="http_default", conn_type="uri", host="https://google.com")
    ],
)
```
