Apache Airflow / AIRFLOW-3046

ECS Operator mistakenly reports success when task is killed due to EC2 host termination


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.10.1
    • Component/s: contrib, operators
    • Labels: None

      Description

      We have ECS clusters made up of EC2 spot fleets. Among other things, this means hosts can be terminated on short notice. When this happens, all tasks (and associated containers) get terminated, as well.

      We expect that when that happens for Airflow task instances using the ECS Operator, those instances will be marked as failures and retried.

      Instead, they are marked as successful.

      As a result, the immediate downstream task fails, causing the scheduled DAG run to fail.

      Here's an example of the Airflow log output when this happens:

      [2018-09-12 01:02:02,712] {ecs_operator.py:112} INFO - ECS Task stopped, check status: {'tasks': [{'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc', 'clusterArn': 'arn:aws:ecs:us-east-1:111111111111:cluster/processing', 'taskDefinitionArn': 'arn:aws:ecs:us-east-1:111111111111:task-definition/foobar-testing_dataEngineering_rd:76', 'containerInstanceArn': 'arn:aws:ecs:us-east-1:111111111111:container-instance/7431f0a6-8fc5-4eff-8196-32f77d286a61', 'overrides': {'containerOverrides': [{'name': 'foobar-testing', 'command': ['./bin/generate-features.sh', '2018-09-11']}]}, 'lastStatus': 'STOPPED', 'desiredStatus': 'STOPPED', 'cpu': '4096', 'memory': '60000', 'containers': [{'containerArn': 'arn:aws:ecs:us-east-1:111111111111:container/0d5cc553-f894-4f9a-b17c-9f80f7ce8d0a', 'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc', 'name': 'foobar-testing', 'lastStatus': 'RUNNING', 'networkBindings': [], 'networkInterfaces': [], 'healthStatus': 'UNKNOWN'}], 'startedBy': 'Airflow', 'version': 3, 'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.', 'connectivity': 'CONNECTED', 'connectivityAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()), 'pullStartedAt': datetime.datetime(2018, 9, 12, 0, 6, 32, 748000, tzinfo=tzlocal()), 'pullStoppedAt': datetime.datetime(2018, 9, 12, 0, 6, 59, 748000, tzinfo=tzlocal()), 'createdAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()), 'startedAt': datetime.datetime(2018, 9, 12, 0, 7, 0, 748000, tzinfo=tzlocal()), 'stoppingAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'stoppedAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'group': 'family:foobar-testing_dataEngineering_rd', 'launchType': 'EC2', 'attachments': [], 'healthStatus': 'UNKNOWN'}], 'failures': [], 'ResponseMetadata': {'RequestId': '758c791f-b627-11e8-83f7-2b76f4796ed2', 'HTTPStatusCode': 200, 
'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 12 Sep 2018 01:02:02 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '1412', 'connection': 'keep-alive', 'x-amzn-requestid': '758c791f-b627-11e8-83f7-2b76f4796ed2'}, 'RetryAttempts': 0}}
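To make the failure mode concrete, here is a minimal sketch (values copied from the response above) of why the stock exit-code check passes: when the host is terminated, the task-level stoppedReason is set, but the container is still reported with lastStatus 'RUNNING' and no exitCode at all.

```python
# Relevant fields from a DescribeTasks response like the one above,
# trimmed down to what the success check actually inspects.
task = {
    'lastStatus': 'STOPPED',
    'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
    'containers': [{'name': 'foobar-testing', 'lastStatus': 'RUNNING'}],
}

container = task['containers'][0]

# The stock check only fires for a STOPPED container with a non-zero
# exitCode; this container matches neither condition, so no exception
# is raised and the task is reported as a success.
stock_check_fires = (container['lastStatus'] == 'STOPPED'
                     and container.get('exitCode', 0) != 0)

# An extra check keyed off the task-level stoppedReason catches it.
extra_check_fires = 'terminated' in task.get('stoppedReason', '').lower()

print(stock_check_fires, extra_check_fires)  # prints: False True
```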

      I believe the function that checks whether the task succeeded, _check_success_task, needs at least one more check.

      We are currently running a modified version of the ECS Operator that contains the following _check_success_task function to address this failure condition:

          def _check_success_task(self):
              response = self.client.describe_tasks(
                  cluster=self.cluster,
                  tasks=[self.arn]
              )
              self.log.info('ECS Task stopped, check status: %s', response)

              if len(response.get('failures', [])) > 0:
                  raise AirflowException(response)

              for task in response['tasks']:
                  # New check: a host termination sets the task-level
                  # stoppedReason but leaves the containers reported as
                  # RUNNING with no exitCode, so it must be caught here.
                  if 'terminated' in task.get('stoppedReason', '').lower():
                      raise AirflowException(
                          'The task was stopped because the host instance '
                          'terminated: {}'.format(task.get('stoppedReason', '')))
                  for container in task['containers']:
                      # Treat a missing exitCode on a STOPPED container as a
                      # failure instead of raising a KeyError.
                      if container.get('lastStatus') == 'STOPPED' and \
                              container.get('exitCode', 1) != 0:
                          raise AirflowException(
                              'This task is not in success state {}'.format(task))
                      elif container.get('lastStatus') == 'PENDING':
                          raise AirflowException(
                              'This task is still pending {}'.format(task))
                      elif 'error' in container.get('reason', '').lower():
                          raise AirflowException(
                              'This container encountered an error during '
                              'launch: {}'.format(container.get('reason', '').lower()))
      
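For illustration, the patched check can be exercised end to end against a stub client that returns a host-termination response like the one in the log above. StubECSClient and the standalone check_success_task below are made-up names for this sketch; AirflowException is a stand-in for airflow.exceptions.AirflowException.

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class StubECSClient:
    """Fake boto3 ECS client returning a host-termination response."""

    def describe_tasks(self, cluster, tasks):
        return {
            'tasks': [{
                'lastStatus': 'STOPPED',
                'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) '
                                 'terminated.',
                'containers': [{'lastStatus': 'RUNNING'}],
            }],
            'failures': [],
        }


def check_success_task(client, cluster, arn):
    """Standalone version of the patched _check_success_task."""
    response = client.describe_tasks(cluster=cluster, tasks=[arn])
    if len(response.get('failures', [])) > 0:
        raise AirflowException(response)
    for task in response['tasks']:
        # The added stoppedReason check fires before any container checks.
        if 'terminated' in task.get('stoppedReason', '').lower():
            raise AirflowException(
                'The task was stopped because the host instance terminated: '
                '{}'.format(task.get('stoppedReason', '')))
        for container in task['containers']:
            if container.get('lastStatus') == 'STOPPED' and \
                    container.get('exitCode', 1) != 0:
                raise AirflowException(
                    'This task is not in success state {}'.format(task))


try:
    check_success_task(
        StubECSClient(), 'processing',
        'arn:aws:ecs:us-east-1:111111111111:task/'
        '32d43a1d-fbc7-4659-815d-9133bde11cdc')
except AirflowException as exc:
    # The new stoppedReason check raises instead of reporting success.
    print(exc)
```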


            People

            • Assignee: Unassigned
            • Reporter: Dan MacTough (danmactough)
