Uploaded image for project: 'Apache Airflow'
  1. Apache Airflow
  2. AIRFLOW-6440

AWS Fargate Executor (AIP-29) (WIP)



    • Type: Improvement
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.10.8
    • Fix Version/s: None
    • Component/s: aws, executors
    • Environment:
      AWS Cloud



      AIP - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-29%3A+AWS+Fargate+Executor

      PR - https://github.com/apache/airflow/pull/7030

      Airflow on AWS Fargate

      We propose the creation of a new Airflow Executor, called the FargateExecutor, that runs tasks asynchronously on AWS Fargate. The Airflow Scheduler comes up with a command that needs to be executed in some shell. A Docker container parameterized with the command is passed in as an ARG, and AWS Fargate provisions a new instance with . The container then completes or fails the job, causing the container to die along with the Fargate instance. The executor is responsible for keeping track what happened to the task with an airflow task id and AWS ARN number, and based off of the instance exit code we either say that the task succeeded or failed.

      Proposed Implementation

      As you could probably deduce, the underlying mechanism to launch, track, and stop Fargate instances is AWS' Boto3 Library.

      To accomplish this we create a FargateExecutor under the "airflow.executors" module. This class will extend from BaseExecutor and override 5 methods: start(), sync(), execute_async(), end(), and terminate(). Internally, the FargateExecutor uses boto3 for monitoring and deployment purposes.

      The three major Boto3 API calls are:

      • The execute_async() function calls boto3's run_task() function.
        * The {{sync()
        }} function calls boto3's describe_tasks() function.
        * The {{terminate()
        }} function calls boto3's stop_task() function.

        h1. Maintenance

        The executor itself is nothing special since it mostly relies on overriding the proper methods from .

        In general, AWS is fairly committed to keeping their APIs in service. Fargate is rather new and I've personally perceived a lot more features added as optional parameters over the course of the past year. However, the required parameters for the three Boto3 calls that are used have remained the same. I've also written test-cases that ensures that the Boto3 calls made are complaint to the most current version of their APIs.

        We've also introduced a callback hook (very similar to the Celery Executor) that allows users to launch tasks with their own parameters. Therefore if a user doesn't like the default parameter options used in Boto3's {{run_task(),}}then they can call it themselves with whatever parameters they want. This means that Airflow doesn't have to add a new configuration everytime AWS makes an addition to AWS Fargate. It's just one configuration to cover them all.
        h1. Proposed Configuration


      # For more information on any of these execution parameters, see the link below:
      # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
      # For boto3 credential management, see
      # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
      # Name of region
      region = us-west-2
      # Name of cluster
      cluster = test-airflow
      # Name of task definition with a bootable-container. Note that this container will receive an airflow CLI
      # command as an additional parameter to its entrypoint. It's job is to boot-up and run this command
      task_definition = test-airflow-worker
      # name of registered container within your AWS cluster
      container_name = airflow-worker
      # security group ids for task to run in (comma-separated)
      security_groups = sg-xxxxxxxxxxxxxx
      # Subnets for task to run in.
      subnets = subnet-yyyyyyyyyy,subnet-zzzzzzzzz
      # FARGATE platform version. Defaults to Latest.
      platform_version = LATEST
      # Launch type can either be 'FARGATE' OR 'ECS'. Defaults to Fargate.
      launch_type = FARGATE
      # Assign public ip can either be 'ENABLED' or 'DISABLED'.  Defaults to 'ENABLED'.
      assign_public_ip = DISABLED
      # This is a function which returns a function. The outer function takes no arguments, and returns the inner function.
      # The inner function takes in an airflow CLI command an outputs a json compatible with the boto3 run_task API
      # linked above. In other words, if you don't like the way I call the fargate API then call it yourself
      execution_config_function = airflow.executors.fargate_executor.default_task_id_to_fargate_options_function





          Issue Links



              • Assignee:
                ahmed.elzeiny Ahmed Elzeiny
                ahmed.elzeiny Ahmed Elzeiny
              • Votes:
                1 Vote for this issue
                6 Start watching this issue


                • Created:

                  Time Tracking

                  Original Estimate - 336h
                  Remaining Estimate - 336h
                  Time Spent - Not Specified
                  Not Specified