Apache Airflow / AIRFLOW-6015

Celery needed even if LocalExecutor is configured


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.10.6
    • Fix Version/s: None
    • Component/s: configuration
    • Labels:
      None
    • Environment:
      ubuntu 16.04
      Python 3.5 with virtualenv

      Description

      Hello,

      Celery is required to start airflow worker even if the configured executor is LocalExecutor.

      If I install apache-airflow[celery], the worker daemon starts correctly and uses Celery as the executor.
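
      For reference, a hedged sketch of the two install variants (the version pin mirrors the affected release and is only illustrative):

      pip install apache-airflow==1.10.6              # LocalExecutor works, but airflow worker fails as shown below
      pip install 'apache-airflow[celery]==1.10.6'    # pulls in celery, so airflow worker can start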

      log

      Nov 19 19:47:27 airflow-1 systemd[1]: Started Airflow worker daemon.
      Nov 19 19:47:28 airflow-1 bash[10663]: [2019-11-19 19:47:28,226] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=128, max_overflow=10, pool_recycle=600, pid=10668
      Nov 19 19:47:29 airflow-1 bash[10663]: Traceback (most recent call last):
      Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/bin/airflow", line 37, in <module>
      Nov 19 19:47:29 airflow-1 bash[10663]:     args.func(args)
      Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py", line 74, in wrapper
      Nov 19 19:47:29 airflow-1 bash[10663]:     return f(*args, **kwargs)
      Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py", line 1075, in worker
      Nov 19 19:47:29 airflow-1 bash[10663]:     from airflow.executors.celery_executor import app as celery_app
      Nov 19 19:47:29 airflow-1 bash[10663]:   File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", line 27, in <module>
      Nov 19 19:47:29 airflow-1 bash[10663]:     from celery import Celery
      Nov 19 19:47:29 airflow-1 bash[10663]: ImportError: No module named 'celery'
      Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Main process exited, code=exited, status=1/FAILURE
      Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Unit entered failed state.
      Nov 19 19:47:30 airflow-1 systemd[1]: airflow-worker.service: Failed with result 'exit-code'.
      Nov 19 19:47:40 airflow-1 systemd[1]: airflow-worker.service: Service hold-off time over, scheduling restart.
      Nov 19 19:47:40 airflow-1 systemd[1]: Stopped Airflow worker daemon.
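
      The traceback points at the worker subcommand in airflow/bin/cli.py (line 1075 in this install), which imports the Celery app no matter which executor airflow.cfg selects. A minimal Python sketch of that failure path; the import is the one from the traceback, while the surrounding function and error handling are only illustrative and not the actual Airflow 1.10.6 code:

      def worker(args):
          """Illustrative stand-in for the airflow worker CLI entry point."""
          try:
              # This import runs unconditionally, regardless of the `executor`
              # value in airflow.cfg, which is why a plain apache-airflow
              # install fails here.
              from airflow.executors.celery_executor import app as celery_app
          except ImportError as err:
              # Without the celery extra: ImportError: No module named 'celery'
              raise SystemExit("airflow worker requires apache-airflow[celery]: %s" % err)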
      

      airflow.cfg

      [core]
      # The home folder for airflow, default is ~/airflow
      # airflow_home = /home/airflow/airflow
      # The folder where your airflow pipelines live, most likely a
      # subfolder in a code repository
      # This path must be absolute
      dags_folder = /home/airflow/airflow/dags
      # The folder where airflow should store its log files
      # This path must be absolute
      base_log_folder = /var/log/airflow
      # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
      # must supply a remote location URL (starting with either 's3://...' or
      # 'gs://...') and an Airflow connection id that provides access to the storage
      # location.
      remote_base_log_folder =
      remote_log_conn_id =
      # Use server-side encryption for logs stored in S3
      encrypt_s3_logs = False
      # DEPRECATED option for remote log storage, use remote_base_log_folder instead!
      s3_log_folder = None
      # The executor class that airflow should use. Choices include
      # SequentialExecutor, LocalExecutor, CeleryExecutor
      executor = LocalExecutor
      # The SqlAlchemy connection string to the metadata database.
      # SqlAlchemy supports many different database engines; more information on
      # their website
      sql_alchemy_conn = mysql://airflow:xxx@localhost:3306/airflow
      # The SqlAlchemy pool size is the maximum number of database connections
      # in the pool.
      sql_alchemy_pool_size = 128
      # The SqlAlchemy pool recycle is the number of seconds a connection
      # can be idle in the pool before it is invalidated. This config does
      # not apply to sqlite.
      sql_alchemy_pool_recycle = 600
      # The amount of parallelism as a setting to the executor. This defines
      # the max number of task instances that should run simultaneously
      # on this airflow installation
      parallelism = 4
      # The number of task instances allowed to run concurrently by the scheduler
      dag_concurrency = 4
      # Are DAGs paused by default at creation
      dags_are_paused_at_creation = True
      # When not using pools, tasks are run in the "default pool",
      # whose size is guided by this config element
      non_pooled_task_slot_count = 128
      # The maximum number of active DAG runs per DAG
      max_active_runs_per_dag = 4
      # Whether to load the examples that ship with Airflow. It's good to
      # get started, but you probably want to set this to False in a production
      # environment
      load_examples = False
      # Where your Airflow plugins are stored
      plugins_folder = /home/airflow/airflow/plugins
      # Secret key to save connection passwords in the db
      fernet_key = xxx
      # Whether to disable pickling dags
      donot_pickle = False
      # How long before timing out a python file import while filling the DagBag
      dagbag_import_timeout = 30
      # The class to use for running task instances in a subprocess
      task_runner = StandardTaskRunner
      # If set, tasks without a `run_as_user` argument will be run with this user
      # Can be used to de-elevate a sudo user running Airflow when executing tasks
      default_impersonation =
      # What security module to use (for example kerberos):
      security =
      # Turn unit test mode on (overwrites many configuration options with test
      # values at runtime)
      unit_test_mode = False
      
      [cli]
      # In what way should the cli access the API. The LocalClient will use the
      # database directly, while the json_client will use the api running on the
      # webserver
      api_client = airflow.api.client.local_client
      endpoint_url = http://localhost:8080
      
      [api]
      # How to authenticate users of the API
      auth_backend = airflow.api.auth.backend.deny_all
      
      [operators]
      # The default owner assigned to each new operator, unless
      # provided explicitly or passed via `default_args`
      default_owner = Airflow
      default_cpus = 1
      default_ram = 512
      default_disk = 512
      default_gpus = 0
      
      [webserver]
      # The base url of your website as airflow cannot guess what domain or
      # cname you are using. This is used in automated emails that
      # airflow sends to point links to the right web server
      base_url = http://airflow-1.example.com:8080
      # The ip specified when starting the web server
      web_server_host = 0.0.0.0
      # The port on which to run the web server
      web_server_port = 8080
      # Paths to the SSL certificate and key for the web server. When both are
      # provided SSL will be enabled. This does not change the web server port.
      #web_server_ssl_cert = /etc/ssl/certs/example.com.crt
      #web_server_ssl_key = /etc/ssl/private/example.com.key
      # Number of seconds the gunicorn webserver waits before timing out on a worker
      web_server_worker_timeout = 600
      web_server_master_timeout = 600
      # Number of workers to refresh at a time. When set to 0, worker refresh is
      # disabled. When nonzero, airflow periodically refreshes webserver workers by
      # bringing up new ones and killing old ones.
      worker_refresh_batch_size = 0
      # Number of seconds to wait before refreshing a batch of workers.
      worker_refresh_interval = 30
      # Secret key used to run your flask app
      secret_key = xxx
      # Number of workers to run the Gunicorn web server
      workers = 4
      # The worker class gunicorn should use. Choices include
      # sync (default), eventlet, gevent
      worker_class = sync
      # Log files for the gunicorn webserver. '-' means log to stderr.
      access_logfile = -
      error_logfile = -
      rbac = True
      # Expose the configuration file in the web server
      expose_config = False
      # Set to true to turn on authentication:
      # http://pythonhosted.org/airflow/security.html
      #web-authentication
      authenticate = True
      auth_backend = airflow.contrib.auth.backends.password_auth
      # Filter the list of dags by owner name (requires authentication to be enabled)
      filter_by_owner = False
      # Filtering mode. Choices include user (default) and ldapgroup.
      # Ldap group filtering requires using the ldap backend
      #
      # Note that the ldap server needs the "memberOf" overlay to be set up
      # in order to use the ldapgroup mode.
      owner_mode = user
      # Default DAG orientation. Valid values are:
      # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
      dag_orientation = LR
      # Puts the webserver in demonstration mode; blurs the names of Operators for
      # privacy.
      demo_mode = False
      # The amount of time (in secs) webserver will wait for initial handshake
      # while fetching logs from other worker machine
      log_fetch_timeout_sec = 5
      # By default, the webserver shows paused DAGs. Flip this to hide paused
      # DAGs by default
      hide_paused_dags_by_default = False
      
      [email]
      email_backend = airflow.utils.email.send_email_smtp

      [smtp]
      # If you want airflow to send emails on retries, failure, and you want to use
      # the airflow.utils.email.send_email_smtp function, you have to configure an
      # smtp server here
      smtp_host = localhost
      smtp_starttls = True
      smtp_ssl = False
      # Uncomment and set the user/pass settings if you want to use SMTP AUTH
      smtp_user = airflow
      smtp_password = xxx
      smtp_port = 587
      smtp_mail_from = airflow@airflow-1.example.com
      
      [scheduler]
      # Task instances listen for external kill signal (when you clear tasks
      # from the CLI or the UI), this defines the frequency at which they should
      # listen (in seconds).
      job_heartbeat_sec = 5
      # The scheduler constantly tries to trigger new tasks (look at the
      # scheduler section in the docs for more information). This defines
      # how often the scheduler should run (in seconds).
      scheduler_heartbeat_sec = 5
      # after how much time should the scheduler terminate in seconds
      # -1 indicates to run continuously (see also num_runs)
      run_duration = -1
      # after how much time new DAGs should be picked up from the filesystem
      min_file_process_interval = 0
      dag_dir_list_interval = 300
      # How often should stats be printed to the logs
      print_stats_interval = 30
      child_process_log_directory = /var/log/airflow/scheduler
      # Local task jobs periodically heartbeat to the DB. If the job has
      # not heartbeat in this many seconds, the scheduler will mark the
      # associated task instance as failed and will re-schedule the task.
      scheduler_zombie_task_threshold = 300
      # Turn off scheduler catchup by setting this to False.
      # Default behavior is unchanged and
      # Command Line Backfills still work, but the scheduler
      # will not do scheduler catchup if this is False,
      # however it can be set on a per DAG basis in the
      # DAG definition (catchup)
      catchup_by_default = True
      # The scheduler can run multiple threads in parallel to schedule dags.
      # This defines how many threads will run. However airflow will never
      # use more threads than the amount of cpu cores available.
      max_threads = 2
      authenticate = False
      
      [admin]
      # UI to hide sensitive variable fields when set to True
      hide_sensitive_variable_fields = True
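
      Independent of the worker failure above, a quick sanity check of which executor this configuration resolves to at runtime; a minimal sketch run inside the same virtualenv (the expected output is an assumption based on the [core] section above):

      # Run inside the virtualenv, with AIRFLOW_HOME pointing at this airflow.cfg.
      from airflow.configuration import conf

      # Prints the executor Airflow will actually use; expected here: LocalExecutor
      print(conf.get('core', 'executor'))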
      
            People

            • Assignee: Unassigned
            • Reporter: Mathieu Cinquin (mcinquin)
            • Votes: 0
            • Watchers: 1
