Orchestrating Snowflake queries in Airflow
- Digital Hive
- Jul 4

When building data pipelines with Apache Airflow and Snowflake, executing SQL queries is a core operation. Whether you're managing test data, orchestrating data transformations, or cleaning up unused artifacts, it's important to understand the options available for running queries efficiently and at scale. This blog explores three ways to execute SQL queries on Snowflake from Airflow:
Using the SnowflakeOperator
Using the SnowflakeHook
Using a SnowflakeOperator wrapped in a Python function
Network policy update
If you're using managed Airflow (e.g., AWS MWAA), you often need to whitelist the egress IP of your Airflow environment in Snowflake’s network policy. Without this, Snowflake may block the connection. You can add the IP using:
ALTER NETWORK POLICY my_policy SET ALLOWED_IP_LIST=('your_airflow_egress_ip');
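If you do not know the egress IP of your environment, one quick way to find it is to print it from a short-lived task and read it back from the task log. The sketch below is only illustrative; it assumes outbound HTTPS is allowed and uses checkip.amazonaws.com, but any similar service works.

from airflow.operators.python import PythonOperator
import urllib.request

def print_egress_ip():
    # Ask an external service which public IP this Airflow worker egresses from
    with urllib.request.urlopen("https://checkip.amazonaws.com") as response:
        print("Airflow egress IP:", response.read().decode().strip())

# One-off task; assumes a `dag` object is defined in the DAG file, as in the examples below
print_egress_ip_task = PythonOperator(
    task_id="print_egress_ip",
    python_callable=print_egress_ip,
    dag=dag,
)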
Creating a Snowflake connection in Airflow
The connection to Snowflake can be created and maintained via the Airflow UI. Follow the steps below to create the connection; a programmatic alternative is sketched after these steps.
In the Airflow UI, navigate to Admin > Connections
Click + Add
Fill out:
Conn Id: SNOWFLAKE_CONN_ID
Conn Type: Snowflake
Provide credentials and connection details: account, user*, password, role, warehouse, database, schema.
*Preferably, use a dedicated service account user with a private key (key-pair authentication); the connection form has dedicated fields for this.
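As an alternative to the UI, the same connection can be supplied as an environment variable: Airflow resolves connections from AIRFLOW_CONN_<CONN_ID>, and JSON-formatted values are supported from Airflow 2.3 onwards. The snippet below is only a sketch; the account, user, role, and warehouse names are placeholders, and the exact extra keys depend on your provider version. In practice you would set the variable in your deployment configuration rather than in Python.

import json
import os

# Sketch only: placeholder credentials; normally set in the deployment, not in code
os.environ["AIRFLOW_CONN_SNOWFLAKE_CONN_ID"] = json.dumps({
    "conn_type": "snowflake",
    "login": "SVC_AIRFLOW",              # service account user (placeholder)
    "password": "********",              # or leave empty and use key-pair auth
    "schema": "DEV_STAGING",
    "extra": {
        "account": "xy12345.eu-west-1",  # placeholder account identifier
        "warehouse": "COMPUTE_WH",
        "database": "DEV",
        "role": "AIRFLOW_ROLE",
    },
})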
Define Airflow Variables
In many cases, you may want to maintain reusable variables that are specific to a DAG or environment. This can also be managed directly from the Airflow UI.
In the Airflow UI, navigate to Admin > Variables
Click + Add
Fill out:
Key: MY_DAG_VARIABLE_ID
Val: a JSON dictionary with the variables you want to pass to the DAG. It should contain the name of the connection created in the previous step (SNOWFLAKE_CONN_ID); an example value follows these fields.
Description: (optional)
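For illustration, the value of MY_DAG_VARIABLE_ID could look like the JSON below. The value stored under SNOWFLAKE_CONN_ID must match the Conn Id created earlier (which, in this post, is itself named SNOWFLAKE_CONN_ID); the TARGET_SCHEMA key is a hypothetical extra.

{
    "SNOWFLAKE_CONN_ID": "SNOWFLAKE_CONN_ID",
    "TARGET_SCHEMA": "DEV_STAGING"
}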
You can access the connection variable from within the DAG with the following code:
from airflow.models import Variable

# Resolve the Snowflake connection id from the DAG-level variable
snowflake_connection = Variable.get("MY_DAG_VARIABLE_ID", deserialize_json=True)["SNOWFLAKE_CONN_ID"]
Approach 1: Static SQL with the SnowflakeOperator
The SnowflakeOperator offers a straightforward approach for executing SQL on Snowflake directly from Airflow. It is best suited to static queries, since dynamically generated queries would have to be produced by another task and passed in.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

snowflake_direct_operator_task = SnowflakeOperator(
    task_id="delete_test_data",
    snowflake_conn_id=snowflake_connection,
    sql="DELETE FROM dev_staging.orders WHERE is_test_data = TRUE;",
    autocommit=True,
    dag=dag,
)
Pros
SQL is executed on Snowflake compute, not Airflow
Tasks are fully visible and managed in the Airflow UI
Retries, logging, and failures are handled automatically
Cons
The SQL is static; beyond Airflow's built-in Jinja macros (see the sketch after this list), templated query usage is not straightforward
Looping or parameterization is difficult without rewriting the DAG
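That said, the sql argument of the SnowflakeOperator is a Jinja-templated field, so simple runtime values such as the logical date can be injected without a separate task. The variant below is only a sketch; the load_date column is assumed for illustration.

# {{ ds }} (the logical date) is rendered by Airflow before the query is sent to Snowflake
delete_old_test_data_task = SnowflakeOperator(
    task_id="delete_old_test_data",
    snowflake_conn_id=snowflake_connection,
    sql="DELETE FROM dev_staging.orders WHERE is_test_data = TRUE AND load_date < '{{ ds }}';",
    autocommit=True,
    dag=dag,
)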
Approach 2: SQL execution using SnowflakeHook
Using the SnowflakeHook to connect to Snowflake allows SQL query templating and full control over query execution from within a Python function. This approach involves manually opening a connection and cursor. The function will then be executed in a task that uses the PythonOperator.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.operators.python import PythonOperator

def execute_snowflake_cursor(snowflake_connection, **kwargs):
    hook = SnowflakeHook(snowflake_conn_id=snowflake_connection)
    conn = hook.get_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("DELETE FROM dev_staging.orders WHERE is_test_data = TRUE;")
        results = cursor.fetchall()
        print("Results:", results)
    finally:
        cursor.close()
        conn.close()

# Define the task using the PythonOperator
execute_snowflake_cursor_task = PythonOperator(
    task_id="execute_snowflake_cursor_task",
    python_callable=execute_snowflake_cursor,
    op_kwargs={"snowflake_connection": snowflake_connection},  # Pass the connection id here
    dag=dag,
)
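As a side note, SnowflakeHook also inherits the usual DB-API convenience methods (run, get_records, get_first) from Airflow's common SQL hook, which spare you the manual connection and cursor handling for simple cases. A minimal sketch:

def execute_snowflake_simple(snowflake_connection, **kwargs):
    hook = SnowflakeHook(snowflake_conn_id=snowflake_connection)
    # run() opens and closes the connection for you
    hook.run("DELETE FROM dev_staging.orders WHERE is_test_data = TRUE;", autocommit=True)
    # get_records() returns the result set when you need it back in Python
    rows = hook.get_records("SELECT COUNT(*) FROM dev_staging.orders;")
    print("Remaining rows:", rows[0][0])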
Pros
Full flexibility in Python
Ideal for logic-heavy tasks, metadata inspection, or dynamic SQL
Can integrate complex control flow
Cons
SQL is executed inside the Airflow worker, increasing load and runtime
Less scalable for large operations
You manage error handling and connection lifecycle manually
This method is suitable for smaller, metadata-driven, or low-latency tasks where query results need to be processed immediately in Python.
Approach 3: Dynamic SnowflakeOperator Inside a Python function
The third approach combines the flexibility of Python with the performance of the SnowflakeOperator. You dynamically create and execute SnowflakeOperator tasks from inside a PythonOperator. This enables looping over dynamically generated SQL while preserving execution on Snowflake.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.python import PythonOperator

def execute_dynamic_snowflake_ops(snowflake_connection, **kwargs):
    table_names = ["orders_2023", "orders_2024"]  # Dynamically calculated in real use

    for table in table_names:
        sql_stmt = f"DELETE FROM {table} WHERE is_test_data = TRUE;"
        dynamic_op = SnowflakeOperator(
            task_id=f"cleanup_{table}",
            snowflake_conn_id=snowflake_connection,
            sql=sql_stmt,
            autocommit=True,
            dag=kwargs["dag"],
        )
        # Execute the operator in-line; the query itself still runs on Snowflake compute
        dynamic_op.execute(context=kwargs)

# Define the task using the PythonOperator
execute_dynamic_snowflake_ops_task = PythonOperator(
    task_id="execute_dynamic_snowflake_ops_task",
    python_callable=execute_dynamic_snowflake_ops,
    op_kwargs={"snowflake_connection": snowflake_connection},  # Pass the connection id here
    dag=dag,
)
Pros
Flexibility to calculate query content and targets dynamically
Keeps execution on Snowflake, avoiding Airflow compute usage
Useful for batch-style or templated SQL operations
Cons
All queries run within one Python task, reducing visibility in the Airflow UI
Failures may be harder to trace to specific queries
Loses automatic retry behavior for individual queries
More complex to manage and test
This hybrid method shines when you need to loop over dynamic inputs (dates, teams, business units) but still want Snowflake to do the work, not Airflow.
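For example, the list of targets could come from the DAG-level Variable introduced earlier instead of being hard-coded. The sketch below assumes a hypothetical TARGET_TABLES key in that Variable.

from airflow.models import Variable

def get_target_tables():
    # Hypothetical: the DAG-level variable carries the tables to clean up
    dag_config = Variable.get("MY_DAG_VARIABLE_ID", deserialize_json=True)
    return dag_config.get("TARGET_TABLES", [])

# Inside execute_dynamic_snowflake_ops, replace the hard-coded list with:
#     table_names = get_target_tables()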
Summary Comparison
| Feature | SnowflakeOperator | SnowflakeHook | SnowflakeOperator in Python Function |
| --- | --- | --- | --- |
| Compute Location | Snowflake | Airflow Worker | Snowflake |
| Flexibility | Low | High | High |
| Airflow Resource Load | Low | High | Low |
| Airflow UI Task Visibility | High | Medium | Medium |
| Dynamic Query Generation | Low | High | High |
| Best Use Case | Static queries | Metadata-based logic | Dynamic batch SQL execution |
Conclusion
Use SnowflakeOperator when your SQL is static and doesn't depend on runtime variables. It keeps compute on Snowflake and lets Airflow handle the orchestration, which is the pattern Snowflake recommends.
Use SnowflakeHook with a cursor only when you need Python-level logic, such as looping over metadata or handling small result sets. Snowflake advises against this for heavy workloads, since execution happens on the Airflow worker.
Use SnowflakeOperator inside a Python function when you want to generate queries dynamically at runtime, for example when looping over tables or dates. This keeps compute on Snowflake, while allowing flexible control through Python.
From Snowflake's perspective, both the first and third approaches are aligned with best practices. Let Snowflake handle compute, and let Airflow orchestrate. This avoids overloading your scheduler and ensures scalable performance.
References
apache-airflow-providers-snowflake: https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/index.html
Snowflake Connector for Python: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector