The following behaviour is observed with Airflow 1.9 in LocalExecutor mode.
The Airflow scheduler processes executor events in the "_process_executor_events(self, simple_dag_bag, session=None)" function of jobs.py.
The events are identified by a key composed of the dag id, task id, and execution date, so all retries of a task share the same key.
If the task retry interval is very small, e.g. 30 seconds, then the scheduler may schedule the next retry while the result of the previous try is still sitting in the executor event queue.
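For illustration, here is a minimal sketch (plain Python, not Airflow code) of the key collision described above, assuming the event-buffer key is the bare (dag_id, task_id, execution_date) tuple:

# Illustrative only: the event key ignores the try number, so a stale event
# from try 1 is matched against try 2 of the same task.
from datetime import datetime

execution_date = datetime(2017, 10, 1)
key_try_1 = ("example_dag", "example_task", execution_date)  # try 1 fails, event stays buffered
key_try_2 = ("example_dag", "example_task", execution_date)  # try 2 is queued ~30 s later
assert key_try_1 == key_try_2  # same key, so the buffered FAILED event is applied to the queued retry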
The current task run may be in the QUEUED state while the scheduler is processing the executor's earlier events, which can cause the scheduler to fail the current run because of the following code in jobs.py:
def _process_executor_events(self, simple_dag_bag, session=None):
    """
    Respond to executor events.
    """
    # TODO: this shares quite a lot of code with _manage_executor_state

    TI = models.TaskInstance
    for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
                               .items()):
        dag_id, task_id, execution_date = key
        self.log.info(
            "Executor reports %s.%s execution_date=%s as %s",
            dag_id, task_id, execution_date, state
        )
        if state == State.FAILED or state == State.SUCCESS:
            qry = session.query(TI).filter(TI.dag_id == dag_id,
                                           TI.task_id == task_id,
                                           TI.execution_date == execution_date)
            ti = qry.first()
            if not ti:
                self.log.warning("TaskInstance %s went missing from the database", ti)
                continue

            # TODO: should we fail RUNNING as well, as we do in Backfills?
            if ti.state == State.QUEUED:
                msg = ("Executor reports task instance %s finished (%s) "
                       "although the task says its %s. Was the task "
                       "killed externally?".format(ti, state, ti.state))