Details
- Type: Bug
- Status: Resolved
- Priority: P2
- Resolution: Duplicate
- Affects Version/s: 2.34.0
Description
The I/O is much more complicated in Beam 2.34.0 than in 2.31.0. In Beam 2.34.0, the [deleteBatch](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L584) logic *filters* the messages *to delete* based on the *inflight* state. However, the [extend](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L756) logic makes assumptions that modify the inflight state to *exclude* messages that are *assumed to be expired or about to expire*. These messages are **not** explicitly requested by the I/O to be deleted from SQS, nor are they dropped by the I/O itself: the I/O could still be [processing](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java#L509) a message that should have been treated as expired and skipped until SQS resends it.
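The interaction between the two pieces of logic can be illustrated with a deliberately simplified model. This is a hypothetical sketch, not Beam's `SqsUnboundedReader`: `InflightModel`, its fields, and its methods are invented, and the inflight state is assumed to be a map from message id to receipt-handle expiry time. It shows how a message dropped from the inflight state by an "extend" step is then silently skipped by a "deleteBatch" step that filters on that state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the extend/deleteBatch interaction; NOT Beam's actual reader. */
public class InflightModel {
  // Assumed simplification: messageId -> receipt-handle expiry time in ms.
  final Map<String, Long> inflight = new LinkedHashMap<>();
  // Message ids that were actually sent to the (fake) queue for deletion.
  final List<String> deleted = new ArrayList<>();

  void receive(String messageId, long expiresAtMs) {
    inflight.put(messageId, expiresAtMs);
  }

  /** Analogue of "extend": drops messages assumed to be expired from the inflight state. */
  void extend(long nowMs) {
    inflight.values().removeIf(expiresAt -> expiresAt <= nowMs);
  }

  /** Analogue of "deleteBatch": only deletes ids that are still inflight. */
  void deleteBatch(List<String> toDelete) {
    for (String id : toDelete) {
      if (inflight.remove(id) != null) {
        deleted.add(id);
      }
    }
  }

  public static void main(String[] args) {
    InflightModel reader = new InflightModel();
    reader.receive("m1", 1_000L);
    reader.receive("m2", 5_000L);
    // m1 is assumed expired at t=2000, so extend removes it from the inflight state...
    reader.extend(2_000L);
    // ...even though m1 may already have been emitted and is now being checkpointed.
    reader.deleteBatch(List.of("m1", "m2"));
    System.out.println("deleted = " + reader.deleted); // deleted = [m2]
  }
}
```

In this model `m1` is neither deleted nor dropped from downstream processing, so SQS would eventually redeliver it.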
Ideally, the reader should not advance to a message whose receipt handle has expired; it should skip the message and wait for SQS to resend it.
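A minimal sketch of that "skip and wait for redelivery" behavior, assuming each buffered message carries the expiry time of its receipt handle. `SkipExpiredSketch`, `Received`, and `advance(nowMs)` are hypothetical names chosen for illustration, not part of Beam or the AWS SDK.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical sketch: skip messages whose receipt handle has expired. Not Beam's API. */
public class SkipExpiredSketch {
  /** Stand-in for a received SQS message plus the assumed expiry time of its handle. */
  record Received(String messageId, String receiptHandle, long handleExpiresAtMs) {}

  private final Deque<Received> buffer = new ArrayDeque<>();

  void add(Received r) {
    buffer.addLast(r);
  }

  /**
   * Returns the next message whose receipt handle is still valid at nowMs. Expired messages are
   * neither emitted nor deleted; they are dropped locally so SQS can redeliver them with a fresh
   * handle once their visibility timeout lapses.
   */
  Optional<Received> advance(long nowMs) {
    while (!buffer.isEmpty()) {
      Received next = buffer.pollFirst();
      if (next.handleExpiresAtMs() > nowMs) {
        return Optional.of(next);
      }
      // Expired handle: skip it and rely on redelivery.
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    SkipExpiredSketch reader = new SkipExpiredSketch();
    reader.add(new Received("m1", "h1", 1_000L));
    reader.add(new Received("m2", "h2", 5_000L));
    System.out.println(reader.advance(2_000L).map(Received::messageId).orElse("none")); // m2
    System.out.println(reader.advance(2_000L).map(Received::messageId).orElse("none")); // none
  }
}
```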
I'm not certain, but with the current (incorrect) behavior the reader advances to a message that is not "acked" (it has not been deleted and will be resent). If the same message is then pulled again with a new receipt handle within the same bundle, this could trigger mutation detection, because the receipt handle is part of the Message hash code (barring a hash collision in the mutation detector).
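The suspected mechanism can be illustrated with a toy detector. This sketch assumes the same mutable message instance has its receipt handle updated in place; `MutableMessage` and `SnapshotDetector` are invented for illustration and only loosely mirror the idea behind Beam's `MutationDetectors` (snapshot the encoded value when it is emitted, compare later). It is not Beam's or the AWS SDK's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical illustration of a receipt-handle change being flagged as a mutation. */
public class ReceiptHandleMutationSketch {
  /** Mutable stand-in for an SQS message; equals/hashCode include the receipt handle. */
  static final class MutableMessage {
    final String messageId;
    final String body;
    String receiptHandle;

    MutableMessage(String messageId, String body, String receiptHandle) {
      this.messageId = messageId;
      this.body = body;
      this.receiptHandle = receiptHandle;
    }

    /** Assumed encoding: every field, including the receipt handle, is serialized. */
    byte[] encode() {
      return (messageId + "|" + body + "|" + receiptHandle).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof MutableMessage)) {
        return false;
      }
      MutableMessage m = (MutableMessage) o;
      return messageId.equals(m.messageId) && body.equals(m.body)
          && Objects.equals(receiptHandle, m.receiptHandle);
    }

    @Override
    public int hashCode() {
      return Objects.hash(messageId, body, receiptHandle);
    }
  }

  /** Simplified detector: snapshot the encoded bytes at emit time, compare them later. */
  static final class SnapshotDetector {
    private final MutableMessage value;
    private final byte[] snapshot;

    SnapshotDetector(MutableMessage value) {
      this.value = value;
      this.snapshot = value.encode();
    }

    boolean isUnmodified() {
      return Arrays.equals(snapshot, value.encode());
    }
  }

  public static void main(String[] args) {
    MutableMessage msg = new MutableMessage("m1", "payload", "handle-1");
    int hashBefore = msg.hashCode();
    SnapshotDetector detector = new SnapshotDetector(msg);

    // The same message is pulled again and SQS issues a new receipt handle.
    msg.receiptHandle = "handle-2";

    System.out.println(detector.isUnmodified());      // false
    System.out.println(hashBefore == msg.hashCode()); // false
  }
}
```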
*TL;DR: debugging process*
The mutation was detected in SqsUnboundedSource and was not caused by any other code in the pipeline.
The code that reports the warning and throws the exception is [here](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java#L145).
The only field that changed is the receipt handle. The SQS documentation states [here](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html) that:
> If you receive a message more than once, each time you receive it, you get a different receipt handle. You must provide the most recently received receipt handle when you request to delete the message (otherwise, the message might not be deleted).
There is no [aws_java_sdk_version](https://github.com/apache/beam/blob/release-2.34.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L447) change between Beam 2.31.0 and Beam 2.34.0, so the AWS SDK is unlikely to be the culprit.
There is a significant change between Beam 2.31.0 and Beam 2.34.0 for [SqsUnboundedReader](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.java).
To receive a message more than once, the message must not have been deleted since it was first received. The deletion logic is invoked in [SqsCheckpointMark](https://github.com/apache/beam/blob/release-2.34.0/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsCheckpointMark.java).
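The SQS behavior quoted above (a new receipt handle on every receive of an undeleted message, and deletion requiring the most recent handle) can be modeled with a small in-memory queue. `FakeQueue` is a hypothetical stand-in, not an AWS SDK class; real SQS *might* still delete on a stale handle, and this sketch assumes the failing case to make the consequence visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of SQS receipt-handle semantics. Not the AWS SDK. */
public class FakeQueueSketch {
  static final class FakeQueue {
    private final Map<String, String> latestHandle = new HashMap<>(); // messageId -> newest handle
    private final Map<String, Integer> receiveCount = new HashMap<>(); // present = not deleted

    void send(String messageId) {
      receiveCount.put(messageId, 0);
    }

    /** Each receive of a not-yet-deleted message issues a new receipt handle. */
    Optional<String> receive(String messageId) {
      if (!receiveCount.containsKey(messageId)) {
        return Optional.empty(); // deleted messages are never redelivered
      }
      int n = receiveCount.merge(messageId, 1, Integer::sum);
      String handle = messageId + "-rh" + n;
      latestHandle.put(messageId, handle);
      return Optional.of(handle);
    }

    /** Deletes only when the most recent handle is supplied (stale handles ignored here). */
    boolean delete(String messageId, String handle) {
      if (handle.equals(latestHandle.get(messageId))) {
        receiveCount.remove(messageId);
        latestHandle.remove(messageId);
        return true;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    FakeQueue q = new FakeQueue();
    q.send("m1");
    String first = q.receive("m1").orElseThrow();    // m1-rh1
    String second = q.receive("m1").orElseThrow();   // m1-rh2: not deleted, so redelivered
    System.out.println(q.delete("m1", first));       // false: stale handle
    System.out.println(q.delete("m1", second));      // true
    System.out.println(q.receive("m1").isPresent()); // false
  }
}
```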
Issue Links
- is superseded by: BEAM-13631 SQS read IO requires deterministic coder for SQS message to work in batch mode mode. (Open)
- relates to: BEAM-13631 SQS read IO requires deterministic coder for SQS message to work in batch mode mode. (Open)