Apache Airflow / AIRFLOW-6202

sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback()


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.9.0
    • Fix Version/s: None
    • Component/s: database
    • Labels: None
    • Environment: Airflow 1.9

      Description

      Our Airflow 1.9 is deployed in Docker with conda (Python 2.7), the executor is Celery, and the database is AWS RDS MySQL 5.7. We got this error:

      [2019-12-09 02:23:40,831] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
      [2019-12-09 02:23:40,832] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/bin/airflow", line 7, in <module>
      [2019-12-09 02:23:40,832] {base_task_runner.py:98} INFO - Subtask: exec(compile(f.read(), __file__, 'exec'))
      [2019-12-09 02:23:40,832] {base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/bin/airflow", line 27, in <module>
      [2019-12-09 02:23:40,833] {base_task_runner.py:98} INFO - Subtask: args.func(args)
      [2019-12-09 02:23:40,833] {base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/bin/cli.py", line 392, in run
      [2019-12-09 02:23:40,835] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
      [2019-12-09 02:23:40,835] {base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/utils/db.py", line 50, in wrapper
      [2019-12-09 02:23:40,836] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
      [2019-12-09 02:23:40,836] {base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/models.py", line 1533, in _run_raw_task
      [2019-12-09 02:23:40,840] {base_task_runner.py:98} INFO - Subtask: self.handle_failure(e, test_mode, context)
      [2019-12-09 02:23:40,840] {base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/models.py", line 1642, in handle_failure
      [2019-12-09 02:23:40,840] {base_task_runner.py:98} INFO - Subtask: session.merge(self)
      [2019-12-09 02:23:40,841] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2093, in merge
      [2019-12-09 02:23:40,841] {base_task_runner.py:98} INFO - Subtask: _resolve_conflict_map=_resolve_conflict_map,
      [2019-12-09 02:23:40,841] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2166, in _merge
      [2019-12-09 02:23:40,842] {base_task_runner.py:98} INFO - Subtask: merged = self.query(mapper.class_).get(key[1])
      [2019-12-09 02:23:40,842] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 959, in get
      [2019-12-09 02:23:40,843] {base_task_runner.py:98} INFO - Subtask: return self._get_impl(ident, loading.load_on_pk_identity)
      [2019-12-09 02:23:40,843] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 1069, in _get_impl
      [2019-12-09 02:23:40,844] {base_task_runner.py:98} INFO - Subtask: return db_load_fn(self, primary_key_identity)
      [2019-12-09 02:23:40,844] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 282, in load_on_pk_identity
      [2019-12-09 02:23:40,845] {base_task_runner.py:98} INFO - Subtask: return q.one()
      [2019-12-09 02:23:40,845] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3282, in one
      [2019-12-09 02:23:40,846] {base_task_runner.py:98} INFO - Subtask: ret = self.one_or_none()
      [2019-12-09 02:23:40,846] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3251, in one_or_none
      [2019-12-09 02:23:40,846] {base_task_runner.py:98} INFO - Subtask: ret = list(self)
      [2019-12-09 02:23:40,847] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3324, in __iter__
      [2019-12-09 02:23:40,859] {base_task_runner.py:98} INFO - Subtask: return self._execute_and_instances(context)
      [2019-12-09 02:23:40,860] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3346, in _execute_and_instances
      [2019-12-09 02:23:40,865] {base_task_runner.py:98} INFO - Subtask: querycontext, self._connection_from_session, close_with_result=True
      [2019-12-09 02:23:40,866] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3361, in _get_bind_args
      [2019-12-09 02:23:40,870] {base_task_runner.py:98} INFO - Subtask: mapper=self._bind_mapper(), clause=querycontext.statement, **kw
      [2019-12-09 02:23:40,870] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3339, in _connection_from_session
      [2019-12-09 02:23:40,871] {base_task_runner.py:98} INFO - Subtask: conn = self.session.connection(**kw)
      [2019-12-09 02:23:40,872] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1124, in connection
      [2019-12-09 02:23:40,876] {base_task_runner.py:98} INFO - Subtask: execution_options=execution_options,
      [2019-12-09 02:23:40,877] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1130, in _connection_for_bind
      [2019-12-09 02:23:40,881] {base_task_runner.py:98} INFO - Subtask: engine, execution_options
      [2019-12-09 02:23:40,882] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 408, in _connection_for_bind
      [2019-12-09 02:23:40,883] {base_task_runner.py:98} INFO - Subtask: self._assert_active()
      [2019-12-09 02:23:40,884] {base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 295, in _assert_active
      [2019-12-09 02:23:40,889] {base_task_runner.py:98} INFO - Subtask: code="7s2a",
      [2019-12-09 02:23:40,907] {base_task_runner.py:98} INFO - Subtask: sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
      [2019-12-09 02:23:40,911] {base_task_runner.py:98} INFO - Subtask: [SQL: UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s]
      [2019-12-09 02:23:40,916] {base_task_runner.py:98} INFO - Subtask: [parameters: (u'queued', 'before_gs_delete', 'prod_Turner-Cartoon_daily_etl_v1.1.9.4.0.prod_Turner-Cartoon_daily_etl_v1.1.9.4.0_ingest_bigquery_subdag', datetime.datetime(2019, 12, 7, 5, 30))]
      [2019-12-09 02:23:40,933] {base_task_runner.py:98} INFO - Subtask: (Background on this error at: http://sqlalche.me/e/e3q8) (Background on this error at: http://sqlalche.me/e/7s2a)

       
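      For reference, the chained error describes two layers: the root cause is a MySQL deadlock (error 1213) on the UPDATE of task_instance, and the InvalidRequestError is only raised afterwards because the session is reused without first issuing Session.rollback(). Below is a minimal sketch of the rollback-and-retry pattern the error message points at. This is plain SQLAlchemy, not Airflow's actual code; the engine URL and the commit_with_retry helper are hypothetical:

      import time

      from sqlalchemy import create_engine
      from sqlalchemy.exc import OperationalError
      from sqlalchemy.orm import sessionmaker

      # Hypothetical connection string standing in for sql_alchemy_conn.
      engine = create_engine("mysql://airflow:airflow@localhost:3306/airflow")
      Session = sessionmaker(bind=engine)

      def commit_with_retry(apply_change, retries=3, delay=1.0):
          # Hypothetical helper: run a change in its own transaction and
          # retry when MySQL reports a deadlock (error code 1213).
          session = Session()
          try:
              for attempt in range(retries):
                  try:
                      apply_change(session)
                      session.commit()
                      return
                  except OperationalError as exc:
                      # rollback() resets the failed transaction; reusing the
                      # session without it raises exactly the
                      # InvalidRequestError shown in the traceback above.
                      session.rollback()
                      if "1213" not in str(exc) or attempt == retries - 1:
                          raise
                      time.sleep(delay)
          finally:
              session.close()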

      I have no idea how to solve this. My configuration is below:

      [core]
      dags_folder = /usr/local/airflow/dags
      base_log_folder = /usr/local/airflow/logs
      remote_log_conn_id =
      encrypt_s3_logs = False
      logging_level = INFO
      # logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
      logging_config_class =
      log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
      simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
      executor = CeleryExecutor
      sql_alchemy_conn = mysql://airflow:airflow@xxxx:3306/airflow
      sql_alchemy_pool_size = 5
      sql_alchemy_pool_recycle = 3600
      parallelism = 64
      dag_concurrency = 16
      # Are DAGs paused by default at creation
      dags_are_paused_at_creation = True
      non_pooled_task_slot_count = 128
      max_active_runs_per_dag = 16
      load_examples = False
      plugins_folder = /usr/local/airflow/plugins
      fernet_key = 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      donot_pickle = False
      dagbag_import_timeout = 30
      task_runner = BashTaskRunner
      default_impersonation =
      security =
      unit_test_mode = False
      task_log_reader = file.task
      enable_xcom_pickling = True
      killed_task_cleanup_time = 60

      [cli]
      api_client = airflow.api.client.local_client
      endpoint_url = http://localhost:8080

      [api]
      auth_backend = airflow.api.auth.backend.default

      [operators]
      default_owner = Airflow
      default_cpus = 1
      default_ram = 512
      default_disk = 512
      default_gpus = 0

      [webserver]
      base_url = http://localhost:8080
      web_server_host = 0.0.0.0
      web_server_ssl_cert =
      web_server_ssl_key =
      web_server_worker_timeout = 120
      worker_refresh_batch_size = 1
      worker_refresh_interval = 30
      secret_key = temporary_key
      workers = 4
      worker_class = sync
      access_logfile = -
      error_logfile = -
      expose_config = True
      authenticate = False
      filter_by_owner = False
      owner_mode = user
      dag_default_view = tree
      dag_orientation = LR
      demo_mode = False
      log_fetch_timeout_sec = 5
      hide_paused_dags_by_default = True
      page_size = 100

      [email]
      email_backend = airflow.utils.email.send_email_smtp

      [smtp]
      smtp_host = xxxx
      smtp_starttls = True
      smtp_ssl = False
      smtp_user = xxxx
      smtp_password = xxxxx
      smtp_port = 25
      smtp_mail_from = xxxx

      [celery]
      celeryd_concurrency = 16
      broker_url = redis://xxxx:6379/1
      flower_host = 0.0.0.0
      flower_port = 5555
      default_queue = default
      celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

      [dask]
      cluster_address = 127.0.0.1:8786

      [scheduler]
      job_heartbeat_sec = 5
      scheduler_heartbeat_sec = 5
      run_duration = -1
      min_file_process_interval = 0
      dag_dir_list_interval = 300
      print_stats_interval = 30
      child_process_log_directory = /usr/local/airflow/logs/scheduler
      scheduler_zombie_task_threshold = 300
      catchup_by_default = True
      max_tis_per_query = 0
      statsd_on = False
      statsd_host = localhost
      statsd_port = 8125
      statsd_prefix = airflow
      max_threads = 16
      authenticate = False

      [ldap]
      # set this to ldaps://<your.ldap.server>:<port>
      uri =
      user_filter = objectClass=*
      user_name_attr = uid
      group_member_attr = memberOf
      superuser_filter =
      data_profiler_filter =
      bind_user = cn=Manager,dc=example,dc=com
      bind_password = insecure
      basedn = dc=example,dc=com
      cacert = /etc/ca/ldap_ca.crt
      search_scope = LEVEL

      [mesos]
      master = localhost:5050
      framework_name = Airflow
      task_cpu = 1
      task_memory = 256
      checkpoint = False
      failover_timeout = 604800
      authenticate = False
      default_principal = admin
      default_secret = admin

      [kerberos]
      ccache = /tmp/airflow_krb5_ccache
      principal = airflow
      reinit_frequency = 3600
      kinit_path = kinit
      keytab = airflow.keytab

      [github_enterprise]
      api_rev = v3

      [admin]
      hide_sensitive_variable_fields = False
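
      For what it's worth, the sql_alchemy_* values in [core] above map onto SQLAlchemy's engine arguments roughly as sketched below. This is illustrative only (the host is elided in the config, and Airflow's own initialization may pass additional arguments):

      from sqlalchemy import create_engine

      engine = create_engine(
          "mysql://airflow:airflow@xxxx:3306/airflow",  # sql_alchemy_conn (host elided)
          pool_size=5,         # sql_alchemy_pool_size: connections kept in the pool
          pool_recycle=3600,   # sql_alchemy_pool_recycle: replace connections older
                               # than an hour so MySQL's wait_timeout does not
                               # silently drop them
      )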

      People

      • Assignee: Unassigned
      • Reporter: jackyym yangming
      • Votes: 0
      • Watchers: 1
