  1. Apache Airflow
  2. AIRFLOW-6602

Make "executor_config" a templated field to support dynamic parameters for the Kubernetes executor


    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.7
    • Fix Version/s: None
    • Labels:
      None

      Description

      When running Airflow with the Kubernetes Executor, per-task configuration is specified through "executor_config". At the moment this field is not templated, so it cannot carry dynamic parameters. As an experiment, I created MyPythonOperator, which inherits from PythonOperator and adds "executor_config" to template_fields. The result shows that this change alone is not enough: Airflow first creates a Pod from executor_config without rendering it, and only then runs the task inside the Pod. It is that run which triggers the Jinja template rendering, too late to affect the Pod spec.

      The example config below shows a use case in which a dynamic "subPath" is mounted:

       

      executor_config = {
          "KubernetesExecutor": {
              "image": "some_image",
              "request_memory": "2Gi",
              "request_cpu": "1",
              "volumes": [
                  {
                      "name": "data",
                      "persistentVolumeClaim": {"claimName": "some_claim_name"},
                  },
              ],
              "volume_mounts": [
                  {
                      "mountPath": "/code",
                      "name": "data",
                      "subPath": "/code/{{ dag_run.conf['branch_name'] }}"
                  },
              ]
          }
      }
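
To make concrete what rendering such a nested config involves, here is a minimal sketch using only the standard library. It is not Airflow's implementation: render_string is a hypothetical stand-in for Jinja that understands only the {{ dag_run.conf['...'] }} form used above, and render_nested is a hypothetical helper that walks dicts and lists and renders every string leaf.

```python
import re
from typing import Any, Dict

# Matches only the placeholder form used in the example config.
# This is a stand-in for Jinja, not a general template engine.
_PLACEHOLDER = re.compile(r"\{\{\s*dag_run\.conf\['(\w+)'\]\s*\}\}")


def render_string(text: str, conf: Dict[str, str]) -> str:
    """Replace {{ dag_run.conf['key'] }} with conf['key']."""
    return _PLACEHOLDER.sub(lambda m: conf[m.group(1)], text)


def render_nested(value: Any, conf: Dict[str, str]) -> Any:
    """Render every string leaf in nested dicts/lists; other types pass through."""
    if isinstance(value, str):
        return render_string(value, conf)
    if isinstance(value, dict):
        return {k: render_nested(v, conf) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_nested(v, conf) for v in value)
    return value


executor_config = {
    "KubernetesExecutor": {
        "image": "some_image",
        "volume_mounts": [
            {
                "mountPath": "/code",
                "name": "data",
                "subPath": "/code/{{ dag_run.conf['branch_name'] }}",
            },
        ],
    }
}

# "feature-x" is an illustrative value for dag_run.conf['branch_name'].
rendered = render_nested(executor_config, {"branch_name": "feature-x"})
print(rendered["KubernetesExecutor"]["volume_mounts"][0]["subPath"])
```

The sketch returns a new structure instead of mutating the input, so the original template text stays available if the same config has to be rendered again for a different run.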
      

      In a further experiment, I called simple_ti.render_templates() in trigger_tasks() in airflow/executors/base_executor.py, right before execute_async() is called. This triggers the rendering early, so kubernetes_executor.execute_async() picks up the resolved parameters.

       

      # Current behavior:
      for i in range(min((open_slots, len(self.queued_tasks)))):
          key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
          self.queued_tasks.pop(key)
          self.running[key] = command
          self.execute_async(key=key,
                             command=command,
                             queue=queue,
                             executor_config=simple_ti.executor_config)
      

       

       

      # Proposed new behavior:
      
      for i in range(min((open_slots, len(self.queued_tasks)))):
          key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
          self.queued_tasks.pop(key)
          self.running[key] = command
          simple_ti.render_templates()  # render it
          self.execute_async(key=key,
                             command=command,
                             queue=queue,
                             executor_config=simple_ti.executor_config)
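
The effect of the ordering can be illustrated with a toy model that does not depend on Airflow. Everything in it is hypothetical: ToyTaskInstance and ToyExecutor are stand-ins, not Airflow classes, and the "{{ name }}" substitution is a simple string replace standing in for Jinja. The sketch only shows that whatever executor_config holds at the moment execute_async() is called is what the pod would be created with.

```python
from typing import Dict, List


class ToyTaskInstance:
    """Hypothetical stand-in for the queued task instance; not an Airflow class."""

    def __init__(self, executor_config: Dict[str, str], context: Dict[str, str]):
        self.executor_config = executor_config
        self._context = context

    def render_templates(self) -> None:
        # Stand-in for Jinja: substitute "{{ name }}" placeholders from the context.
        rendered = {}
        for key, value in self.executor_config.items():
            for name, replacement in self._context.items():
                value = value.replace("{{ " + name + " }}", replacement)
            rendered[key] = value
        self.executor_config = rendered


class ToyExecutor:
    """Records the config each 'pod' would be created with."""

    def __init__(self) -> None:
        self.launched: List[Dict[str, str]] = []

    def execute_async(self, executor_config: Dict[str, str]) -> None:
        self.launched.append(executor_config)

    def trigger_task(self, ti: ToyTaskInstance, render_first: bool) -> None:
        if render_first:
            ti.render_templates()  # the proposed extra step
        self.execute_async(executor_config=ti.executor_config)


context = {"branch_name": "feature-x"}  # illustrative value
config = {"subPath": "/code/{{ branch_name }}"}

executor = ToyExecutor()
executor.trigger_task(ToyTaskInstance(dict(config), context), render_first=False)
executor.trigger_task(ToyTaskInstance(dict(config), context), render_first=True)

print(executor.launched[0]["subPath"])  # template text reaches the pod spec unrendered
print(executor.launched[1]["subPath"])  # rendered value reaches the pod spec
```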
      

       

       

      I think this would be a very useful feature for Airflow, especially for implementing CI/CD pipelines, where a dynamic volume and/or subPath could be mounted. It would also open up many other use cases.


              People

              • Assignee:
                jx.junxie@gmail.com Jun Xie
                Reporter:
                jx.junxie@gmail.com Jun Xie
              • Votes:
                0
                Watchers:
                2
