Uploaded image for project: 'Apache Airflow'
  1. Apache Airflow
  2. AIRFLOW-6360

Config option to stop task_stats from querying completed dagruns/TIs

    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.10.6
    • Fix Version/s: 2.0.0
    • Component/s: ui
    • Labels:
      None

      Description

      The task_stats endpoint, which populates the 'Recent Tasks' column in the UI, is very slow when a DAG has many DAG runs or many tasks.

      BEFORE
      LastDagRun = (
      session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
      .join(Dag, Dag.dag_id == DagRun.dag_id)
      .filter(DagRun.state != State.RUNNING)
      .filter(Dag.is_active == True) # noqa: E712
      .filter(Dag.is_subdag == False) # noqa: E712
      .group_by(DagRun.dag_id)
      .subquery('last_dag_run')
      )
      RunningDagRun = (
      session.query(DagRun.dag_id, DagRun.execution_date)
      .join(Dag, Dag.dag_id == DagRun.dag_id)
      .filter(DagRun.state == State.RUNNING)
      .filter(Dag.is_active == True) # noqa: E712
      .filter(Dag.is_subdag == False) # noqa: E712
      .subquery('running_dag_run')
      )

      # Select all task_instances from running (active) dag_runs.
      # If no dag_run is running, return task instances from the most recent dag_run.
        LastTI = (
        session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
        .join(LastDagRun, and_(
        LastDagRun.c.dag_id == TI.dag_id,
        LastDagRun.c.execution_date == TI.execution_date))
        )
        RunningTI = (
        session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
        .join(RunningDagRun, and_(
        RunningDagRun.c.dag_id == TI.dag_id,
        RunningDagRun.c.execution_date == TI.execution_date))
        )

      UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
      qry = (
      session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
      .group_by(UnionTI.c.dag_id, UnionTI.c.state)
      )

      AFTER
      # We are not interested in stats for dag_runs that have already completed; we only want running ones.

      RunningDagRun = (
      session.query(DagRun.dag_id, DagRun.execution_date)
      .join(Dag, Dag.dag_id == DagRun.dag_id)
      .filter(DagRun.state == State.RUNNING,
      Dag.is_active,
      Dag.is_subdag == False) # noqa: E712
      .subquery('running_dag_run')
      )

      # Select all task_instances from running (active) dag_runs.
        qry = (
        session.query(TI.dag_id.label('dag_id'), TI.state.label('state'), sqla.func.count())
        .join(RunningDagRun, and_(
        RunningDagRun.c.dag_id == TI.dag_id,
        RunningDagRun.c.execution_date == TI.execution_date))
        .group_by(TI.dag_id, TI.state)
        )

        Attachments

          Activity

            People

            • Assignee:
              toopt4 t oo
              Reporter:
              toopt4 t oo
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: