Apache Airflow / AIRFLOW-74

SubdagOperators can consume all celeryd worker processes

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.2, 1.7.0, 1.7.1
    • Fix Version/s: 1.10.0
    • Component/s: celery
    • Labels: None
    • Environment:
      Airflow 1.7.1rc3 with CeleryExecutor
      1 webserver
      1 scheduler
      2 workers
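
A note on the environment above: the deadlock condition is reached relative to the total number of celery worker processes, i.e. the number of workers times the per-worker concurrency. As a sketch of the relevant airflow.cfg lines for such a deployment, assuming 1.7-era option names (the concurrency value is illustrative, not from the report):

```
[core]
# Dispatch tasks to celery workers instead of running them locally.
executor = CeleryExecutor

[celery]
# Task processes per `airflow worker` instance; with 2 workers the
# cluster has 2 * celeryd_concurrency slots that tasks can occupy.
celeryd_concurrency = 16
```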

      Description

      If the number of concurrently running `SubDagOperator` tasks is greater than or equal to the number of celery worker processes, no tasks can make progress: each running `SubDagOperator` occupies a worker process while it waits for its subdag's tasks, which themselves need worker processes, so all SDOs come to a complete halt. Furthermore, DAG performance is drastically reduced even before the workers are fully saturated, as progressively fewer workers remain available for actual tasks. A workaround is to specify that the `SequentialExecutor` be used by the `SubDagOperator`:

```python
from datetime import timedelta, datetime

import airflow
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor

# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()

# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the DAG name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'

# Default args are passed to each task.
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the DAG object.
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *')

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/

def get_subdag(dag, sd_id, pool=None):
    # A subdag's dag_id must be '<parent_dag_id>.<task_id>'.
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(sd_id=sd_id),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool,
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(sd_id=sd_id),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool,
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
        # Workaround: run the subdag's tasks with the SequentialExecutor
        # so they do not compete for celery worker processes.
        executor=SequentialExecutor(),
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag,
)

# Build 16 parallel chains of three subdags each.
for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n),
                      pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n),
                       pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n),
                        pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)
```
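
For clarity, the workaround in isolation: a minimal sketch against the 1.7-era API (the DAG and task names are illustrative, not from the report) that pins a `SubDagOperator` to the `SequentialExecutor` so its subdag's tasks run in-process rather than queueing for celery worker slots.

```python
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators import DummyOperator, SubDagOperator
from airflow.executors import SequentialExecutor

# Illustrative parent DAG; the dag_id and dates are placeholders.
parent = DAG('sqe_workaround_demo',
             start_date=datetime(2016, 4, 10),
             schedule_interval=timedelta(days=1))

# A subdag's dag_id must be '<parent_dag_id>.<task_id>'.
child = DAG('sqe_workaround_demo.section_1',
            start_date=parent.start_date,
            schedule_interval=parent.schedule_interval)

DummyOperator(task_id='inner_task', dag=child)

# Passing a SequentialExecutor instance makes the SubDagOperator run
# its subdag inside the worker slot it already holds, so the subdag's
# tasks never queue for (and exhaust) additional celery processes.
SubDagOperator(
    task_id='section_1',
    subdag=child,
    dag=parent,
    executor=SequentialExecutor(),
)
```

The trade-off is that the subdag's tasks run serially inside the slot already held by the `SubDagOperator`, giving up subdag-level parallelism in exchange for not deadlocking.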

People

    • Assignee: zgl
    • Reporter: Steven Yvinec-Kruyk (stevenyk)
    • Votes: 6
    • Watchers: 14
