Quickstart for GX Cloud and Airflow
In this quickstart, you'll learn how to use GX Cloud with Apache Airflow. You'll create a simple DAG that runs a Checkpoint that you have already set up in GX Cloud, and then trigger it through a local installation of an Airflow server.
Apache Airflow is an orchestration tool that allows you to schedule and monitor your data pipelines. For more information about Apache Airflow, see the Apache Airflow documentation.
Prerequisites
You have a GX Cloud Beta account.
You have the Astro CLI installed.
You have connected GX Cloud to a Data Asset on a Data Source.
You have created an Expectation Suite and added Expectations.
You have added a Checkpoint to your Expectation Suite.
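If you want to confirm the exact name of the Checkpoint you'll reference later in the DAG, one option is to list the Checkpoints in your GX Cloud organization from a Python shell. This is a minimal sketch, assuming the GX 0.18.x API, where get_context() picks up your Cloud credentials from environment variables and list_checkpoints() returns the Checkpoint names:

import os
import great_expectations as gx

# Replace the placeholders with your GX Cloud credentials.
os.environ["GX_CLOUD_ACCESS_TOKEN"] = "<YOUR_ACCESS_TOKEN>"
os.environ["GX_CLOUD_ORGANIZATION_ID"] = "<YOUR_CLOUD_ORGANIZATION_ID>"

context = gx.get_context()

# Print the names of the Checkpoints available in your GX Cloud organization.
print(context.list_checkpoints())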
Create a local Airflow project and set dependencies
Open a terminal, navigate to the directory where you want to create your Airflow project, and then run the following code:
mkdir gx-cloud-airflow && cd gx-cloud-airflow
astro dev init

After running these commands, a new directory is created, you're taken to that directory, and a new Airflow project is initialized in it.
Browse to the directory where you created your Airflow project, open the requirements.txt file, and then add the following text as a new line:

airflow-provider-great-expectations==0.2.7

This line adds the GX Airflow Provider to the Airflow project. Save your changes and close the requirements.txt file.

Open the packages.txt file and add the following text as a new line:

libgeos-c1v5

This line adds the libgeos-c1v5 library to the Airflow project. Save your changes and close the packages.txt file.
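If you'd like to verify that these dependencies import cleanly once the project is built (for example, from a Python shell inside the running Airflow scheduler container), a quick check is sketched below. The import path great_expectations_provider is assumed to be the module installed by airflow-provider-great-expectations, so treat it as an assumption to verify against the version you install:

# Quick import check for the dependencies added above.
# great_expectations_provider is assumed to be the module installed by
# airflow-provider-great-expectations.
import great_expectations as gx
import great_expectations_provider

print("GX version:", gx.__version__)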
Create a DAG file for your GX Cloud Checkpoint
Open a terminal, browse to the dags folder of your Airflow project, and then run the following command to create a new DAG file named gx_dag.py:

touch gx_dag.py
Open the gx_dag.py DAG file and add the following code:

import os
import great_expectations as gx
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def run_gx_airflow():
os.environ["GX_CLOUD_ACCESS_TOKEN"] = "<YOUR_ACCESS_TOKEN>"
os.environ["GX_CLOUD_ORGANIZATION_ID"] = "<YOUR_CLOUD_ORGANIZATION_ID>"
# Replace <YOUR_ACCESS_TOKEN> and <YOUR_CLOUD_ORGANIZATION_ID> with your credentials.
# To get your user access token and organization ID, see:
# (https://docs.greatexpectations.io/docs/cloud/set_up_gx_cloud#get-your-user-access-token-and-organization-id).
context = gx.get_context()
checkpoint_name = '<YOUR_CHECKPOINT_NAME>'
# Replace <YOUR_CHECKPOINT_NAME> with the name of the Checkpoint you'd like to run.
checkpoint = context.get_checkpoint(name = checkpoint_name)
checkpoint.run()
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 9),
}
gx_dag = DAG(
    'gx_dag',
    default_args=default_args,
    schedule_interval='0 0 * * *',  # This is set to run daily at midnight. Adjust as needed.
    catchup=False
)
run_gx_task = PythonOperator(
    task_id='gx_airflow',
    python_callable=run_gx_airflow,
    dag=gx_dag,
)
run_gx_task

Save your changes and close the gx_dag.py DAG file.
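Hardcoding the access token in the DAG file is fine for a quickstart, but in practice you'll probably want to keep credentials out of source control. One alternative is to read them from Airflow Variables. This is a sketch only, and the Variable names gx_cloud_access_token and gx_cloud_organization_id are placeholders you would first create under Admin > Variables in the Airflow UI:

import os
from airflow.models import Variable

def run_gx_airflow():
    # Pull GX Cloud credentials from Airflow Variables instead of hardcoding them
    # in the DAG file. The Variable names here are placeholders.
    os.environ["GX_CLOUD_ACCESS_TOKEN"] = Variable.get("gx_cloud_access_token")
    os.environ["GX_CLOUD_ORGANIZATION_ID"] = Variable.get("gx_cloud_organization_id")
    # The rest of the function (get_context, get_checkpoint, run) is unchanged.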
Run the DAG
Run the following command in the root directory of your Airflow project to start the server:
astro dev start
Sign in to Airflow. By default, the Astro CLI serves the Airflow UI at http://localhost:8080, and the default username and password are both admin.

In the Actions column, click Trigger DAG for gx_dag and confirm your DAG runs as expected.
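Note that as written, the task succeeds whenever the Checkpoint runs, even if some Expectations fail. If you want a failed validation to fail the Airflow task (and therefore the DAG run), you can inspect the result that checkpoint.run() returns. This sketch assumes the GX 0.18.x CheckpointResult, which exposes a success attribute; replace the final checkpoint.run() line in run_gx_airflow with:

    result = checkpoint.run()
    # Fail the Airflow task when any Expectation in the Checkpoint fails,
    # so the DAG run is marked as failed instead of silently succeeding.
    if not result.success:
        raise ValueError(f"Checkpoint '{checkpoint_name}' failed validation.")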