Apache Airflow / AIRFLOW-2219

Race condition to DagRun.verify_integrity between Scheduler and Webserver


    Details

    • Type: Bug
    • Status: Open
    • Priority: Trivial
    • Resolution: Unresolved
    • Affects Version/s: 1.8.1, 1.9.0
    • Fix Version/s: None
    • Component/s: database
    • Labels:
      None

      Description

      Symptoms:

      • Triggering a DAG shows the 404 nuke page with an error message along the lines of: psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey" when calling DagRun.verify_integrity

      Or

      • A similar error appears in the scheduler log for the DAG file when scheduling a DAG. (An example exception is at the end of the description.)

      This occurs because Dag.create_dagrun commits the dag_run entry to the database and then immediately runs verify_integrity to add the task_instances. However, the scheduler can pick up the dag run before all task_instances are created, and it also calls verify_integrity, so both processes try to create the same task_instances at the same time.
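
      The interleaving described above can be sketched outside Airflow. This is a minimal illustration, not Airflow's code: it uses an in-memory SQLite table with the same primary key columns as the DETAIL line in the traceback, and the helper names (make_db, missing_task_ids, insert_task_instances, simulate_race) are hypothetical. Both "callers" check for missing rows before either one writes, so the second insert hits the primary key constraint.

```python
import sqlite3


def make_db():
    # Stand-in for the task_instance table; same key columns as the
    # constraint named in the traceback (assumption: simplified schema).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        " task_id TEXT, dag_id TEXT, execution_date TEXT,"
        " PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    return conn


def missing_task_ids(conn, dag_id, execution_date, task_ids):
    # The "check" half of check-then-insert: which tasks have no row yet?
    rows = conn.execute(
        "SELECT task_id FROM task_instance"
        " WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchall()
    existing = {row[0] for row in rows}
    return [t for t in task_ids if t not in existing]


def insert_task_instances(conn, dag_id, execution_date, task_ids):
    # The "insert" half: a plain INSERT that fails on duplicate keys.
    conn.executemany(
        "INSERT INTO task_instance (task_id, dag_id, execution_date)"
        " VALUES (?, ?, ?)",
        [(t, dag_id, execution_date) for t in task_ids],
    )
    conn.commit()


def simulate_race():
    conn = make_db()
    dag_id = "example_dag"
    execution_date = "2018-03-15T23:46:57"
    task_ids = ["t1", "t2"]

    # Both callers read before either one writes.
    webserver_todo = missing_task_ids(conn, dag_id, execution_date, task_ids)
    scheduler_todo = missing_task_ids(conn, dag_id, execution_date, task_ids)

    insert_task_instances(conn, dag_id, execution_date, webserver_todo)
    try:
        insert_task_instances(conn, dag_id, execution_date, scheduler_todo)
    except sqlite3.IntegrityError as exc:
        conn.rollback()
        return str(exc)
    return None


if __name__ == "__main__":
    print(simulate_race())
```

      A single connection is used here only to make the ordering deterministic; in the real report the two callers are separate processes with separate sessions.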

      I don't think this actually breaks anything in particular. The exception happens either on the webpage or in the scheduler logs:

      • If it occurs in the UI, it just scares people into thinking something broke, but the task_instances will be created by the scheduler.
      • If the error shows up in the scheduler, the task_instances are created by the webserver and it continues processing the DAG during the next loop.

       
      I'm not sure if DagRun.verify_integrity is necessary for both SchedulerJob._process_task_instances as well as Dag.create_dagrun, but perhaps we can just stick to one?
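
      If both call sites have to stay, another option is to make the creation step tolerate rows that the other process already inserted. The following is only a sketch under assumptions, not Airflow's implementation: it uses SQLite instead of Postgres, and create_missing_task_instances and demo are hypothetical names. Each insert is committed on its own, and a duplicate-key error is treated as "someone else already created this row".

```python
import sqlite3


def create_missing_task_instances(conn, dag_id, execution_date, task_ids):
    """Insert task instances, skipping rows a concurrent caller created.

    Returns the task_ids that this call actually inserted.
    """
    created = []
    for task_id in task_ids:
        try:
            conn.execute(
                "INSERT INTO task_instance (task_id, dag_id, execution_date)"
                " VALUES (?, ?, ?)",
                (task_id, dag_id, execution_date),
            )
            conn.commit()
            created.append(task_id)
        except sqlite3.IntegrityError:
            # Row already exists: roll back this insert and move on.
            conn.rollback()
    return created


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        " task_id TEXT, dag_id TEXT, execution_date TEXT,"
        " PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    # First caller creates t1 and t2; second caller overlaps and adds t3.
    first = create_missing_task_instances(
        conn, "example_dag", "2018-03-15", ["t1", "t2"]
    )
    second = create_missing_task_instances(
        conn, "example_dag", "2018-03-15", ["t1", "t2", "t3"]
    )
    total = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
    return first, second, total


if __name__ == "__main__":
    print(demo())
```

      Per-row commits trade some throughput for simplicity; a batched insert with a single rollback-and-retry would be a different trade-off.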

       

      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
          context)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
          cursor.executemany(statement, parameters)
      psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"
      DETAIL:  Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
      The above exception was the direct cause of the following exception:
      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in helper
          pickle_dags)
        File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
          result = func(*args, **kwargs)
        File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792, in process_file
          self._process_dags(dagbag, dags, ti_keys_to_schedule)
        File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391, in _process_dags
          self._process_task_instances(dag, tis_out)
        File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in _process_task_instances
          run.verify_integrity(session=session)
        File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
          result = func(*args, **kwargs)
        File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786, in verify_integrity
          session.commit()
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 943, in commit
          self.transaction.commit()
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
          self._prepare_impl()
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
          self.session.flush()
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in flush
          self._flush(objects)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in _flush
          transaction.rollback(_capture_exception=True)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
          compat.reraise(exc_type, exc_value, exc_tb)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
          raise value
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in _flush
          flush_context.execute()
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in execute
          rec.execute(self)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in execute
          uow
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
          mapper, table, insert)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 830, in _emit_insert_statements
          execute(statement, multiparams)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
          return meth(self, multiparams, params)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
          return connection._execute_clauseelement(self, multiparams, params)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
          compiled_sql, distilled_params
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
          context)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
          exc_info
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
          reraise(type(exception), exception, tb=exc_tb, cause=cause)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
          raise value.with_traceback(tb)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
          context)
        File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
          cursor.executemany(statement, parameters)
      
      DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
      


            People

            • Assignee:
              Unassigned
            • Reporter:
              Will Wong (wwong)
            • Votes:
              3
            • Watchers:
              7
