Beam / BEAM-13627

AWS SQS I/O fails to drop expired messages, causing a transform output mutation exception


    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • Affects Version/s: 2.34.0
    • Fix Version/s: Missing
    • Component/s: io-java-aws


      The SQS I/O is considerably more complex in Beam 2.34.0 than in 2.31.0. In Beam 2.34.0, the [deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584) logic *filters* the messages *to delete* based on the *inflight* state. However, the [extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756) logic makes assumptions: it modifies the inflight state to *exclude* messages that are *assumed to be expired or about to expire*. These messages are *not* explicitly deleted from SQS at the I/O's request, nor are they dropped by the I/O itself (the I/O could still be [processing](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509) a message that it should instead have treated as expired and left for SQS to resend).
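
To make the interaction concrete, here is a minimal sketch of how I understand it. This is not Beam's code: `InflightSketch`, its map of message id to visibility deadline, and the `safetyMarginMillis` parameter are all hypothetical simplifications of the real reader state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the reader's inflight bookkeeping (not Beam's code). */
class InflightSketch {
  /** Message id -> visibility deadline in epoch millis (assumed shape). */
  final Map<String, Long> inflight = new HashMap<>();

  void received(String messageId, long deadlineMillis) {
    inflight.put(messageId, deadlineMillis);
  }

  /** Models extend: entries assumed expired, or about to expire, leave the inflight state. */
  void extend(long nowMillis, long safetyMarginMillis) {
    inflight.values().removeIf(deadline -> deadline - nowMillis <= safetyMarginMillis);
  }

  /** Models deleteBatch: only ids that are still inflight are requested for deletion. */
  List<String> deleteBatch(List<String> messageIds) {
    List<String> toDelete = new ArrayList<>();
    for (String id : messageIds) {
      if (inflight.remove(id) != null) {
        toDelete.add(id);
      }
    }
    return toDelete;
  }
}
```

In this model, a message that `extend` has excluded is silently filtered out by `deleteBatch`, so it is neither deleted from the queue nor dropped by the reader.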

      Ideally, the reader should not advance to a message whose receipt handle has expired; it should skip that message and wait for SQS to resend it.
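
A rough sketch of that ideal behavior, under assumptions: `PendingMessage`, its fields, and `SkipExpiredSketch.advance` are hypothetical names for illustration, and the real reader tracks visibility deadlines differently.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a buffered SQS message; field names are assumptions. */
class PendingMessage {
  final String messageId;
  final String receiptHandle;
  final long visibilityDeadlineMillis;

  PendingMessage(String messageId, String receiptHandle, long visibilityDeadlineMillis) {
    this.messageId = messageId;
    this.receiptHandle = receiptHandle;
    this.visibilityDeadlineMillis = visibilityDeadlineMillis;
  }
}

class SkipExpiredSketch {
  /** Returns the next message whose receipt handle is still valid, or null if none remain. */
  static PendingMessage advance(Queue<PendingMessage> buffered, long nowMillis) {
    PendingMessage next;
    while ((next = buffered.poll()) != null) {
      if (next.visibilityDeadlineMillis > nowMillis) {
        return next;
      }
      // Expired: drop it locally and let SQS redeliver it with a fresh receipt handle.
    }
    return null;
  }

  public static void main(String[] args) {
    Queue<PendingMessage> buffered = new ArrayDeque<>();
    buffered.add(new PendingMessage("m1", "h1", 1_000L)); // already expired at t=2000
    buffered.add(new PendingMessage("m2", "h2", 5_000L)); // still valid at t=2000
    System.out.println(advance(buffered, 2_000L).messageId);
  }
}
```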

      I am not certain, but I suspect the current, incorrect behavior is what triggers mutation detection. The reader advances a message that is never "acked" (it is not deleted, so it will be resent). If the same message is then pulled again with a new receipt handle within the same bundle, the mutation detector would see a change, because the receipt handle is part of the Message hash code (unless there is a hash collision in the mutation detector).
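
The effect of the receipt handle on equality can be illustrated with a stand-in class. `MessageSketch` is hypothetical and only models the property described above (equality and hash code covering the receipt handle); it is not the AWS SDK `Message` class and not Beam's mutation detector.

```java
import java.util.Objects;

/** Hypothetical stand-in for an SQS message whose equality covers the receipt handle. */
final class MessageSketch {
  final String messageId;
  final String body;
  final String receiptHandle;

  MessageSketch(String messageId, String body, String receiptHandle) {
    this.messageId = messageId;
    this.body = body;
    this.receiptHandle = receiptHandle;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MessageSketch)) {
      return false;
    }
    MessageSketch other = (MessageSketch) o;
    return Objects.equals(messageId, other.messageId)
        && Objects.equals(body, other.body)
        && Objects.equals(receiptHandle, other.receiptHandle);
  }

  @Override
  public int hashCode() {
    // The receipt handle takes part in the hash, as it does in equals.
    return Objects.hash(messageId, body, receiptHandle);
  }
}
```

Under this assumption, two deliveries of the same message id and body compare as different values as soon as their receipt handles differ, which is what a before/after comparison would flag.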

      *TL;DR: debugging process*

      The mutation was detected in the SqsUnboundedSource, not caused by any other code in the pipeline.

      The code that reports the warning and throws the exception is [here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).

      The only field that changed is the receipt handle. It's documented [here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html) that:

      > If you receive a message more than once, each time you receive it, you get a different receipt handle. You must provide the most recently received receipt handle when you request to delete the message (otherwise, the message might not be deleted).

      The [aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447) did not change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK shouldn't be the culprit.

      There is a significant change between Beam 2.31.0 and Beam 2.34.0 for [SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java).

      For a message to be received more than once, it must not have been deleted since it was first received. The deletion logic is invoked in [SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).


              Assignee: Unassigned
              Reporter: Ning (ningk)