Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-13631

SQS read IO requires deterministic coder for SQS message to work in batch mode mode.

Details

    • Bug
    • Status: In Progress
    • P2
    • Resolution: Unresolved
    • None
    • 2.37.0
    • io-java-aws

    Description

      Currently the SQS read IO uses SerializableCoder.of(Message.class), which isn't deterministic. This may cause issues when used in batch mode (based on BoundedReadFromUnboundedSource). The mutation detector will throw in such case:

      Jan 10, 2022 11:37:05 AM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
      WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}
      Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}
      
      Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} after it was output (new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}). Values must not be mutated in any way after being output.
          at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
          at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
          at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
          at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
          at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
          at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO.
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
          at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
          ... 10 more
      Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO2Mw.
       

      https://stackoverflow.com/questions/70648489/apache-beam-2-34-0-sqsio-illegal-mutation-exception

      Attachments

        Issue Links

          Activity

            People

              mosche Moritz Mack
              mosche Moritz Mack
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h