774 questions
0
votes
0
answers
14
views
Airflow BigQueryInsertJobOperator and job re-attachment
The inciting problem
One of our DAGs suffers from an issue wherein a BigQueryInsertJobOperator task fails due to it becoming a "zombie task" (from Airflow's perspective); in the "Event ...
3
votes
2
answers
179
views
Retrieve the error cause / exception from failed task instances
Let's say I have a DAG with some source1, source2, ... tasks and a sink task, with a simple [source1, source2, ...] >> sink dependency.
All of them are @task-annotated Python operators, and the ...
Advice
0
votes
0
replies
55
views
How to apply Airflow DB migrations and save them into a Postgres Docker image?
I use docker-compose to run Airflow. The Postgres section is:
postgres:
  image: postgres:12.16
  environment:
    - POSTGRES_USER
    - POSTGRES_PASSWORD
    - POSTGRES_DB
  healthcheck:
    test: [ "...
Best practices
0
votes
0
replies
49
views
How to manage (Google Cloud Composer) Airflow roles with infrastructure as code?
Are there any best practices for handling roles in code?
Most of our setup is managed via Terraform. It would be great if there were a way to do the same for roles.
There is a Terraform provider, however using the ...
Tooling
1
vote
0
replies
74
views
Masking secrets in Airflow 2.4.3
I’m using Apache Airflow 2.4.3 and trying to securely store a Snowflake connection with a private key inside the connection’s extras JSON field.
I want to mask the sensitive private_key_content field ...
0
votes
0
answers
82
views
MWAA Airflow task_instance_mutation_hook not working
I have added an airflow_local_setting.py file to my MWAA S3 bucket at the root of the dags folder. I have a dag_policy defined, which is being picked up and works as expected.
In the same file I have added ...
0
votes
0
answers
76
views
Airflow backfill job randomly experiences error: Task state changed externally
I am using a Google Cloud Composer environment and Apache Airflow to run my DAG.
Almost all tasks are run in deferrable mode, since they are long-running.
I noticed that when running a backfill job for ...
0
votes
1
answer
59
views
Airflow DAG task dependency issue
I have a task dependency like this:
          task1 (must succeed)
                  |
                  v
        +-------------------+
        |                   |
     task2-A             task2-B
        |                   |
        v ...
0
votes
0
answers
44
views
How to correctly handle Airflow DAG go-live
We're migrating some scheduled jobs from to Airflow. The DAGs are written at some point in time, tested and deployed to the production environment. Another team is responsible for enabling these DAGs ...
0
votes
0
answers
39
views
Airflow Bitnami Helm 17.2.4: DB Connection Fails Only With Custom Logger (No Import Error)
I am using Airflow with the Bitnami Helm chart (version 17.2.4).
I added a custom JSON logger using a ConfigMap and set the environment variable AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS=...
0
votes
0
answers
27
views
Insert task_id from curl in Apache Airflow
How can I pass the task_id parameter through curl? I use KubernetesPodOperator.
After naming the variable tasks = """ {{ dag_run.conf.image_tag }} """ and specifying ...
0
votes
0
answers
29
views
Airflow DAGs Queued Not Skipped
I want to create logic that runs 2 DAGs in parallel.
When more DAGs are triggered and 2 DAGs are already running, I want to skip the upcoming DAGs.
The current logic queues the upcoming DAGs instead of skipping them ...
0
votes
0
answers
215
views
Airflow MWAA update from 2.4.3 to 2.10.3 worker OOM
I updated my MWAA environment from 2.4.3 to 2.10.3 and I keep getting the following error randomly when I am executing multiple tasks at the same time: INFO - Task exited with return code -9. For more ...
0
votes
1
answer
271
views
How Does Airflow Handle Multiprocessing Tasks on an 8-Core CPU When parallelism Is Set to 32?
I’m using Apache Airflow with the LocalExecutor on a machine that has an 8-core CPU.
Let’s say I have a DAG with two tasks that can run in parallel, and each of these tasks uses Python's ...
2
votes
1
answer
296
views
In Airflow 2.10 can I use dynamic task mapping with BranchPythonOperator?
Below is a minimal implementation of a branch operator using the TaskFlow API.
The DAG will execute either odd_task or even_task based on the string given by branch_on_condition. odd_task or even_task ...