Apache Airflow
AIRFLOW-6785

DagBag tries to run hook inside SubDagOperator


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.4, 1.10.5, 1.10.6, 1.10.7, 1.10.8, 1.10.9
    • Fix Version/s: None
    • Component/s: DAG, hooks, operators
    • Labels: None

      Description

      The following worked in 1.10.1 but not in 1.10.9. It seems that when the DagBag imports the DAG file, the SubDagOperator runs a query against the metadata database during construction (the Pool lookup in the traceback below), i.e. it tries to connect to the database at import time.
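
      For context, the traceback below fails inside SubDagOperator.__init__ on its pool-validation query, which means the metadata database is hit while the DagBag is still parsing the file, before any task runs. A paraphrased sketch of that check, based only on the traceback (the helper name here is made up, not the upstream source):

      # Paraphrased sketch of the pool check that SubDagOperator.__init__ performs
      # in 1.10.x (approximate; see subdag_operator.py:77 in the traceback below).
      # Because it runs in __init__, it executes while the DagBag parses the file.
      from airflow.models import Pool
      from airflow.utils.db import provide_session


      @provide_session
      def _check_subdag_pool(pool_name, session=None):
          # The ".filter(Pool.pool == self.pool)" call from the traceback: a real
          # SELECT against the Airflow metadata database at import time.
          return session.query(Pool).filter(Pool.pool == pool_name).first()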

      Assuming:

      AIRFLOW_CONN_POSTGRES_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      

       

      and the following unit test:

      import unittest
      
      from airflow.models import DagBag
      
      
      class TestDags(unittest.TestCase):
          """
          Generic tests that all DAGs in the repository should be able to pass.
          """
          LOAD_SECOND_THRESHOLD = 2
      
          def setUp(self):
              self.dagbag = DagBag()
      
          def test_dagbag_import(self):
              """
              Verify that Airflow will be able to import all DAGs in the repository.
              """
              self.assertFalse(
                  len(self.dagbag.import_errors),
                  'There should be no DAG failures. Got: {}'.format(
                      self.dagbag.import_errors
                  )
              )
      
      

       

       

      The DAG that fails to import:

      from datetime import datetime, timedelta
      
      from airflow import DAG
      from airflow.hooks.postgres_hook import PostgresHook
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.operators.python_operator import PythonOperator
      from airflow.operators.subdag_operator import SubDagOperator
      
      default_args = {
          "owner": "airflow",
          "depends_on_past": False,
          "start_date": datetime(2020, 2, 2),
          "email": ["airflow@airflow.com"],
          "email_on_failure": False,
          "email_on_retry": False,
          "retries": 1,
          "retry_delay": timedelta(minutes=5),
      }
      
      
      def get_data(**kwargs):
          df = PostgresHook(
              postgres_conn_id=kwargs['postgres_conn_id']
          ).get_pandas_df("select 1;")
          return df
      
      
      def subdag(parent_dag_name, child_dag_name, args):
      
          dag_subdag = DAG(
              dag_id='%s.%s' % (parent_dag_name, child_dag_name),
              default_args=args,
              schedule_interval=None,
          )
      
          run_query = PythonOperator(
              task_id='get_data_sub',
              python_callable=get_data,
              op_kwargs={
                  'postgres_conn_id': 'postgres_conn'
              },
              provide_context=True,
              dag=dag_subdag
          )
      
          return dag_subdag
      
      
      dag = DAG("test-hook-sub", default_args=default_args, schedule_interval=None)
      
      start = DummyOperator(
          task_id='kick_off',
          dag=dag
      )
      
      section_1 = SubDagOperator(
          task_id='section-1',
          subdag=subdag("test-hook-sub", 'section-1', default_args),
          dag=dag,
      )
      
      start >> section_1
      

      Error:

      ../lib/python3.6/site-packages/airflow/utils/db.py:74: in wrapper
          return func(*args, **kwargs)
      ../lib/python3.6/site-packages/airflow/utils/decorators.py:98: in wrapper
          result = func(*args, **kwargs)
      ../lib/python3.6/site-packages/airflow/operators/subdag_operator.py:77: in __init__
          .filter(Pool.pool == self.pool)
      ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3287: in first
          ret = list(self[0:1])
      ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3065: in __getitem__
          return list(res)
      ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3389: in __iter__
          return self._execute_and_instances(context)
      ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3411: in _execute_and_instances
          querycontext, self._connection_from_session, close_with_result=True
      ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3426: in _get_bind_args
          mapper=self._bind_mapper(), clause=querycontext.statement, **kw
      ../lib/python3.6/site-packages/sqlalchemy/orm/query.py:3404: in _connection_from_session
          conn = self.session.connection(**kw)
      ../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1133: in connection
          execution_options=execution_options,
      ../lib/python3.6/site-packages/sqlalchemy/orm/session.py:1139: in _connection_for_bind
          engine, execution_options
      ../lib/python3.6/site-packages/sqlalchemy/orm/session.py:432: in _connection_for_bind
          conn = bind._contextual_connect()
      ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2242: in _contextual_connect
          self._wrap_pool_connect(self.pool.connect, None),
      ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2280: in _wrap_pool_connect
          e, dialect, self
      ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:1547: in _handle_dbapi_exception_noconnection
          util.raise_from_cause(sqlalchemy_exception, exc_info)
      ../lib/python3.6/site-packages/sqlalchemy/util/compat.py:398: in raise_from_cause
          reraise(type(exception), exception, tb=exc_tb, cause=cause)
      ../lib/python3.6/site-packages/sqlalchemy/util/compat.py:152: in reraise
          raise value.with_traceback(tb)
      ../lib/python3.6/site-packages/sqlalchemy/engine/base.py:2276: in _wrap_pool_connect
          return fn()
      ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:363: in connect
          return _ConnectionFairy._checkout(self)
      ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:773: in _checkout
          fairy = _ConnectionRecord.checkout(pool)
      ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:492: in checkout
          rec = pool._do_get()
      ../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:139: in _do_get
          self._dec_overflow()
      ../lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py:68: in __exit__
          compat.reraise(exc_type, exc_value, exc_tb)
      ../lib/python3.6/site-packages/sqlalchemy/util/compat.py:153: in reraise
          raise value
      ../lib/python3.6/site-packages/sqlalchemy/pool/impl.py:136: in _do_get
          return self._create_connection()
      ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:308: in _create_connection
          return _ConnectionRecord(self)
      ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:437: in __init__
          self.__connect(first_connect_check=True)
      ../lib/python3.6/site-packages/sqlalchemy/pool/base.py:652: in __connect
          connection = pool._invoke_creator(self)
      ../lib/python3.6/site-packages/sqlalchemy/engine/strategies.py:114: in connect
          return dialect.connect(*cargs, **cparams)
      ../lib/python3.6/site-packages/sqlalchemy/engine/default.py:489: in connect
          return self.dbapi.connect(*cargs, **cparams)
      ../lib/python3.6/site-packages/psycopg2/__init__.py:126: in connect
          conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
      E   sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name "postgres" to address: Name or service not known
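
      Judging from the traceback, the import fails inside SubDagOperator.__init__ (the pool check) before get_data or PostgresHook is ever invoked, so a DAG containing only a SubDagOperator would likely fail to import the same way. A hypothetical minimal reproduction, not taken from the original report:

      # Hypothetical minimal reproduction: no hook involved. Constructing the
      # SubDagOperator alone triggers the parse-time Pool query from the
      # traceback above, so importing this file should fail with the same
      # OperationalError when the metadata database is unreachable.
      from datetime import datetime

      from airflow import DAG
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.operators.subdag_operator import SubDagOperator

      args = {"owner": "airflow", "start_date": datetime(2020, 2, 2)}

      parent = DAG("test-no-hook", default_args=args, schedule_interval=None)
      child = DAG("test-no-hook.child", default_args=args, schedule_interval=None)

      DummyOperator(task_id="noop", dag=child)

      # __init__ below runs the Pool query shown in the traceback.
      SubDagOperator(task_id="child", subdag=child, dag=parent)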
      
      

       

      However, the non-SubDAG version of the same DAG passes the test:

      from datetime import datetime, timedelta
      
      from airflow import DAG
      from airflow.hooks.postgres_hook import PostgresHook
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.operators.python_operator import PythonOperator
      
      default_args = {
          "owner": "airflow",
          "depends_on_past": False,
          "start_date": datetime(2020, 2, 2),
          "email": ["airflow@airflow.com"],
          "email_on_failure": False,
          "email_on_retry": False,
          "retries": 1,
          "retry_delay": timedelta(minutes=5),
      }
      
      
      def get_data(**kwargs):
          """
          Returns DB data as a Pandas DataFrame
          """
          df = PostgresHook(
              postgres_conn_id=kwargs['postgres_conn_id']
          ).get_pandas_df("select 1;")
          return df
      
      
      dag = DAG("test-hook", default_args=default_args, schedule_interval=None)
      
      start = DummyOperator(
          task_id='kick_off',
          dag=dag
      )
      
      
      run_query = PythonOperator(
          task_id='get_data',
          python_callable=get_data,
          op_kwargs={
              'postgres_conn_id': 'postgres_conn'
          },
          provide_context=True,
          dag=dag
      )
      
      start >> run_query
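
      As a possible test-side workaround (an assumption, not verified in this report), pointing the unit test at a local SQLite metadata database and initializing it before building the DagBag gives the parse-time pool query something to connect to:

      # Hypothetical workaround for the unit test: use a local SQLite metadata DB
      # so the parse-time Pool query in SubDagOperator.__init__ has a reachable
      # database. The env var must be set before any airflow import.
      import os
      import unittest

      os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = "sqlite:////tmp/airflow_ci.db"

      from airflow.utils.db import initdb  # noqa: E402
      from airflow.models import DagBag  # noqa: E402


      class TestDagsWithLocalMetadataDb(unittest.TestCase):
          @classmethod
          def setUpClass(cls):
              initdb()  # creates the metadata tables the pool query needs

          def test_dagbag_import(self):
              dagbag = DagBag()
              self.assertFalse(
                  len(dagbag.import_errors),
                  'There should be no DAG failures. Got: {}'.format(
                      dagbag.import_errors
                  )
              )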
      
      

       

            People

            • Assignee: Unassigned
            • Reporter: Darwin Yip (darwinyip)