
Orchestrating Snowflake queries in Airflow

  • Writer: Digital Hive
  • Jul 4
  • 4 min read
Discover how to efficiently orchestrate Snowflake SQL queries using Apache Airflow.

When building data pipelines with Apache Airflow and Snowflake, executing SQL queries is a core operation. Whether you're managing test data, orchestrating data transformations, or cleaning up unused artifacts, it's important to understand the options available for running queries efficiently and at scale. This blog explores three ways to execute SQL queries on Snowflake from Airflow:


  1. Using the SnowflakeOperator

  2. Using the SnowflakeHook

  3. Using a SnowflakeOperator wrapped in a Python function


Network policy update

If you're using managed Airflow (e.g., AWS MWAA), you often need to whitelist the egress IP of your Airflow environment in Snowflake’s network policy. Without this, Snowflake may block the connection. You can add the IP using:

ALTER NETWORK POLICY my_policy SET ALLOWED_IP_LIST=('your_airflow_egress_ip');

Creating a Snowflake connection in Airflow

The connection to Snowflake can be created and maintained via the Airflow UI. Follow the steps below to create the connection.


  1. In the Airflow UI, navigate to Admin > Connections

  2. Click + Add

  3. Fill out:

    • Conn Id: SNOWFLAKE_CONN_ID

    • Conn Type: Snowflake

    • Provide credentials and connection details: account, user*, password, role, warehouse, database, schema.


*Preferably, use a dedicated service account user that authenticates with a private key; the connection form has dedicated fields for this.
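
As a quick sanity check, you can read the connection back from inside a DAG file or an ad-hoc script. A minimal sketch, assuming the Conn Id SNOWFLAKE_CONN_ID created above (the exact keys stored in the extras depend on your provider version):

from airflow.hooks.base import BaseHook

# Fetch the connection by its Conn Id; none of this prints the password
conn = BaseHook.get_connection("SNOWFLAKE_CONN_ID")
print(conn.conn_type, conn.login)
print(conn.extra_dejson)  # account, warehouse, role, etc. typically live in the extras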


Define Airflow Variables

In many cases, you may want to maintain reusable variables that are specific to a DAG or environment. This can also be managed directly from the Airflow UI.


  1. In the Airflow UI, navigate to Admin > Variables

  2. Click + Add

  3. Fill out:

    • Key: MY_DAG_VARIABLE_ID

    • Val: a JSON dictionary with the variables you want to pass to the DAG. It should contain the name of the connection created in the previous step (SNOWFLAKE_CONN_ID); see the example below.

    • Description: an optional description of the variable
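
For illustration, the Val field could contain something like the JSON below, where the value of SNOWFLAKE_CONN_ID is the Conn Id created in the previous step (the TARGET_SCHEMA entry is purely a hypothetical extra variable):

{
    "SNOWFLAKE_CONN_ID": "SNOWFLAKE_CONN_ID",
    "TARGET_SCHEMA": "dev_staging"
}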


You can access the connection variable from within the DAG with the following code:

from airflow.models import Variable

# Read the JSON variable and pick out the Snowflake connection id
snowflake_connection = Variable.get("MY_DAG_VARIABLE_ID", deserialize_json=True)["SNOWFLAKE_CONN_ID"]

Approach 1: Static SQL with the SnowflakeOperator

The SnowflakeOperator offers a straightforward approach for executing SQL on Snowflake directly from Airflow. It is best suited to static queries; SQL that has to be generated at runtime would need to be produced by another task and passed in.

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

snowflake_direct_operator_task = SnowflakeOperator(
    task_id="delete_test_data",
    snowflake_conn_id=snowflake_connection,
    sql="DELETE FROM dev_staging.orders WHERE is_test_data = TRUE;",
    autocommit=True,
    dag=dag,
)

Pros

  • SQL is executed on Snowflake compute, not Airflow

  • Tasks are fully visible and managed in the Airflow UI

  • Retries, logging, and failures are handled automatically


Cons

  • The SQL is static; using templated or runtime-generated queries is not straightforward

  • Looping or parameterization is difficult without rewriting the DAG
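
One nuance: the sql field of the SnowflakeOperator is Jinja-templated, so values Airflow already knows at render time (such as the logical date) can be injected without extra tasks; SQL that must be computed in Python at runtime still needs one of the approaches below. A minimal sketch reusing the task above (the load_date column is hypothetical):

snowflake_templated_task = SnowflakeOperator(
    task_id="delete_test_data_for_run_date",
    snowflake_conn_id=snowflake_connection,
    # {{ ds }} is rendered by Airflow to the DAG run's logical date (YYYY-MM-DD)
    sql="DELETE FROM dev_staging.orders WHERE is_test_data = TRUE AND load_date = '{{ ds }}';",
    autocommit=True,
    dag=dag,
)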


Approach 2: SQL execution using SnowflakeHook

Using the SnowflakeHook to connect to Snowflake allows SQL query templating and full control over query execution from within a Python function. This approach involves manually opening a connection and cursor. The function will then be executed in a task that uses the PythonOperator.

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.operators.python import PythonOperator

def execute_snowflake_cursor(snowflake_connection, **kwargs):
    # Open a connection and cursor on the Airflow worker
    hook = SnowflakeHook(snowflake_conn_id=snowflake_connection)
    conn = hook.get_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("DELETE FROM dev_staging.orders WHERE is_test_data = TRUE;")
        results = cursor.fetchall()  # For a DELETE, Snowflake returns the number of affected rows
        print("Results:", results)
    finally:
        cursor.close()
        conn.close()

# Define the task using the PythonOperator
execute_snowflake_cursor_task = PythonOperator(
    task_id="execute_snowflake_indirect_operator_task",
    python_callable=execute_snowflake_cursor,
    op_kwargs={"snowflake_connection": snowflake_connection},  # Pass the connection here
    dag=dag,
)

Pros

  • Full flexibility in Python

  • Ideal for logic-heavy tasks, metadata inspection, or dynamic SQL

  • Can integrate complex control flow


Cons

  • SQL is executed inside the Airflow worker, increasing load and runtime

  • Less scalable for large operations

  • You manage error handling and connection lifecycle manually


This method is suitable for smaller, metadata-driven, or low-latency tasks where query results need to be processed immediately in Python.
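
As a sketch of that kind of metadata-driven use, the snippet below reads a small result set straight into Python via the hook. get_records comes from the hook's DB-API base class, and the INFORMATION_SCHEMA query is purely illustrative:

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def report_row_counts(**kwargs):
    # Small metadata query whose result is processed immediately in Python
    hook = SnowflakeHook(snowflake_conn_id=snowflake_connection)
    rows = hook.get_records(
        "SELECT table_name, row_count "
        "FROM information_schema.tables "
        "WHERE table_schema = 'DEV_STAGING';"
    )
    for table_name, row_count in rows:
        print(f"{table_name}: {row_count} rows")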


Approach 3: Dynamic SnowflakeOperator Inside a Python function

The third approach combines the flexibility of Python with the performance of the SnowflakeOperator. You dynamically create and execute SnowflakeOperator tasks from inside a PythonOperator. This enables looping over dynamically generated SQL while preserving execution on Snowflake.

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.python import PythonOperator

def execute_dynamic_snowflake_ops(snowflake_connection, **kwargs):
    table_names = ["orders_2023", "orders_2024"]  # Dynamically calculated in real use

    for table in table_names:
        sql_stmt = f"DELETE FROM {table} WHERE is_test_data = TRUE;"
        dynamic_op = SnowflakeOperator(
            task_id=f"cleanup_{table}",
            snowflake_conn_id=snowflake_connection,
            sql=sql_stmt,
            autocommit=True,
            dag=kwargs["dag"],
        )
        # Run the operator immediately, inside this Python task
        dynamic_op.execute(context=kwargs)

# Define the task using the PythonOperator
execute_dynamic_snowflake_ops_task = PythonOperator(
    task_id="execute_dynamic_snowflake_ops_task",
    python_callable=execute_dynamic_snowflake_ops,
    op_kwargs={"snowflake_connection": snowflake_connection},  # Pass the connection here
    dag=dag,
)
Pros

  • Flexibility to calculate query content and targets dynamically

  • Keeps execution on Snowflake, avoiding Airflow compute usage

  • Useful for batch-style or templated SQL operations


Cons

  • All queries run within one Python task, reducing visibility in the Airflow UI

  • Failures may be harder to trace to specific queries

  • Loses automatic retry behavior for individual queries

  • More complex to manage and test


This hybrid method shines when you need to loop over dynamic inputs (dates, teams, business units) but still want Snowflake to do the work, not Airflow.


Summary Comparison

| Feature | SnowflakeOperator | SnowflakeHook | SnowflakeOperator in Python Function |
| --- | --- | --- | --- |
| Compute Location | Snowflake | Airflow Worker | Snowflake |
| Flexibility | Low | High | High |
| Airflow Resource Load | Low | High | Low |
| Airflow UI Task Visibility | High | Medium | Medium |
| Dynamic Query Generation | Low | High | High |
| Best Use Case | Static queries | Metadata-based logic | Dynamic batch SQL execution |

Conclusion

Use the SnowflakeOperator when your SQL is static and doesn't depend on runtime variables. It's Snowflake's recommended method, as it keeps compute on Snowflake and leaves orchestration to Airflow.


Use SnowflakeHook with a cursor only when you need Python-level logic, such as looping over metadata or handling small result sets. Snowflake advises against this for heavy workloads, since execution happens on the Airflow worker.


Use SnowflakeOperator inside a Python function when you want to generate queries dynamically at runtime, for example when looping over tables or dates. This keeps compute on Snowflake, while allowing flexible control through Python.


From Snowflake's perspective, both the first and third approaches are aligned with best practices. Let Snowflake handle compute, and let Airflow orchestrate. This avoids overloading your scheduler and ensures scalable performance.

