Beam / BEAM-9656

Reading from Pub/Sub in portable FlinkRunner (ambiguous ReadFromPubSub transform)

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.19.0
    • Fix Version/s: None
    • Component/s: runner-flink
    • Labels: None
    • Environment: flink 1.9, beam-runners-flink-1.9-job-server-2.19.0.jar

    Description

      Hi,

      I'm trying to get streaming from Pub/Sub working on the FlinkRunner, but I get the following error on a dummy test pipeline:

      java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
      java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
          at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
          at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
          at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
          at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
          at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
          at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
          at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
          at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
          at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
          at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
          at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
          at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
          at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:844)
      Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
          at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
          at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
          at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
          at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
          at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
          at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
          at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
          at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
          ... 14 more
      ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
      
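      import json

      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions

      # known_args and pipeline_args are assumed to come from an argparse
      # parse_known_args() call that is not shown here.
      options = PipelineOptions(pipeline_args)
      with beam.Pipeline(options=options) as p:
          bounds_to_get = (
              p
              | 'LoadJson' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
              # Each Pub/Sub message payload is parsed as JSON.
              | 'ParseJson' >> beam.Map(json.loads)
          )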
      
      

      The pipeline is submitted to a Flink cluster with the following parameters:

      GOOGLE_APPLICATION_CREDENTIALS=~/gcp/dataflow.json python -m listing_beam_pipeline.test_runner --runner FlinkRunner --flink_master={} --flink_version 1.9  --output gs://... --input_topic projects/pubsub-public-data/topics/taxirides-realtime  --streaming
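The pipeline snippet in this report uses known_args and pipeline_args without showing where they are defined. The following is a minimal sketch, assuming they come from the common argparse parse_known_args() pattern; the reporter's actual parsing code is not shown, so the split_args helper and its flag definitions are hypothetical. It illustrates how a command line like the one above would be split between the pipeline's own flags and the flags handed on to PipelineOptions.

```python
import argparse


def split_args(argv):
    """Split argv into the script's own args and the args left over for Beam."""
    parser = argparse.ArgumentParser()
    # Hypothetical definitions; the flag names mirror the command line in the report.
    parser.add_argument('--input_topic', required=True)
    parser.add_argument('--output')
    # Flags the parser does not recognize are returned as a second list.
    return parser.parse_known_args(argv)


argv = [
    '--runner', 'FlinkRunner',
    '--flink_version', '1.9',
    '--input_topic', 'projects/pubsub-public-data/topics/taxirides-realtime',
    '--streaming',
]
known_args, pipeline_args = split_args(argv)
```

Under this assumption, the runner-related flags (--runner, --flink_version, --streaming) would end up in pipeline_args, while --input_topic is consumed by the script itself.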

      I've tried the same pipeline on both DirectRunner and DataflowRunner, and it seems to work there. I don't quite understand the underlying error in the traceback.

      Could you advise on this issue please?

      Thanks!

          People

            Assignee: Unassigned
            Reporter: Léopold Boudard (leopold.boudard)
            Votes: 0
            Watchers: 4