  Beam / BEAM-11721

Cannot read array values with ParquetIO

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.25.0
    • Fix Version/s: None
    • Component/s: io-java-parquet
    • Labels: None

    Description

      Hi Beam community,

      I am seeing an error when reading an array field with ParquetIO on Beam 2.25. The issue reproduces on both the direct runner and the Spark runner. It is blocking my adoption of Beam, so prompt help would be appreciated.

      Below is the schema tree for quick visualization. The array field is named "numbers" and its element type is int.

      root
       |-- numbers: array (nullable = true)
       |    |-- element: integer (containsNull = true)

      The Beam code is very simple:

          pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));

       

      Below is the error when running that code:

       

      Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
          at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
          at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
      Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
          at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
          at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
          at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
          at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
          at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
          at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
          at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
          at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
          at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
          at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
          at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
          at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
          at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
          at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
          at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
          at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
          at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
          at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
          at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
          at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
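      Reading the trace bottom-up: ParquetIO's ReadFn outputs a record, the direct runner's immutability check clones it by encoding it with AvroCoder, and GenericDatumWriter.writeArray hands each item of "numbers" to the int writer, which casts it to java.lang.Number. The exception therefore says the array items were GenericData$Record objects rather than integers. The stdlib-only Java simulation below reproduces just that cast. It is not Avro's or Beam's code, the class and method names are hypothetical, and it assumes (unconfirmed) that each item arrives wrapped in a one-field record keyed by the "element" child shown in the schema tree, modelled here as a Map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stdlib-only simulation; NOT Avro's GenericDatumWriter.
// It mimics the failing step from the trace: for an array<int> field,
// every item is cast to java.lang.Number before it is written.
public class ArrayIntWriteSimulation {

    // Stand-in for writing an array<int>: casts each item to Number
    // (as Avro's int path does) and returns the sum so the result is checkable.
    static long writeIntArray(List<?> items) {
        long written = 0;
        for (Object item : items) {
            // A wrapper "record" (a Map here) is not a Number, so this throws.
            written += ((Number) item).intValue();
        }
        return written;
    }

    // Hypothetical helper: unwrap one-field wrapper records keyed by "element";
    // items that are not such wrappers are passed through unchanged.
    static List<Object> unwrapElements(List<?> items) {
        List<Object> out = new ArrayList<>();
        for (Object item : items) {
            if (item instanceof Map<?, ?> && ((Map<?, ?>) item).containsKey("element")) {
                out.add(((Map<?, ?>) item).get("element"));
            } else {
                out.add(item);
            }
        }
        return out;
    }

    // Builds the assumed wrapper shape {"element": value} for one array item.
    static Map<String, Object> wrap(Object value) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("element", value);
        return record;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> numbers = Arrays.asList(wrap(1), wrap(2), wrap(3));
        try {
            writeIntArray(numbers);
            System.out.println("unexpected: write succeeded");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the report: " + e.getMessage());
        }
        // After unwrapping, every item is an Integer and the cast succeeds.
        System.out.println(writeIntArray(unwrapElements(numbers))); // prints 6
    }
}
```

      Under this assumption, the array items only satisfy an int schema once they are unwrapped; the helper is an illustration of the shape mismatch, not a proposed ParquetIO fix.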
      
      

      Attachments

        1. from-spark.snappy.parquet (0.5 kB, uploaded by Tao Li)
        2. schema.avsc (0.2 kB, uploaded by Tao Li)


          People

            Assignee: Unassigned
            Reporter: Tao Li (sekiforever)
            Votes: 0
            Watchers: 6
