Apache Airflow / AIRFLOW-4065

Misc security fixes and custom Spark poll interval


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 1.10.2, 1.10.3
    • Fix Version/s: None
    • Component/s: security
    • Labels: None

      Description

      1. www/app.py: Add clickjacking defence

       

      Fix: at the end of def create_app(config=None, testing=False):

      @app.after_request
      def apply_caching(response):
          # Tell browsers never to render Airflow pages inside a frame,
          # defending the WebUI against clickjacking
          response.headers["X-Frame-Options"] = "DENY"
          return response
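
      A quick way to check the header, sketched with Flask's test client (this
      assumes create_app returns the Flask app, as in www/app.py; the route is
      illustrative):

      from airflow.www.app import create_app

      app = create_app(testing=True)
      with app.test_client() as client:
          resp = client.get('/')
          # after_request hooks run for every response, including redirects
          assert resp.headers.get('X-Frame-Options') == 'DENY'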

       

      2. www/app.py: Add a WebUI login timeout of 15 minutes

       

      Fix: at the end of def create_app(config=None, testing=False):

      @app.before_request
      def before_request():
          # Expire idle WebUI sessions after 15 minutes: marking the session
          # permanent makes the lifetime apply, and touching it on every
          # request restarts the 15-minute window on each page load.
          flask.session.permanent = True
          app.permanent_session_lifetime = datetime.timedelta(minutes=15)
          flask.session.modified = True
          flask.g.user = flask_login.current_user

      (www/app.py must import flask, flask_login and datetime for this to work.)

       

      3. www/views.py: Add cross-site scripting (XSS) defence

      BEFORE

      return self.render(
          'airflow/dags.html',
          webserver_dags=webserver_dags_filtered,
          orm_dags=orm_dags,
          hide_paused=hide_paused,
          current_page=current_page,
          search_query=arg_search_query if arg_search_query else '',
          page_size=dags_per_page,
          num_of_pages=num_of_pages,
          num_dag_from=start + 1,
          num_dag_to=min(end, num_of_all_dags),
          num_of_all_dags=num_of_all_dags,
          paging=wwwutils.generate_pages(current_page, num_of_pages,
                                         search=arg_search_query,
                                         showPaused=not hide_paused),
          dag_ids_in_page=page_dag_ids,
          auto_complete_data=auto_complete_data)

       

      AFTER

      return self.render(
          'airflow/dags.html',
          webserver_dags=webserver_dags_filtered,
          orm_dags=orm_dags,
          hide_paused=hide_paused,
          current_page=current_page,
          search_query=arg_search_query if arg_search_query else '',
          page_size=dags_per_page,
          num_of_pages=num_of_pages,
          num_dag_from=start + 1,
          num_dag_to=min(end, num_of_all_dags),
          num_of_all_dags=num_of_all_dags,
          paging=wwwutils.generate_pages(current_page, num_of_pages,
                                         search=escape(arg_search_query) if arg_search_query else None,
                                         showPaused=not hide_paused),
          dag_ids_in_page=page_dag_ids,
          auto_complete_data=auto_complete_data)
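
      escape is assumed to be the standard HTML-escaping helper; if
      www/views.py does not already import one, something like this works:

      from markupsafe import escape

      escape("<script>alert(1)</script>")
      # Markup('&lt;script&gt;alert(1)&lt;/script&gt;')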

       

      4. contrib/hooks/spark_submit_hook.py: Poll the Spark server at a custom interval instead of every second

       

      BEFORE

      # Sleep for 1 second as we do not want to spam the cluster
      time.sleep(1)

       

      AFTER

      from airflow import configuration as conf

      # Sleep for a configurable number of seconds as we do not want to
      # spam the cluster
      _poll_interval = conf.getint('sparksubmit', 'poll_interval')
      time.sleep(_poll_interval)
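
      A matching airflow.cfg entry, with the section and key names taken from
      the conf.getint call above (the 10-second value is only an example):

      [sparksubmit]
      # seconds to wait between polls of the Spark server for driver status
      poll_interval = 10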

       

      5. Documentation only: securing the connection to the MySQL backend metastore.

      At the end of the sql_alchemy_conn line in airflow.cfg, append    ?ssl_ca=<PEMCERTFORMYSQL.pem>
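
      Put together, the line would look something like this (credentials,
      host and certificate path are placeholders):

      [core]
      sql_alchemy_conn = mysql://airflow_user:airflow_pw@mysql-host:3306/airflow?ssl_ca=/etc/ssl/certs/mysql-ca.pem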

       

      6. contrib/hooks/spark_submit_hook.py: Mask passwords in the spark-submit cmd AND the error stacktrace

       

      Add

      def _mask_cmd(self, connection_cmd):
          # Mask the value of any key=value application arg whose key contains
          # "secret" or "password" (case-insensitive), e.g. HivePassword='abc'
          connection_cmd_masked = re.sub(
              r"(\S*?(?:secret|password)\S*?\s*=\s*')[^']*(?=')",
              r'\1*****',
              ' '.join(connection_cmd),
              flags=re.I)
          return connection_cmd_masked
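
      A quick illustration of the masking (the command and value are made up):

      import re

      cmd = ['spark-submit', '--conf', "spark.hadoop.HivePassword='s3cret'"]
      print(re.sub(r"(\S*?(?:secret|password)\S*?\s*=\s*')[^']*(?=')",
                   r'\1*****', ' '.join(cmd), flags=re.I))
      # spark-submit --conf spark.hadoop.HivePassword='*****'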

       

      BEFORE

      self.log.info("Spark-Submit cmd: %s", connection_cmd)

       

      AFTER

      self.log.info("Spark-Submit cmd: %s", self._mask_cmd(connection_cmd))

       

      BEFORE

      if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
          raise AirflowException(
              "Cannot execute: {}. Error code is: {}.".format(
                  spark_submit_cmd, returncode
              )
          )

       

      AFTER

      if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
          raise AirflowException(
              "Cannot execute: {}. Error code is: {}.".format(
                  self._mask_cmd(spark_submit_cmd), returncode
              )
          )

       

      7. cli.py currently prints the password in clear text when the CLI is used to add a connection with conn_password.

      Example of the log line that gets printed (a problem if you have an automatic log forwarder such as Splunk):

      Successfully added `conn_id`=query_hive : hive_cli://user:cleartextpassw@host:10000/default

       

      The relevant code doing the printing:

      with db.create_session() as session:
          if not (session.query(Connection)
                  .filter(Connection.conn_id == new_conn.conn_id).first()):
              session.add(new_conn)
              msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
              msg = msg.format(conn_id=new_conn.conn_id,
                               uri=args.conn_uri or
                               urlunparse((args.conn_type,
                                           '{login}:{password}@{host}:{port}'
                                           .format(login=args.conn_login or '',
                                                   password=args.conn_password or '',
                                                   host=args.conn_host or '',
                                                   port=args.conn_port or ''),
                                           args.conn_schema or '', '', '', '')))
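
      A possible fix, sketched here rather than a committed change: substitute
      a fixed mask when building the URI for the message, so the stored
      connection keeps the real password but the log does not:

              msg = msg.format(conn_id=new_conn.conn_id,
                               uri=args.conn_uri or
                               urlunparse((args.conn_type,
                                           '{login}:{password}@{host}:{port}'
                                           # never interpolate the real password
                                           .format(login=args.conn_login or '',
                                                   password='*****' if args.conn_password else '',
                                                   host=args.conn_host or '',
                                                   port=args.conn_port or ''),
                                           args.conn_schema or '', '', '', '')))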

       

      8. The setting below should be 'airflow.api.auth.backend.deny_all' by default:

      [api]
      # How to authenticate users of the API
      auth_backend = airflow.api.auth.backend.default
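
      With the proposed default, the section would read:

      [api]
      # How to authenticate users of the API
      auth_backend = airflow.api.auth.backend.deny_all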

       

      9. The default start_date in the example DAGs should be 1970, to allow historical DAG runs before 2015; the current value can be confusing.

      Instead of 'start_date': datetime(2015, 6, 1),
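
      use an epoch-era default, e.g. (the exact date is only a suggestion):

      'start_date': datetime(1970, 1, 1),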

       


            People

            • Assignee: Unassigned
            • Reporter: t oo (toopt4)
            • Votes: 0
            • Watchers: 1
