Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-2170

PubsubIO.readStrings should handle messages without metadata

Details

    • Bug
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • None
    • 2.0.0
    • io-java-gcp
    • None
    • Important

    Description

      When I use PubsubIO.readStrings with messages that does not contain any metadata I got the following error:

      java.lang.reflect.InvocationTargetException
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null Map
      	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
      	at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:134)
      	at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:138)
      	at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:106)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	... 1 more
      Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null Map
      	at org.apache.beam.sdk.coders.MapCoder.encode(MapCoder.java:90)
      	at org.apache.beam.sdk.coders.MapCoder.encode(MapCoder.java:43)
      	at org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder.encode(PubsubMessageWithAttributesCoder.java:53)
      	at org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder.encode(PubsubMessageWithAttributesCoder.java:33)
      	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:91)
      	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:75)
      	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:60)
      	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
      	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
      	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
      	... 8 more
      

      When I add some metadata the problem disappears. You should be able to reproduce the problem with a very simple pipeline consisting of only one PubsubIO.readStrings.

      Tell me if you need more information

      Attachments

        Issue Links

          Activity

            People

              jkff Eugene Kirpichov
              alexandreyc Alexandre Crayssac
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: