  1. Apache Airflow
  2. AIRFLOW-6448

Connection URI Parsing Swaps Service and Scheme Components


    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.10.7
    • Fix Version/s: 2.0.0
    • Component/s: configuration, hooks
    • Labels:
      None
    • Environment:
      python:3.7-alpine3.10 Docker image running on a Kubernetes cluster with the Kubernetes Executor.

      Description

      This issue was discovered when specifying a Slack connection as a URI via an environment variable.

      Using the following connection definition via environment variable:

      AIRFLOW_CONN_SLACKCONN=https://hooks.slack.com/services?webhook_token=MYAWESOMETOKEN
      

      results in a request to a malformed URL, as shown in the following log entry:

      {"asctime": "2020-01-04 01:44:56,748", "filename": "logging_mixin.py", "lineno": 112, "levelname": "INFO", "message": "[2020-01-04 01:44:56,748] {http_hook.py:131} INFO - Sending 'POST' to url: services://hooks.slack.com/MYAWESOMETOKEN", "dag_id": "mydagid", "task_id": "notify", "execution_date": "2020_01_04T01_44_47_230593", "try_number": "1"}
      

      That is, the final URL generated by Airflow uses the first path segment (services) as the URL scheme and drops the original https scheme:

      services://hooks.slack.com/MYAWESOMETOKEN
      

      The issue seems to be in:

      airflow/models/connection.py

      Specifically:

          def parse_from_uri(self, uri):
              uri_parts = urlparse(uri)
              conn_type = uri_parts.scheme
              if conn_type == 'postgresql':
                  conn_type = 'postgres'
              elif '-' in conn_type:
                  conn_type = conn_type.replace('-', '_')
              self.conn_type = conn_type
              self.host = parse_netloc_to_hostname(uri_parts)
              quoted_schema = uri_parts.path[1:]
              self.schema = unquote(quoted_schema) if quoted_schema else quoted_schema
              self.login = unquote(uri_parts.username) \
                  if uri_parts.username else uri_parts.username
              self.password = unquote(uri_parts.password) \
                  if uri_parts.password else uri_parts.password
              self.port = uri_parts.port
              if uri_parts.query:
                  self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
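
      To make the field mapping concrete, here is a minimal standalone sketch that applies the same urlparse logic to the URI from this report. The helper name split_connection_uri is hypothetical and is not part of Airflow; it also uses urlparse's hostname attribute in place of Airflow's parse_netloc_to_hostname, which is an assumption that only matters for case handling.

```python
import json
from urllib.parse import parse_qsl, urlparse


def split_connection_uri(uri):
    """Hypothetical helper (not Airflow code) mirroring the field mapping above."""
    parts = urlparse(uri)
    return {
        # The URI scheme becomes the connection type.
        "conn_type": parts.scheme,
        # Assumption: hostname stands in for parse_netloc_to_hostname.
        "host": parts.hostname,
        # The path, minus its leading slash, becomes the connection "schema".
        "schema": parts.path[1:],
        # Query parameters are serialized into the "extra" JSON string.
        "extra": json.dumps(dict(parse_qsl(parts.query, keep_blank_values=True))),
    }


fields = split_connection_uri(
    "https://hooks.slack.com/services?webhook_token=MYAWESOMETOKEN"
)
print(fields)
```

      With this input, "https" lands in conn_type and "services" lands in schema, which would explain the services:// prefix in the logged URL if the HTTP hook builds its base URL from schema and host.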
      

      A hacky workaround for this particular case is to swap the two components and move the path into the token, defining AIRFLOW_CONN_SLACKCONN as:

      services://hooks.slack.com/https?webhook_token=/services/MYAWESOMETOKEN
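
      The following sketch illustrates why this workaround yields a usable URL. It assumes, based only on the logged URL in this report, that the request URL is composed as schema + "://" + host followed by the webhook_token as the endpoint. The function compose_request_url is a hypothetical reconstruction, not the hook's actual code.

```python
from urllib.parse import parse_qsl, urlparse


def compose_request_url(uri):
    """Hypothetical reconstruction of how the final request URL is assembled."""
    parts = urlparse(uri)
    # Assumption: the first path segment (the connection "schema") is used
    # as the URL scheme, falling back to "http" when it is empty.
    schema = parts.path[1:] or "http"
    extra = dict(parse_qsl(parts.query, keep_blank_values=True))
    base_url = schema + "://" + parts.hostname
    # Assumption: the webhook token is appended to the base URL as the endpoint.
    endpoint = extra.get("webhook_token", "")
    if endpoint and not endpoint.startswith("/"):
        endpoint = "/" + endpoint
    return base_url + endpoint


original = compose_request_url(
    "https://hooks.slack.com/services?webhook_token=MYAWESOMETOKEN"
)
workaround = compose_request_url(
    "services://hooks.slack.com/https?webhook_token=/services/MYAWESOMETOKEN"
)
print(original)
print(workaround)
```

      Under these assumptions the original definition reproduces the malformed services:// URL from the log, while the workaround produces an https:// URL that includes the /services path.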
      

       

            People

            • Assignee:
              Unassigned
              Reporter:
              OneMintJulep Julian Vasilkoski


                Time Tracking

                Estimated: 96h
                Remaining: 96h
                Logged: Not Specified