AIRFLOW-7109: HiveServer2Hook and pyHive do not handle timeout correctly


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.6
    • Fix Version/s: None
    • Component/s: hooks, utils
    • Labels: None
    • Environment: CentOS 7, Python 3.6.7, Airflow 1.10.6, HDP 3.1.4

      Description

      Dear experts,

      using the vanilla 1.10.6 "HiveServer2Hook" from airflow/hooks/hive_hooks in a custom operator, we observe improper handling of timeouts configured at task level via "execution_timeout".
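      For concreteness, a minimal sketch of such a setup (illustrative names and query, not the attached dag.py or dev_hive_operator.py verbatim):

      # Minimal sketch of the setup in question (illustrative, Airflow 1.10.x API).
      from datetime import datetime, timedelta

      from airflow import DAG
      from airflow.hooks.hive_hooks import HiveServer2Hook
      from airflow.models import BaseOperator
      from airflow.utils.decorators import apply_defaults


      class MyHiveQueryOperator(BaseOperator):
          """Submits HiveQL via HiveServer2 and post-processes the result."""

          @apply_defaults
          def __init__(self, hql, hiveserver2_conn_id='hiveserver2_default',
                       *args, **kwargs):
              super(MyHiveQueryOperator, self).__init__(*args, **kwargs)
              self.hql = hql
              self.hiveserver2_conn_id = hiveserver2_conn_id

          def execute(self, context):
              hook = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
              # get_records() blocks inside pyhive/Thrift until the query finishes.
              return hook.get_records(self.hql)


      dag = DAG(dag_id='hive_timeout_demo',
                start_date=datetime(2020, 3, 1),
                schedule_interval=None)

      MyHiveQueryOperator(task_id='long_running_query',
                          hql='SELECT COUNT(*) FROM some_large_table',
                          # the task-level timeout that is not honoured:
                          execution_timeout=timedelta(minutes=5),
                          dag=dag)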

      The picture is always the same:

      1. The hook is instantiated, the connection opened and the query submitted.
      2. The task timeout is reached, but the query completes nonetheless (confirmed in YARN, where the job is flagged as "finished", i.e. successful).
      3. Although the query has succeeded (Hive-wise), no result is obtained or processed; instead an obscure codec error is raised, making the task fail. The codec error stems from the actual AirflowTaskTimeout exception not being propagated properly.

      The persistent codec error (including its stack trace) is given below.

      We investigated the issue further by slightly modifying the vanilla version of the HiveServer2Hook and adding some debugging output to various modules.

      • We made the "pyhive.hive.connect" object a class variable (instead of having it exist only in the scope of the vanilla "_get_results" method).
      • We added a "kill()" method that invokes "connection.close()" on said class variable, and we call this method from "on_kill()" in our custom operator (sketched after this list).
      • We added debugging (i.e. logging) output to the "_run_raw_task" method of airflow/models/taskinstance.py and to the "__enter__()" and "__exit__()" methods of the timeout class in airflow/utils/timeout.py.
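      In simplified form (illustrative, not a verbatim copy of the attached dev_hive_hook.py), the modification looks like this:

      from airflow.hooks.hive_hooks import HiveServer2Hook


      class DevHiveServer2Hook(HiveServer2Hook):

          def __init__(self, *args, **kwargs):
              super(DevHiveServer2Hook, self).__init__(*args, **kwargs)
              self.connection = None  # pyhive connection, kept on the instance

          def get_conn(self, schema=None):
              # The vanilla get_conn() builds a fresh pyhive connection that
              # only lives inside _get_results(); here we keep a handle to it.
              self.connection = super(DevHiveServer2Hook, self).get_conn(schema=schema)
              return self.connection

          def kill(self):
              # Called from the custom operator's on_kill(); closing the
              # connection tears down the Thrift transport and aborts any
              # blocking read.
              if self.connection is not None:
                  self.connection.close()
                  self.connection = None

      In the custom operator, on_kill() then simply delegates to hook.kill().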

      Our findings, based on the attached log files (as well as the modified core module files, the modified hook, the custom operator and the DAG), are the following:

      • In order for the "on_kill()" method of any operator to be invoked upon task timeout, the timeout context manager (airflow/utils/timeout.py) that wraps the execution of "task_copy" in "_run_raw_task" (taskinstance.py) needs to exit with an "AirflowTaskTimeout" (and nothing else!); the vanilla mechanism is sketched after this list.
      • The AirflowTaskTimeout is produced, but not at the point in runtime at which the timeout is actually reached; it only surfaces at the end of the pyHive execution of the query.
      • Upward propagation of the AirflowTaskTimeout then fails with the obscure codec error mentioned earlier, so neither "on_kill()" nor "kill()" is invoked. Nota bene: at that point killing would not even be necessary, as the task has converged anyhow.
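      For reference, the vanilla mechanism is a SIGALRM-based context manager; roughly paraphrased (not a verbatim quote of 1.10.6):

      # _run_raw_task() (taskinstance.py) wraps task_copy.execute() in this
      # context manager and calls task_copy.on_kill() only from its
      # "except AirflowTaskTimeout" branch.
      import os
      import signal

      from airflow.exceptions import AirflowTaskTimeout


      class timeout(object):

          def __init__(self, seconds=1, error_message='Timeout'):
              self.seconds = seconds
              self.error_message = error_message + ', PID: ' + str(os.getpid())

          def handle_timeout(self, signum, frame):
              # Runs when SIGALRM fires; the raise has to unwind through
              # whatever frame is executing at that moment, in our case a
              # blocking Thrift socket read deep inside pyhive.
              raise AirflowTaskTimeout(self.error_message)

          def __enter__(self):
              signal.signal(signal.SIGALRM, self.handle_timeout)
              signal.alarm(self.seconds)

          def __exit__(self, exc_type, exc_value, tb):
              signal.alarm(0)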

      In our opinion this leads to two aspects that need to be studied, understood and fixed, potentially on the pyHive side:

      • The execution of the pyHive query needs to be killed upon timeout.
      • The propagation of the exception needs to be performed properly (one conceivable direction is sketched below).
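      To make the second point concrete: a minimal sketch only, showing a hypothetical guarded variant of the hook's "_get_results" (it assumes pyhive's Cursor.cancel() can still reach the server at that point; this is not a proposed patch):

      import contextlib

      from airflow.exceptions import AirflowTaskTimeout


      def _get_results(self, hql, schema='default'):
          with contextlib.closing(self.get_conn(schema)) as conn, \
                  contextlib.closing(conn.cursor()) as cur:
              try:
                  cur.execute(hql)
              except AirflowTaskTimeout:
                  # pyhive's Cursor.cancel() issues a Thrift CancelOperation
                  # for the running statement; re-raising preserves the
                  # exception type that _run_raw_task() needs in order to
                  # call on_kill().
                  cur.cancel()
                  raise
              return cur.fetchall()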

      NB: closing the pyhive connection object upon invocation of "on_kill()" does work properly if the task is killed manually, e.g. by setting its state to "failed". In this case, "on_kill()" is invoked directly instead of waiting for the AirflowTaskTimeout to pop up.

      NB: things work perfectly with the "HiveCLIHook", which under the hood runs beeline via a Python subprocess call that is terminated (killed after 60 seconds) upon invocation of "on_kill()", e.g. in the vanilla HiveOperator. The log attached as "vanilla_HiverOperator_task.log" shows the behaviour of the vanilla HiveOperator performing the same query. In principle, similarly flawless behaviour is expected when using the HiveServer2Hook to submit the query ...
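      For contrast, a rough paraphrase (not verbatim) of the subprocess-based pattern that makes the HiveCLIHook path killable:

      import subprocess
      import time


      class CLIStyleKillSketch(object):

          def run_query(self, hql):
              # Illustrative beeline invocation; the real hook assembles the
              # command line from the Airflow connection.
              self.sub_process = subprocess.Popen(
                  ['beeline', '-u', 'jdbc:hive2://host:10000/default', '-e', hql])
              self.sub_process.wait()

          def kill(self):
              # Invoked via HiveOperator.on_kill(): terminate politely, then
              # hard-kill after a 60-second grace period.
              if self.sub_process.poll() is None:
                  self.sub_process.terminate()
                  time.sleep(60)
                  if self.sub_process.poll() is None:
                      self.sub_process.kill()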

       

      Your input on this matter and any suggestions are highly appreciated. The "HiveServer2Hook" is, in our opinion, an irreplaceable tool whenever query results are needed for post-processing.

       

      Cheers

       Oliver

      [2020-03-20 14:45:42,982] {taskinstance.py:1060} ERROR - 'utf-8' codec can't decode byte 0xdb in position 0: invalid continuation byte
      Traceback (most recent call last):
        File "/home/airflow/dags/src/vipa_import/plugins/hooks/dev_hive_hook.py", line 33, in _get_results
          cur.execute(statement)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/pyhive/hive.py", line 364, in execute
          response = self._connection.client.ExecuteStatement(req)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py", line 280, in ExecuteStatement
          return self.recv_ExecuteStatement()
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py", line 292, in recv_ExecuteStatement
          (fname, mtype, rseqid) = iprot.readMessageBegin()
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 134, in readMessageBegin
          sz = self.readI32()
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 217, in readI32
          buff = self.trans.readAll(4)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 60, in readAll
          chunk = self.read(sz - have)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 166, in read
          self._read_frame()
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 170, in _read_frame
          header = self._trans.readAll(4)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 60, in readAll
          chunk = self.read(sz - have)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TSocket.py", line 117, in read
          buff = self.handle.recv(sz)
        File "/home/airflow/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
          raise AirflowTaskTimeout(self.error_message)
      airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 17089

        Attachments

        1. dag.py (2 kB, Oliver Ricken)
        2. dev_hive_operator.py (3 kB, Oliver Ricken)
        3. dev_hive_hook.py (3 kB, Oliver Ricken)
        4. timeout.py (2 kB, Oliver Ricken)
        5. modified_HiveServer2Hook_task.log (31 kB, Oliver Ricken)
        6. taskinstance.py (53 kB, Oliver Ricken)
        7. vanilla_HiverOperator_task.log (31 kB, Oliver Ricken)


            People

            • Assignee: Unassigned
            • Reporter: oricken (Oliver Ricken)
            • Votes: 0
            • Watchers: 1

              Dates

              • Created:
                Updated: