A DAG Run is an object representing an instantiation of the DAG in time. Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the tasks' states. A key capability of Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any interval that has not been run (or has been cleared).

With that background, here is the question. I have the following DAG with 3 tasks: start -> special_task -> end. The task in the middle can succeed or fail, but end must always be executed (imagine this is a task for cleanly closing resources). For that, I used the trigger rule ALL_DONE:

```python
end.trigger_rule = TriggerRule.ALL_DONE
```

Using that, end is properly executed if special_task fails. However, since end is the last task and succeeds, the DAG is always marked as SUCCESS. How can I configure my DAG so that if one of the tasks fails, the whole DAG is marked as FAILED?

Example to reproduce:

```python
import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(dag_id="my_dag", start_date=datetime.datetime(2018, 1, 1), schedule_interval=None)

start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
special_task = BashOperator(task_id="special_task", bash_command="exit 1", dag=dag)  # fails on purpose
end = BashOperator(task_id="end", bash_command="echo end", dag=dag)
end.trigger_rule = TriggerRule.ALL_DONE

start >> special_task >> end
```

This post seems to be related, but the answer does not suit my needs, since the downstream task end must be executed (hence the mandatory trigger_rule).

And the answer: I thought it was an interesting question and spent some time figuring out how to achieve it without an extra dummy task. It became a bit of a superfluous task, but here's the end result:

```python
import datetime

from airflow.exceptions import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

default_args = {"start_date": datetime.datetime(2018, 1, 1)}
dag = DAG(dag_id="finally_dag", default_args=default_args, schedule_interval=None)

start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
successful_task = DummyOperator(task_id="successful_task", dag=dag)


@provide_session
def _finally(task, execution_date, dag, session=None, **_):
    # Query all upstream task instances of this task in the current DAG run.
    upstream_task_instances = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.execution_date == execution_date,
            TaskInstance.task_id.in_(task.upstream_task_ids),
        )
        .all()
    )
    # Collect the upstream states and check whether any upstream task failed.
    upstream_states = [ti.state for ti in upstream_task_instances]
    fail_this_task = State.FAILED in upstream_states

    if fail_this_task:
        raise AirflowException("Failing task because one or more upstream tasks failed.")


finally_task = PythonOperator(
    task_id="finally",
    python_callable=_finally,
    trigger_rule=TriggerRule.ALL_DONE,
    provide_context=True,
    dag=dag,
)

start >> [failing_task, successful_task] >> finally_task
```

Look at the _finally function, which is called by the PythonOperator:

- Annotate it with @provide_session and add the argument session=None, so you can query the Airflow DB with session.
- Query all upstream task instances for the current task.
- From the returned task instances, get the states and check if State.FAILED is in there.
- And finally, fail the task if fail_this_task=True by raising AirflowException("Failing task because one or more upstream tasks failed.").
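As a side note, not part of the original answer: since a DAG Run is generally marked failed when one of its final (leaf) tasks fails, a similar effect can be had with a second leaf task that only triggers on failure. The sketch below assumes the same Airflow 1.x-era APIs used in this post; the task name `watcher` is hypothetical, and `dag` and `special_task` refer to the reproduction example above.

```python
# A minimal sketch (assumption: Airflow 1.x-era APIs, as used in this post).
# Idea: add a second leaf task with trigger rule ONE_FAILED. It is skipped
# when everything upstream succeeds, and it runs (and raises) when any
# upstream task failed, which marks the whole DAG Run as failed.
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def _mark_failure(**_):
    # Only reached when at least one upstream task failed (ONE_FAILED),
    # so raising here propagates the failure to the DAG Run.
    raise AirflowException("An upstream task failed; failing the DAG run.")


# `watcher` is a hypothetical task name; `dag` and `special_task` come from
# the reproduction example above. `end` stays downstream with ALL_DONE, so
# cleanup still always runs.
watcher = PythonOperator(
    task_id="watcher",
    python_callable=_mark_failure,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag,
)

special_task >> watcher  # `end` is already downstream of special_task
```

Compared with the _finally approach above, this avoids querying the Airflow DB, at the cost of an extra task in the graph; the answer explicitly set out to avoid such an extra task, so treat this only as a variant.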