Type: Improvement
Status: In Progress
Priority: Minor
Resolution: Unresolved
Affects Version/s: 1.10.8
Fix Version/s: None
Labels:
Environment: AWS Cloud
Links
AIP - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-29%3A+AWS+Fargate+Executor
PR - https://github.com/apache/airflow/pull/7030
Airflow on AWS Fargate
We propose the creation of a new Airflow Executor, called the FargateExecutor, that runs tasks asynchronously on AWS Fargate. The Airflow Scheduler produces a command that needs to be executed in some shell. That command is passed as an ARG to a Docker container, and AWS Fargate provisions a new instance. The container then completes or fails the job, causing the container to die along with the Fargate instance. The executor is responsible for keeping track of what happened to the task using an Airflow task id and an AWS ARN; based on the instance exit code, it marks the task as succeeded or failed.
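The exit-code rule described above can be sketched as a small function. This is a minimal sketch, not the code from the PR: the function name {{state_from_task}} and the returned state strings are hypothetical, and the input is assumed to be one entry of the {{tasks}} list returned by boto3's {{describe_tasks()}}, using its {{lastStatus}} and {{containers[].exitCode}} fields.

```python
from typing import Optional


def state_from_task(task: dict) -> Optional[str]:
    """Map one describe_tasks() task entry to a coarse state.

    Returns None while the Fargate task has not stopped yet,
    "success" if every container exited with code 0, else "failed".
    (Hypothetical helper; state names are illustrative.)
    """
    if task.get("lastStatus") != "STOPPED":
        return None  # still provisioning, pending, or running

    exit_codes = [c.get("exitCode") for c in task.get("containers", [])]
    # A container that never started reports no exitCode; count it as failed.
    if exit_codes and all(code == 0 for code in exit_codes):
        return "success"
    return "failed"
```

Treating a missing exit code as a failure is a design assumption here: it covers containers that could not be pulled or started, which never report an exit code.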
h1. Proposed Implementation
The underlying mechanism to launch, track, and stop Fargate instances is AWS's Boto3 library.
To accomplish this, we create a FargateExecutor under the "airflow.executors" module. This class will extend BaseExecutor and override five methods: start(), sync(), execute_async(), end(), and terminate(). Internally, the FargateExecutor uses boto3 for monitoring and deployment purposes.
The three major Boto3 API calls are:
* The {{execute_async()}} function calls boto3's {{run_task()}} function.
* The {{sync()}} function calls boto3's describe_tasks() function.
* The {{terminate()}} function calls boto3's stop_task() function.
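The mapping between executor methods and Boto3 calls listed above can be sketched as follows. This is an illustration under stated assumptions, not the implementation from the PR: the class {{FargateExecutorSketch}} is hypothetical, it deliberately does not extend Airflow's BaseExecutor so that it stays self-contained, and {{build_run_task_kwargs}} stands in for whatever turns an Airflow command into {{run_task()}} arguments. The ECS client is injected, so a real {{boto3.client("ecs")}} or a test double can be passed in.

```python
from typing import Callable, Dict, List


class FargateExecutorSketch:
    """Illustrative only: tracks Airflow task keys against Fargate task ARNs."""

    def __init__(self, ecs_client, cluster: str,
                 build_run_task_kwargs: Callable[[List[str]], dict]):
        self.ecs = ecs_client               # e.g. boto3.client("ecs")
        self.cluster = cluster
        self.build_run_task_kwargs = build_run_task_kwargs
        self.active: Dict[str, str] = {}    # Airflow task key -> task ARN
        self.finished: Dict[str, str] = {}  # Airflow task key -> final state

    def execute_async(self, key: str, command: List[str]) -> None:
        """Launch one Fargate task for the given Airflow command."""
        response = self.ecs.run_task(**self.build_run_task_kwargs(command))
        if response.get("failures") or not response.get("tasks"):
            self.finished[key] = "failed"
            return
        self.active[key] = response["tasks"][0]["taskArn"]

    def sync(self) -> None:
        """Poll all active tasks and record the ones that have stopped."""
        if not self.active:
            return
        response = self.ecs.describe_tasks(
            cluster=self.cluster, tasks=list(self.active.values()))
        key_by_arn = {arn: key for key, arn in self.active.items()}
        for task in response["tasks"]:
            if task["lastStatus"] != "STOPPED":
                continue
            codes = [c.get("exitCode") for c in task["containers"]]
            succeeded = bool(codes) and all(code == 0 for code in codes)
            key = key_by_arn[task["taskArn"]]
            self.finished[key] = "success" if succeeded else "failed"
            del self.active[key]

    def terminate(self) -> None:
        """Stop every task that is still running."""
        for arn in self.active.values():
            self.ecs.stop_task(cluster=self.cluster, task=arn,
                               reason="Airflow executor terminated")
        self.active.clear()
```

A production version would also need to batch the {{describe_tasks()}} call, since the ECS API limits how many task ARNs a single request may carry.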
h1. Maintenance
The executor itself is nothing special since it mostly relies on overriding the proper methods from {{BaseExecutor}}.
In general, AWS is fairly committed to keeping their APIs in service. Fargate is rather new, and I've personally seen many features added as optional parameters over the past year. However, the required parameters for the three Boto3 calls that are used have remained the same. I've also written test cases that ensure that the Boto3 calls made are compliant with the most current version of their APIs.
We've also introduced a callback hook (very similar to the Celery Executor) that allows users to launch tasks with their own parameters. Therefore, if a user doesn't like the default parameter options used in Boto3's {{run_task()}}, then they can call it themselves with whatever parameters they want. This means that Airflow doesn't have to add a new configuration every time AWS makes an addition to AWS Fargate. It's just one configuration to cover them all.
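As an illustration of the callback hook, the sketch below shows a user-supplied function that adds a {{run_task()}} parameter ({{tags}}) for which the proposal defines no dedicated setting. It assumes the hook shape described in the proposed configuration (a zero-argument function returning a function that maps an Airflow CLI command to {{run_task()}} arguments) and assumes the command arrives as a list of strings. The function name, tag values, and resource identifiers are placeholders.

```python
from typing import Callable, List


def custom_fargate_options_function() -> Callable[[List[str]], dict]:
    """Hypothetical user hook: returns the function that builds run_task() kwargs."""

    def build_run_task_kwargs(airflow_command: List[str]) -> dict:
        return {
            "cluster": "test-airflow",
            "taskDefinition": "test-airflow-worker",
            "launchType": "FARGATE",
            "platformVersion": "LATEST",
            # The Airflow CLI command overrides the container's command.
            "overrides": {
                "containerOverrides": [
                    {"name": "airflow-worker", "command": airflow_command}
                ]
            },
            "networkConfiguration": {
                "awsvpcConfiguration": {
                    "subnets": ["subnet-yyyyyyyyyy"],
                    "securityGroups": ["sg-xxxxxxxxxxxxxx"],
                    "assignPublicIp": "DISABLED",
                }
            },
            # Extra parameter not covered by the simple config keys.
            "tags": [{"key": "team", "value": "data-platform"}],
        }

    return build_run_task_kwargs
```

The returned dictionary would be passed to the ECS client as {{client.run_task(**kwargs)}}, so any optional parameter AWS adds later can be supplied the same way without a new Airflow setting.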
h1. Proposed Configuration
[fargate]
# For more information on any of these execution parameters, see the link below:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
# For boto3 credential management, see
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html

### MANDATORY CONFIGS:
# Name of region
region = us-west-2
# Name of cluster
cluster = test-airflow

### EITHER POPULATE THESE:
# Name of task definition with a bootable container. Note that this container will receive an airflow CLI
# command as an additional parameter to its entrypoint. Its job is to boot up and run this command.
task_definition = test-airflow-worker
# Name of registered container within your AWS cluster
container_name = airflow-worker
# Security group ids for task to run in (comma-separated)
security_groups = sg-xxxxxxxxxxxxxx
# Subnets for task to run in (comma-separated)
subnets = subnet-yyyyyyyyyy,subnet-zzzzzzzzz
# FARGATE platform version. Defaults to LATEST.
platform_version = LATEST
# Launch type can either be 'FARGATE' or 'EC2'. Defaults to 'FARGATE'.
launch_type = FARGATE
# Assign public ip can either be 'ENABLED' or 'DISABLED'. Defaults to 'ENABLED'.
assign_public_ip = DISABLED

### OR POPULATE THIS:
# This is a function which returns a function. The outer function takes no arguments, and returns the inner function.
# The inner function takes in an airflow CLI command and outputs a JSON-compatible dict for the boto3 run_task API
# linked above. In other words, if you don't like the way I call the Fargate API, then call it yourself.
execution_config_function = airflow.executors.fargate_executor.default_task_id_to_fargate_options_function
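To make the relationship between these settings and the Boto3 call concrete, here is a minimal sketch of how the "EITHER POPULATE THESE" keys could be translated into {{run_task()}} arguments. It is an assumption about the mapping, not the default function from the PR: {{run_task_kwargs_from_config}} is a hypothetical name, the standard library's {{configparser}} stands in for Airflow's own configuration loader, and the fallbacks mirror the defaults stated in the comments above.

```python
import configparser
from typing import List

EXAMPLE_CFG = """
[fargate]
region = us-west-2
cluster = test-airflow
task_definition = test-airflow-worker
container_name = airflow-worker
security_groups = sg-xxxxxxxxxxxxxx
subnets = subnet-yyyyyyyyyy,subnet-zzzzzzzzz
platform_version = LATEST
launch_type = FARGATE
assign_public_ip = DISABLED
"""


def _split_csv(value: str) -> List[str]:
    """Split a comma-separated setting into a clean list."""
    return [item.strip() for item in value.split(",") if item.strip()]


def run_task_kwargs_from_config(cfg_text: str, command: List[str]) -> dict:
    """Build run_task() keyword arguments from a [fargate] section (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["fargate"]
    # Note: 'region' is not a run_task() argument; it would be used when
    # creating the client, e.g. boto3.client("ecs", region_name=...).
    return {
        "cluster": section["cluster"],
        "taskDefinition": section["task_definition"],
        "launchType": section.get("launch_type", fallback="FARGATE"),
        "platformVersion": section.get("platform_version", fallback="LATEST"),
        "overrides": {
            "containerOverrides": [
                {"name": section["container_name"], "command": command}
            ]
        },
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": _split_csv(section["subnets"]),
                "securityGroups": _split_csv(section["security_groups"]),
                "assignPublicIp": section.get("assign_public_ip", fallback="ENABLED"),
            }
        },
    }
```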