Apache Airflow / AIRFLOW-3136

Scheduler fails task retry runs while processing executor events


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.9.0
    • Fix Version/s: 1.10.1, 2.0.0
    • Component/s: scheduler
    • Labels:
      None

      Description

      The following behaviour is observed with Airflow 1.9 running in LocalExecutor mode.

       

      The Airflow scheduler processes executor events in the "_process_executor_events(self, simple_dag_bag, session=None)" function of jobs.py.
      Events are identified by a key composed of the dag id, task id, and execution date, so all retries of a task share the same key.
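
      As a rough illustration of why retries collide, the sketch below builds an event key from the three fields named above. The make_event_key helper and the sample values are hypothetical and are not Airflow code; the only point is that the try number is not part of the key.

```python
from datetime import datetime


def make_event_key(dag_id, task_id, execution_date):
    # Hypothetical helper: mirrors the (dag id, task id, execution date)
    # key described in this report. It is not an Airflow function.
    return (dag_id, task_id, execution_date)


execution_date = datetime(2018, 9, 30)

# Key for the first attempt and key for the retry of the same task.
first_try_key = make_event_key("example_dag", "example_task", execution_date)
retry_key = make_event_key("example_dag", "example_task", execution_date)

# Both attempts map to the same key, so a buffered event cannot
# indicate which attempt it belongs to.
keys_collide = first_try_key == retry_key
```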

      If the task retry interval is very small, such as 30 seconds, the scheduler might schedule the next retry run while the result of the previous task run is still in the executor event queue.
      The current task run might be in the queued state while the scheduler is processing the executor's earlier events, which can cause the scheduler to fail the current run because of the following code in jobs.py:

      def _process_executor_events(self, simple_dag_bag, session=None):
          """
          Respond to executor events.
          """
          # TODO: this shares quite a lot of code with _manage_executor_state

          TI = models.TaskInstance
          for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
                                     .items()):
              dag_id, task_id, execution_date = key
              self.log.info(
                  "Executor reports %s.%s execution_date=%s as %s",
                  dag_id, task_id, execution_date, state
              )
              if state == State.FAILED or state == State.SUCCESS:
                  qry = session.query(TI).filter(TI.dag_id == dag_id,
                                                 TI.task_id == task_id,
                                                 TI.execution_date == execution_date)
                  ti = qry.first()
                  if not ti:
                      self.log.warning("TaskInstance %s went missing from the database", ti)
                      continue

                  # TODO: should we fail RUNNING as well, as we do in Backfills?
                  if ti.state == State.QUEUED:
                      msg = ("Executor reports task instance %s finished (%s) "
                             "although the task says its %s. Was the task "
                             "killed externally?".format(ti, state, ti.state))
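
      The race described above can be reproduced with a toy model that has no Airflow dependency. Everything in this sketch (the ToyTaskInstance class, the dict-based event buffer, the state strings, and the process_events function) is an assumption made for illustration; it only mimics the QUEUED check shown in the excerpt and the failure described in this report.

```python
from dataclasses import dataclass

# Illustrative state names; not imported from Airflow.
QUEUED, FAILED, SUCCESS, UP_FOR_RETRY = "queued", "failed", "success", "up_for_retry"


@dataclass
class ToyTaskInstance:
    key: tuple
    state: str
    try_number: int


def process_events(event_buffer, task_instances):
    """Mimic the excerpt: a finished event for a QUEUED task instance fails it."""
    wrongly_failed = []
    for key, state in list(event_buffer.items()):
        del event_buffer[key]
        ti = task_instances.get(key)
        if ti is None:
            continue
        if state in (FAILED, SUCCESS) and ti.state == QUEUED:
            # The event belongs to an earlier attempt, but the key cannot show that.
            ti.state = FAILED
            wrongly_failed.append(ti.try_number)
    return wrongly_failed


key = ("example_dag", "example_task", "2018-09-30T00:00:00")
ti = ToyTaskInstance(key=key, state=UP_FOR_RETRY, try_number=1)
task_instances = {key: ti}

# Attempt 1 fails; its event waits in the executor buffer, unprocessed.
event_buffer = {key: FAILED}

# The short retry interval elapses and the scheduler queues attempt 2 first.
ti.state, ti.try_number = QUEUED, 2

# The stale event from attempt 1 is now applied to the queued attempt 2.
wrongly_failed = process_events(event_buffer, task_instances)
```

      In this toy run the second attempt is marked failed before it ever starts, which is the symptom reported here. Any approach that lets the scheduler tell attempts apart, or skip stale events, would avoid it in the model; how the actual fix does this is not stated in this report.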

       


              People

              • Assignee: Vardan Gupta (vardan)
              • Reporter: raman (ramandumcs)