Beam / BEAM-6517

Pipeline fails when deserializing a lambda function on Spark (MapElements)

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Duplicate
    • Affects Version/s: 2.9.0
    • Fix Version/s: Not applicable
    • Component/s: io-java-avro
    • Labels: None

    Description

      I'm trying to read from Parquet using the Spark runner. My initial attempt failed because of a version mismatch (Spark 2.3.2 uses an older Parquet version). I then tried shading Parquet and Avro, and the pipeline read the Parquet records successfully. However, when I tried to access a record field using a lambda function, as in the pipeline below, the Spark runner threw the exception that follows it:

      p.apply(FileIO.match().filepattern(options.getInputFile()))
        .apply(FileIO.readMatches())
        .apply(ParquetIO.readFiles(schema))
        .apply(MapElements.into(TypeDescriptors.strings()).via(it -> it.get("name").toString()))
        .apply(TextIO.write().to(options.getOutput()));

       

      Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
      at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
      at org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
      at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
      at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
      at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
      at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
      at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
      at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
      at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
      at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
      at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
      at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
      at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
      at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: unexpected exception type
      at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
      at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
      at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
      ... 19 more
      Caused by: java.lang.reflect.InvocationTargetException
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
      ... 41 more
      Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
      at beamtest.examples.ParquetWordCount.$deserializeLambda$(ParquetWordCount.java:23)
      ... 51 more

       

      This doesn't happen if I use a SimpleFunction instead, but lambda functions are often more convenient and readable.
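      The stack trace shows Beam cloning the MapElements function with SerializableUtils.clone (a Java serialization round trip), and the failure surfaces in SerializedLambda.readResolve calling ParquetWordCount.$deserializeLambda$. The plain-Java sketch below has no Beam dependency and only illustrates the two deserialization paths involved: a serializable lambda, which is restored through the capturing class's synthetic $deserializeLambda$ method, and a named class (analogous to a SimpleFunction subclass), which is restored as an ordinary object. The names SerFn, UpperCase and cloneViaSerialization are hypothetical. It is not a reproduction of the bug: with a single class loader and no shading, both round trips succeed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaCloneSketch {

  // Hypothetical stand-in for Beam's SerializableFunction: a Function that is also Serializable.
  interface SerFn<A, B> extends Function<A, B>, Serializable {}

  // Named class, analogous to subclassing SimpleFunction: it is serialized as an ordinary
  // object, so reading it back does not go through $deserializeLambda$.
  static final class UpperCase implements SerFn<String, String> {
    @Override
    public String apply(String s) {
      return s.toUpperCase();
    }
  }

  // Round-trips a value through Java serialization, similar in spirit to
  // SerializableUtils.clone in the trace: write to a byte array, then read it back.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T cloneViaSerialization(T value)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // A serializable lambda is written as a java.lang.invoke.SerializedLambda; on read,
    // SerializedLambda.readResolve invokes this class's synthetic $deserializeLambda$.
    SerFn<String, String> lambda = s -> s.toUpperCase();
    SerFn<String, String> named = new UpperCase();

    System.out.println(cloneViaSerialization(lambda).apply("name")); // prints NAME
    System.out.println(cloneViaSerialization(named).apply("name"));  // prints NAME
  }
}
```

      The "Invalid lambda deserialization" error in the trace is thrown by the generated $deserializeLambda$ when none of the lambdas it knows about matches the SerializedLambda it was handed; the named-class path never performs that lookup, which is consistent with SimpleFunction avoiding the error.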


            People

              Assignee: Unassigned
              Reporter: Vu Ho
              Votes: 0
              Watchers: 4
