Version: 0.18.0

Quickstart for GX Cloud and Airflow

In this quickstart, you'll learn how to use GX Cloud with Apache Airflow. You'll create a simple DAG that runs a Checkpoint that you have already set up in GX Cloud, and then trigger it through a local installation of an Airflow server.

Apache Airflow is an orchestration tool that allows you to schedule and monitor your data pipelines. For more information about Apache Airflow, see the Apache Airflow documentation.

Prerequisites

  - You have a GX Cloud account and have created a Checkpoint in GX Cloud.
  - You have your GX Cloud user access token and organization ID.
  - You have installed the Astro CLI (and Docker, which it requires) to create and run a local Airflow project.

Create a local Airflow project and set dependencies

  1. Open a terminal, navigate to the directory where you want to create your Airflow project, and then run the following code:

    Terminal input
    mkdir gx-cloud-airflow && cd gx-cloud-airflow
    astro dev init

    These commands create a new directory, change into it, and initialize a new Airflow project there.

  2. Browse to the directory where you created your Airflow project, open the requirements.txt file, and then add the following text as a new line:

    airflow-provider-great-expectations==0.2.7

    This line adds the GX Airflow Provider to the Airflow project's Python dependencies.

  3. Save your changes and close the requirements.txt file.

  4. Open the packages.txt file and add the following text as a new line:

    libgeos-c1v5

    This line adds the libgeos-c1v5 system library to the Airflow project. The expected contents of both dependency files are shown after this procedure.

  5. Save your changes and close the packages.txt file.
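
If you started from a fresh project created with astro dev init, the two dependency files should now contain at least the following lines (any comment lines generated by the project template can stay in place):

    requirements.txt
    airflow-provider-great-expectations==0.2.7

    packages.txt
    libgeos-c1v5

The Astro CLI installs these Python and system dependencies into the Airflow image the next time the project is built, for example when you run astro dev start later in this quickstart.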

Create a DAG file for your GX Cloud Checkpoint

  1. Open a terminal, browse to the dags folder of your Airflow project, and then run the following command to create a new DAG file named gx_dag.py:

    Terminal input
    touch gx_dag.py
  2. Open the gx_dag.py DAG file and add the following code:

    import os
    import great_expectations as gx
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime

    def run_gx_airflow():
        # Replace <YOUR_ACCESS_TOKEN> and <YOUR_CLOUD_ORGANIZATION_ID> with your credentials.
        # To get your user access token and organization ID, see:
        # https://docs.greatexpectations.io/docs/cloud/set_up_gx_cloud#get-your-user-access-token-and-organization-id
        os.environ["GX_CLOUD_ACCESS_TOKEN"] = "<YOUR_ACCESS_TOKEN>"
        os.environ["GX_CLOUD_ORGANIZATION_ID"] = "<YOUR_CLOUD_ORGANIZATION_ID>"

        context = gx.get_context()

        # Replace <YOUR_CHECKPOINT_NAME> with the name of the Checkpoint you'd like to run.
        checkpoint_name = '<YOUR_CHECKPOINT_NAME>'
        checkpoint = context.get_checkpoint(name=checkpoint_name)
        checkpoint.run()

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 8, 9),
    }

    gx_dag = DAG(
        'gx_dag',
        default_args=default_args,
        schedule_interval='0 0 * * *',  # This is set to run daily at midnight. Adjust as needed.
        catchup=False,
    )

    run_gx_task = PythonOperator(
        task_id='gx_airflow',
        python_callable=run_gx_airflow,
        dag=gx_dag,
    )

    run_gx_task
  3. Save your changes and close the gx_dag.py DAG file.
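
The example above sets the GX Cloud credentials directly in the DAG file, which is fine for this quickstart. For a longer-lived project you can keep them out of the code. The sketch below assumes the Astro CLI's default behavior of loading a .env file from the project root into the Airflow containers as environment variables; gx.get_context() then picks the credentials up from the environment, so the os.environ assignments can be removed from the DAG:

    # .env (project root) -- do not commit real credentials to version control
    GX_CLOUD_ACCESS_TOKEN=<YOUR_ACCESS_TOKEN>
    GX_CLOUD_ORGANIZATION_ID=<YOUR_CLOUD_ORGANIZATION_ID>

    # gx_dag.py -- run_gx_airflow() no longer needs to set credentials itself
    def run_gx_airflow():
        # GX Cloud credentials are read from the container environment (set in .env above).
        context = gx.get_context()
        checkpoint = context.get_checkpoint(name='<YOUR_CHECKPOINT_NAME>')
        checkpoint.run()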

Run the DAG

  1. Run the following command in the root directory of your Airflow project to start the server:

    Terminal input
    astro dev start
  2. Sign in to the Airflow UI (by default, it is served at http://localhost:8080). The default username and password are both admin.

  3. In the Actions column, click Trigger DAG for the gx_dag DAG and confirm that it runs as expected.
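
You can also trigger and inspect the DAG from the terminal instead of the UI. The commands below are a sketch that assumes astro dev run, which forwards commands to the Airflow CLI inside the local scheduler container:

    Terminal input
    # Confirm the DAG file was parsed and the DAG is registered
    astro dev run dags list

    # Trigger a run of the DAG, then list its runs to check their state
    astro dev run dags trigger gx_dag
    astro dev run dags list-runs -d gx_dag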