AIRFLOW-584: Airflow Pool does not limit running tasks


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.7.1.3
    • Fix Version/s: None
    • Component/s: scheduler
    • Environment: Ubuntu 14.04

      Description

      Airflow pools are not limiting the number of running task instances for the following DAG in 1.7.1.3.

      Steps to reproduce:
      Create a pool of 5 slots, named 'pools_bug' to match the DAG below, through the UI (a programmatic alternative is sketched next).
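
      The same pool can also be created programmatically instead of through the UI. This is a minimal sketch assuming the 1.7.x ORM (airflow.settings.Session and airflow.models.Pool); the description string is illustrative:

      from airflow import settings
      from airflow.models import Pool

      # Equivalent to the UI step: a 5-slot pool whose name matches the DAG.
      session = settings.Session()
      session.add(Pool(pool='pools_bug', slots=5, description='AIRFLOW-584 repro'))
      session.commit()
      session.close()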

      The following DAG has 52 tasks (start, end, and 50 BashOperators), with each task's priority_weight increasing with its task number. There should only ever be 5 tasks running at a time; however, I observed 29 'used slots' in a pool with 5 slots.

      from datetime import datetime, timedelta

      from airflow import DAG
      from airflow.operators import BashOperator, DummyOperator

      dag_name = 'pools_bug'
      
      default_args = {
          'owner': 'airflow',
          'depends_on_past': False,
          'start_date': datetime(2016, 10, 20),
          'email_on_failure': False,
          'retries': 1
      }
      
      dag = DAG(dag_name, default_args=default_args, schedule_interval="0 8 * * *")
      start = DummyOperator(task_id='start', dag=dag)
      end = DummyOperator(task_id='end', dag=dag)
      
      for i in range(50):
          sleep_command = 'sleep 10'
          task_name = 'task-{}'.format(i)
          op = BashOperator(
              task_id=task_name,
              bash_command=sleep_command,
              execution_timeout=timedelta(hours=4),
              priority_weight=i,
              pool=dag_name,  # the 5-slot 'pools_bug' pool created above
              dag=dag)
      
          start.set_downstream(op)
          end.set_upstream(op)
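
      To double-check the 'used slots' figure outside the UI, the count can be read from the metadata database. A sketch assuming the 1.7.x ORM; 'running' is the state string stored on task_instance rows:

      from airflow import settings
      from airflow.models import TaskInstance

      # Count task instances currently holding slots in the 'pools_bug' pool.
      session = settings.Session()
      used_slots = (session.query(TaskInstance)
                    .filter(TaskInstance.pool == 'pools_bug')
                    .filter(TaskInstance.state == 'running')
                    .count())
      print('used slots: {}'.format(used_slots))  # observed 29 against a 5-slot pool
      session.close()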
      

      Relevant configurations from airflow.cfg:

      [core]
      # The executor class that airflow should use. Choices include
      # SequentialExecutor, LocalExecutor, CeleryExecutor
      executor = CeleryExecutor
      
      # The amount of parallelism as a setting to the executor. This defines
      # the max number of task instances that should run simultaneously
      # on this airflow installation
      parallelism = 64
      
      # The number of task instances allowed to run concurrently by the scheduler
      dag_concurrency = 64
      
      # The maximum number of active DAG runs per DAG
      max_active_runs_per_dag = 1
      
      [celery]
      # This section only applies if you are using the CeleryExecutor in
      # [core] section above
      
      # The app name that will be used by celery
      celery_app_name = airflow.executors.celery_executor
      
      # The concurrency that will be used when starting workers with the
      # "airflow worker" command. This defines the number of task instances that
      # a worker will take, so size up your workers based on the resources on
      # your worker box and the nature of your tasks
      celeryd_concurrency = 64
      
      [scheduler]
      # Task instances listen for external kill signal (when you clear tasks
      # from the CLI or the UI), this defines the frequency at which they should
      # listen (in seconds).
      job_heartbeat_sec = 5
      
      # The scheduler constantly tries to trigger new tasks (look at the
      # scheduler section in the docs for more information). This defines
      # how often the scheduler should run (in seconds).
      scheduler_heartbeat_sec = 5
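
      To confirm these are the values actually in effect (and not overridden by another airflow.cfg on the path), they can be read back at runtime. A sketch assuming the 1.7.x module-level airflow.configuration.get helper:

      from airflow import configuration

      # Print the effective values for the settings quoted above.
      for section, key in [('core', 'parallelism'),
                           ('core', 'dag_concurrency'),
                           ('core', 'max_active_runs_per_dag'),
                           ('celery', 'celeryd_concurrency')]:
          print('{}.{} = {}'.format(section, key, configuration.get(section, key)))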
      


        Attachments

        1. Screen Shot 2019-02-20 at 1.37.24 PM.png (23 kB, Grant Nicholas)
        2. img2.png (19 kB, David Kegley)
        3. img1.png (15 kB, David Kegley)

            People

            • Assignee: Unassigned
            • Reporter: kegs (David Kegley)
            • Votes: 5
            • Watchers: 16
