Apache Airflow / AIRFLOW-5354

Scheduler - constant CPU usage of 25% with nothing running and scheduling loop running too frequently


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.10.4
    • Fix Version/s: 2.0.0
    • Component/s: scheduler
    • Labels:
      None

      Description

      I've set the logging level to DEBUG. CPU utilisation is a constant 25%, but no DAGs are running (they have no schedule, since I only trigger them externally). Why is the scheduling loop running every 2 seconds? Can I make it run every 30 seconds?
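
      If the "Sleeping for 1.00 seconds" lines in the log below come from [scheduler] processor_poll_interval (my reading of the 1.10 configuration reference; the option isn't set in my airflow.cfg below, so it should be at its default of 1), then something like the following would stretch the loop out to 30 seconds, although that feels more like masking the constant CPU usage than fixing it:

      [scheduler]
      # assumed knob: seconds the scheduler sleeps between scheduling loop iterations
      processor_poll_interval = 30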

       

      Here is some of the scheduler log:

       

      [2019-08-30 09:28:57,538] {scheduler_job.py:1363} DEBUG - Starting Loop...
      [2019-08-30 09:28:57,539] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
      [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:57,540] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
      [2019-08-30 09:28:57,540] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
      [2019-08-30 09:28:57,540] {base_executor.py:124} DEBUG - 0 running task instances
      [2019-08-30 09:28:57,540] {base_executor.py:125} DEBUG - 0 in queue
      [2019-08-30 09:28:57,540] {base_executor.py:126} DEBUG - 3 open slots
      [2019-08-30 09:28:57,540] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
      [2019-08-30 09:28:57,541] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.00 seconds
      [2019-08-30 09:28:57,541] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
      [2019-08-30 09:28:58,543] {scheduler_job.py:1447} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
      [2019-08-30 09:28:59,541] {scheduler_job.py:1363} DEBUG - Starting Loop...
      [2019-08-30 09:28:59,541] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
      [2019-08-30 09:28:59,541] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:59,542] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:59,542] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:59,542] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:28:59,542] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
      [2019-08-30 09:28:59,542] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
      [2019-08-30 09:28:59,542] {base_executor.py:124} DEBUG - 0 running task instances
      [2019-08-30 09:28:59,543] {base_executor.py:125} DEBUG - 0 in queue
      [2019-08-30 09:28:59,543] {base_executor.py:126} DEBUG - 3 open slots
      [2019-08-30 09:28:59,543] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
      [2019-08-30 09:28:59,544] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.00 seconds
      [2019-08-30 09:28:59,544] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
      [2019-08-30 09:29:00,545] {scheduler_job.py:1447} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
      [2019-08-30 09:29:01,544] {scheduler_job.py:1363} DEBUG - Starting Loop...
      [2019-08-30 09:29:01,545] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
      [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:01,546] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
      [2019-08-30 09:29:01,546] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
      [2019-08-30 09:29:01,546] {base_executor.py:124} DEBUG - 0 running task instances
      [2019-08-30 09:29:01,546] {base_executor.py:125} DEBUG - 0 in queue
      [2019-08-30 09:29:01,547] {base_executor.py:126} DEBUG - 3 open slots
      [2019-08-30 09:29:01,547] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
      [2019-08-30 09:29:01,548] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.00 seconds
      [2019-08-30 09:29:01,548] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
      [2019-08-30 09:29:02,549] {scheduler_job.py:1447} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
      [2019-08-30 09:29:03,548] {scheduler_job.py:1363} DEBUG - Starting Loop...
      [2019-08-30 09:29:03,548] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
      [2019-08-30 09:29:03,548] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:03,549] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:03,549] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:03,549] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
      [2019-08-30 09:29:03,549] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
      [2019-08-30 09:29:03,550] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
      [2019-08-30 09:29:03,550] {base_executor.py:124} DEBUG - 0 running task instances
      [2019-08-30 09:29:03,550] {base_executor.py:125} DEBUG - 0 in queue
      [2019-08-30 09:29:03,550] {base_executor.py:126} DEBUG - 3 open slots
      [2019-08-30 09:29:03,550] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
      [2019-08-30 09:29:03,551] {scheduler_job.py:1423} DEBUG - Heartbeating the scheduler
      [2019-08-30 09:29:03,564] {base_job.py:197} DEBUG - [heartbeat]
      [2019-08-30 09:29:03,565] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.02 seconds
      [2019-08-30 09:29:03,565] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
      [2019-08-30 09:29:04,567] {scheduler_job.py:1447} DEBUG - Sleeping for 0.98 seconds to prevent excessive logging

       

       

      ps -ef | grep -i airf
      ec2-user 5648 1 0 09:05 pts/1 00:00:05 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
      ec2-user 5727 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
      ec2-user 5734 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
      ec2-user 5738 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
      ec2-user 5739 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
      ec2-user 5746 5648 0 09:05 pts/1 00:00:02 airflow scheduler -- DagFileProcessorManager
      ec2-user 6161 1 0 09:05 pts/1 00:00:04 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow webserver -p 8080 --access_logfile /home/ec2-user/airflow/logs/airflow-webserver-access.log
      ec2-user 6243 6161 0 09:05 pts/1 00:00:01 gunicorn: master [airflow-webserver]
      ec2-user 6303 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker [airflow-webserver]
      ec2-user 6304 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker [airflow-webserver]
      ec2-user 12539 6243 0 09:16 pts/1 00:00:06 [ready] gunicorn: worker [airflow-webserver]
      ec2-user 18988 6243 0 09:26 pts/1 00:00:04 [ready] gunicorn: worker [airflow-webserver]

       

      airflow.cfg

      [core]
      dags_folder = /home/ec2-user/airflow/dags
      base_log_folder = /home/ec2-user/airflow/logs
      remote_logging = False
      remote_log_conn_id =
      remote_base_log_folder =
      encrypt_s3_logs = False
      logging_level = DEBUG
      fab_logging_level = DEBUG
      logging_config_class =
      colored_console_log = True
      colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
      colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
      log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
      simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
      log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
      log_processor_filename_template = {{ filename }}.log
      dag_processor_manager_log_location = /home/ec2-user/airflow/logs/dag_processor_manager/dag_processor_manager.log
      hostname_callable = socket:getfqdn
      default_timezone = utc
      executor = LocalExecutor
      sql_alchemy_conn_cmd = /home/ec2-user/airflow/scripts/get_db_conn_str.sh
      sql_engine_encoding = utf-8
      sql_alchemy_pool_enabled = True
      sql_alchemy_pool_size = 5
      sql_alchemy_max_overflow = 10
      sql_alchemy_pool_recycle = 1800
      sql_alchemy_reconnect_timeout = 300
      sql_alchemy_schema =
      parallelism = 3
      dag_concurrency = 3
      dags_are_paused_at_creation = False
      max_active_runs_per_dag = 350
      load_examples = False
      plugins_folder = /home/ec2-user/airflow/plugins
      fernet_key = REDACT
      donot_pickle = False
      dagbag_import_timeout = 300
      task_runner = StandardTaskRunner
      default_impersonation =
      security =
      secure_mode = True
      unit_test_mode = False
      task_log_reader = task
      enable_xcom_pickling = True
      killed_task_cleanup_time = 60
      dag_run_conf_overrides_params = False
      worker_precheck = False
      dag_discovery_safe_mode = False
      [cli]
      api_client = airflow.api.client.local_client
      endpoint_url = http://localhost:8080
      [api]
      auth_backend = airflow.api.auth.backend.deny_all
      [lineage]
      backend =
      [atlas]
      sasl_enabled = False
      host =
      port = 21000
      username =
      password =
      [operators]
      default_owner = airflow
      default_cpus = 1
      default_ram = 512
      default_disk = 512
      default_gpus = 0
      [hive]
      default_hive_mapred_queue =
      [webserver]
      base_url = https://REDACT:8080
      web_server_host = 0.0.0.0
      web_server_port = 8080
      web_server_ssl_cert = /home/ec2-user/certs/REDACT
      web_server_ssl_key = /home/ec2-user/certs/REDACT
      web_server_master_timeout = 120
      web_server_worker_timeout = 120
      worker_refresh_batch_size = 1
      worker_refresh_interval = 600
      secret_key = temporary_key
      workers = 4
      worker_class = gevent
      access_logfile = -
      error_logfile = -
      expose_config = False
      authenticate = True
      auth_backend = airflow.contrib.auth.backends.ldap_auth
      filter_by_owner = False
      owner_mode = user
      dag_default_view = tree
      dag_orientation = LR
      demo_mode = False
      log_fetch_timeout_sec = 5
      hide_paused_dags_by_default = False
      page_size = 100
      rbac = False
      navbar_color = #007A87
      default_dag_run_display_number = 25
      enable_proxy_fix = False
      cookie_secure = False
      cookie_samesite =
      default_wrap = False
      [email]
      email_backend = airflow.utils.email.send_email_smtp
      [smtp]
      smtp_host = localhost
      smtp_starttls = True
      smtp_ssl = False
      smtp_port = 25
      smtp_mail_from = airflow@example.com
      [celery]
      celery_app_name = airflow.executors.celery_executor
      worker_concurrency = 16
      worker_log_server_port = 8793
      broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
      result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
      flower_host = 0.0.0.0
      flower_url_prefix =
      flower_port = 5555
      flower_basic_auth =
      default_queue = default
      sync_parallelism = 0
      celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
      ssl_active = False
      ssl_key =
      ssl_cert =
      ssl_cacert =
      pool = prefork
      [celery_broker_transport_options]
      [dask]
      cluster_address = 127.0.0.1:8786
      tls_ca =
      tls_cert =
      tls_key =
      [scheduler]
      job_heartbeat_sec = 5
      scheduler_heartbeat_sec = 25
      run_duration = -1
      min_file_process_interval = 90
      dag_dir_list_interval = 300
      print_stats_interval = 30
      scheduler_health_check_threshold = 30
      child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
      scheduler_zombie_task_threshold = 300
      catchup_by_default = True
      max_tis_per_query = 2000
      statsd_on = False
      statsd_host = localhost
      statsd_port = 8125
      statsd_prefix = airflow
      max_threads = 1
      authenticate = False
      use_job_schedule = False
      [ldap]
      REDACT ldap stuff
      search_scope = SUBTREE
      ignore_malformed_schema = False
      [mesos]
      master = localhost:5050
      framework_name = Airflow
      task_cpu = 1
      task_memory = 256
      checkpoint = False
      authenticate = False
      [kerberos]
      ccache = /tmp/airflow_krb5_ccache
      principal = airflow
      reinit_frequency = 3600
      kinit_path = kinit
      keytab = airflow.keytab
      [github_enterprise]
      api_rev = v3
      [admin]
      hide_sensitive_variable_fields = True
      [elasticsearch]
      host =
      log_id_template = {dag_id}{task_id}{execution_date}-{try_number}
      end_of_log_mark = end_of_log
      frontend =
      write_stdout = False
      json_format = False
      json_fields = asctime, filename, lineno, levelname, message
      [kubernetes]
      worker_container_repository =
      worker_container_tag =
      worker_container_image_pull_policy = IfNotPresent
      delete_worker_pods = True
      worker_pods_creation_batch_size = 1
      namespace = default
      airflow_configmap =
      dags_in_image = False
      dags_volume_subpath =
      dags_volume_claim =
      logs_volume_subpath =
      logs_volume_claim =
      dags_volume_host =
      logs_volume_host =
      env_from_configmap_ref =
      env_from_secret_ref =
      git_repo =
      git_branch =
      git_subpath =
      git_user =
      git_password =
      git_sync_root = /git
      git_sync_dest = repo
      git_dags_folder_mount_point =
      git_ssh_key_secret_name =
      git_ssh_known_hosts_configmap_name =
      git_sync_container_repository = k8s.gcr.io/git-sync
      git_sync_container_tag = v3.1.1
      git_sync_init_container_name = git-sync-clone
      worker_service_account_name =
      image_pull_secrets =
      gcp_service_account_keys =
      in_cluster = True
      affinity =
      tolerations =
      kube_client_request_args =
      run_as_user =
      fs_group =
      [kubernetes_node_selectors]
      [kubernetes_annotations]
      [kubernetes_environment_variables]
      [kubernetes_secrets]
      [kubernetes_labels]

       

      Airflow 1.10.4, LocalExecutor, Python 2, t3.large EC2 instance.
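
      To confirm which of the scheduler processes above is the one burning the CPU (assuming standard procps tools on the instance), something like:

      top -b -n 1 -p "$(pgrep -of 'airflow scheduler')"

      (pgrep -of picks the oldest matching PID, i.e. the parent scheduler process from the ps listing above.)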
