Apache Airflow / AIRFLOW-6454

Add test for the time taken by the scheduler to run a DAG with different numbers of tasks (2 vs 20 vs 200 vs 2000 vs 20000 simple one-line print tasks)


    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.7
    • Fix Version/s: None
    • Component/s: tests
    • Labels: None

      Description

      LUIGI vs AIRFLOW

       

      200 sequential tasks (so no parallelism):

       

      LUIGI:
      mkdir -p test_output8
      pip install luigi
      #no need to start web server, scheduler or meta db
      #8.3secs total time for all 200
      time python3 -m luigi --module cloop --local-scheduler ManyMany

       

      AIRFLOW:
      #1032 sec total time for all 200: ~0.16s per task, but a ~5 sec gap between tasks
      #intention was for the tasks in the DAG to be completely sequential (task 3 must wait for task 2, which must wait for task 1, etc.), but chain() did not appear to work as intended, so default_pool=1 was used instead
      airflow initdb
      nohup airflow webserver -p 8080 &
      nohup airflow scheduler &
      airflow trigger_dag looper2
      #look at dagrun start-endtime
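
      One way to read those dagrun timings programmatically rather than from the UI (a sketch only, not part of the original report; it assumes Airflow 1.10.x with the metadata DB set up by airflow initdb, and the file name is hypothetical):

      # check_dagrun_times.py (hypothetical helper)
      from airflow.models import DagRun
      from airflow.settings import Session

      session = Session()
      for run in session.query(DagRun).filter(DagRun.dag_id == 'looper2').all():
          # end_date is only populated once the dagrun has finished
          duration = (run.end_date - run.start_date).total_seconds() if run.end_date else None
          print(run.run_id, run.start_date, run.end_date, duration)
      session.close()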

       

      cloop.py

      import os
      #import time
      
      import luigi
      
      # To run:
      # cd to the directory containing this file, then
      # python3 -m luigi --module cloop --local-scheduler ManyMany
      
      class Sleep(luigi.Task):
          #resources = {'foo': 10}
      
          fname = luigi.Parameter()
      
          def requires(self):
              #print(self)
              zin=self.fname
              ii=int(zin.split('_')[1])
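              # marker_N requires marker_(N-1) for N > 1, so the Sleep tasks form a sequential chain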
              if ii > 1:
                  return Sleep(fname='marker_{}'.format(ii-1))
              else:
                  return []
      
          def full_path(self):
              return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_output8', self.fname)
      
          def run(self):
              #time.sleep(1)
              with open(self.full_path(), 'w') as f:
                  print('', file=f)
      
          def output(self):
              return luigi.LocalTarget(self.full_path())
      
      
      class Many(luigi.WrapperTask):
          n = luigi.IntParameter()
      
          def requires(self):
              for i in range(self.n):
                  yield Sleep(fname='marker_{}'.format(i))
      
      
      class ManyMany(luigi.WrapperTask):
          n = luigi.IntParameter(default=200)
      
          def requires(self):
              for i in range(self.n):
                  yield Many(n=self.n)
      

      looper2.py

      import airflow
      from airflow.models import DAG
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.utils.helpers import chain
      
      args = {
          'owner': 'airflow',
          'retries': 3,
          'start_date': airflow.utils.dates.days_ago(2)
      }
      
      dag = DAG(
          dag_id='looper2', default_args=args,
          schedule_interval=None)
      
      # chain() builds dependencies between its positional arguments; passing one list creates no edges, so unpack it to get op1 >> op2 >> ... >> op200
      chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])
      
      if __name__ == "__main__":
          dag.cli()
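
      Since the report notes that chain() did not appear to work as intended, a quick sanity check (a sketch, not part of the original report; assumes looper2.py sits in the configured dags folder) is to load the DAG and count dependency edges before timing a run:

      # chain_check.py (hypothetical): a strict chain of 200 tasks should show exactly 199 edges
      from airflow.models import DagBag

      dag = DagBag().get_dag('looper2')
      edges = sum(len(t.downstream_task_ids) for t in dag.tasks)
      print('tasks:', len(dag.tasks), 'edges:', edges)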
      

      I saw a similar test in
      https://github.com/apache/airflow/pull/5096 but it did not seem to be sequential or to use the scheduler.

      Possible test scenarios:
      1. 1 DAG with 200 tasks running sequentially
      2. 1 DAG with 200 tasks running all in parallel (200 slots)
      3. 1 DAG with 200 tasks running all in parallel (48 slots)
      4. 200 DAGs each with 1 task
      Then repeat the above, changing 200 to 2000, 20, etc. (one way the sequential case could be generated is sketched below)
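
      A minimal sketch of how scenario 1 could be parameterised across the 2 / 20 / 200 / 2000 sizes (the file name, dag_id pattern and helper below are illustrative assumptions, not an existing test in the repo):

      # perf_sequential_dags.py (hypothetical file in the dags folder)
      import airflow
      from airflow.models import DAG
      from airflow.operators.dummy_operator import DummyOperator

      def make_sequential_dag(num_tasks):
          dag = DAG(
              dag_id='perf_sequential_{}'.format(num_tasks),
              default_args={'owner': 'airflow',
                            'start_date': airflow.utils.dates.days_ago(2)},
              schedule_interval=None)
          ops = [DummyOperator(task_id='op{}'.format(i), dag=dag)
                 for i in range(num_tasks)]
          for upstream, downstream in zip(ops[:-1], ops[1:]):
              upstream >> downstream  # strictly sequential, as in scenario 1
          return dag

      # scenario 1 at several sizes; scenarios 2-4 would vary pool slots / DAG count instead
      for n in (2, 20, 200, 2000):
          globals()['perf_dag_{}'.format(n)] = make_sequential_dag(n)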

      Qs:
      1. Any plans for an 'in-memory' scheduler like Luigi's?
      2. Anyone open to a Luigi Operator?
      3. Any speedups to make the existing scheduler faster? Note that the tasks here are sequential (the time should be similar to 200 DAGs of 1 task each).

      ControlM comparison:
      Is it envisioned that Airflow becomes a replacement for https://www.bmcsoftware.uk/it-solutions/control-m.html ?
      execution_date seems similar to Order Date, a DAG seems similar to a job, and tasks in a DAG seem similar to the commands called by a job, but some of the items I see missing are:
      1. integration of public holiday calendars
      2. ability to specify schedules like 11am on the '2nd weekday of the month', the 'last 5 days of the month', or the 'last business day of the month'
      3. ability to visualise dependencies between DAGs (there does not seem to be a high-level way to say: at 11am schedule DAGc after DAGa and DAGb, then at 3pm schedule DAGd after DAGc only if DAGc was successful)
      4. ability to select one or many DAGs in the UI and change their state to killed/success (force ok), etc., and have it instantly affect their task instances (i.e. stopping them)
      5. ability to set whole DAGs to 'dummy' on certain days of the week, e.g. DAGb (runs 7 days a week) must run after DAGa for each execution date; DAGa should do real work on Mon-Fri, but on Sat/Sun the entire DAGa should 'do' nothing, existing only to satisfy the 'IN condition' of DAGb
      6. ability to change the number of tasks within a DAG for a different execution date without confusing the scheduler/metadata DB
      7. ability to 'order up' any day in the past/future (for all or some DAGs) and keep it on 'hold': visualise which DAGs 'would' be scheduled, see DAG dependencies, and choose to run all or some of them (or just delete them) while maintaining the dependencies between them, optionally 'forcing ok' some to skip dependencies
      8. ability to feed conf (i.e. arguments) into a DAG from the UI, or to change the host the DAG runs on
      9. ability to rerun an entire execution date while keeping an audit trail in the DB of the timings of the first run of that execution date, plus allow a different conf on the second run
      10. faster execution:
      a) it seems that if I want 15 different dag_ids of 300 tasks each, all running exactly the same tasks (just with different conf arguments), the dagbag has to parse 4500 tasks instead of recognising a single set of 300 that differ only by conf
      b) 'push' flow of tasks within a DAG, rather than gaps between tasks
      c) the scheduler does not get overloaded by 100k tasks
      11. DagRun timeout (without a max_active_runs constraint)
      12. enforce depends-on-past across execution dates for DAGs whose schedules may be only weekly or on certain days of the week
      13. multiple pools (i.e. quantitative resources) within a single DAG
      14. ability to edit schedules via the UI
      15. audit trail of changes to a DAG (not tasks, but things like schedule or run-as user)

      At the moment:
      ControlM = enterprise features, stability, and speed, but no Python definitions of tasks
      Luigi = speed and Python definitions of tasks, but no built-in time-based scheduling
      Airflow = community momentum and Python definitions of tasks, but not fast and lacking some ControlM features

              People

              • Assignee: Unassigned
              • Reporter: t oo (toopt4)
              • Votes: 0
              • Watchers: 4

                Dates

                • Created:
                  Updated: