Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-8913

ParquetIO cannot read files written by itself using reflection

Details

    • Bug
    • Status: Resolved
    • P3
    • Resolution: Won't Fix
    • 2.15.0, 2.16.0
    • Missing
    • io-java-parquet
    • None
    • Java 8, JUnit 4

    Description

      ParquetIO cannot read back Parquet files that it wrote with an Avro Schema generated by reflection (ReflectData). It can, however, read files written with a Schema parsed from a hardcoded JSON string.
       
      The following test currently passes. It fails, however, when 'SCHEMA' is replaced with 'SCHEMA_FAILS' in the test.
       
      package com.example;

      import java.io.Serializable;
      import java.lang.reflect.Field;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.avro.reflect.ReflectData;
      import org.apache.beam.sdk.coders.AvroCoder;
      import org.apache.beam.sdk.io.FileIO;
      import org.apache.beam.sdk.io.GenerateSequence;
      import org.apache.beam.sdk.io.parquet.ParquetIO;
      import org.apache.beam.sdk.testing.PAssert;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
      import org.apache.beam.sdk.transforms.Filter;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.Values;
      import org.apache.beam.sdk.values.PCollection;
      import org.junit.Rule;
      import org.junit.Test;
      import org.junit.rules.TemporaryFolder;
      import org.junit.runner.RunWith;
      import org.junit.runners.JUnit4;

      @RunWith(JUnit4.class)
      public final class ReflectionTest {
      @Rule public transient TestPipeline pipeline = TestPipeline.create();

      @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();

      private static final Schema SCHEMA_FAILS = ReflectData.get().getSchema(Transaction.class);
      private static final Schema SCHEMA = new Schema.Parser().parse(Transaction.SCHEMA);

      /**

      • This test creates GenericRecord objects, writes them to Parquet files and reads them back.
        *
      • <p>However, it is able to read Parquet files only when they are written using a hardcoded
      • Schema (see ReflectionTest.SCHEMA defined above).
        *
      • <p>It is unable to read Parquet files when they are written using a Schema generated by
      • reflection (see ReflectionTest.SCHEMA_FAILS defined above).
        */
        @Test
        public void genericRecordToTableRow_convertsGenericRecordToTableRow() { PCollection<GenericRecord> pgr = pipeline .apply(GenerateSequence.from(0).to(2)) .apply("translateToGeneric", ParDo.of(new LongToGenericRecord())) .setCoder(AvroCoder.of(SCHEMA)); PCollection<GenericRecord> writeThenRead = pgr.apply( FileIO.<GenericRecord>write() .via(ParquetIO.sink(SCHEMA)) .to(temporaryFolder.getRoot().getAbsolutePath())) .getPerDestinationOutputFilenames() .apply(Values.create()) .apply(FileIO.matchAll()) .apply(FileIO.readMatches()) .apply(ParquetIO.readFiles(SCHEMA)) .apply(Filter.by(x -> false)); PAssert.that(writeThenRead).empty(); pipeline.run().waitUntilFinish(); }

      static class LongToGenericRecord extends DoFn<Long, GenericRecord> {
      @ProcessElement
      public void processElement(ProcessContext context) {
      Transaction tr = new Transaction(context.element());
      GenericRecord result = new GenericData.Record(SCHEMA);
      for (Schema.Field r : SCHEMA.getFields()) {
      String name = r.name();
      try

      { Field f = Transaction.class.getDeclaredField(name); f.setAccessible(true); result.put(name, f.get(tr)); }

      catch (NoSuchFieldException nsfe)

      { throw new RuntimeException("no such field: " + name, nsfe); }

      catch (IllegalAccessException iae)

      { throw new RuntimeException("no access to field: " + name, iae); }

      }
      context.output(result);
      }
      }

      /** represents a row in our generated data */
      public static final class Transaction implements Serializable {

      double amountBase = 0.0;

      public Transaction(double amt)

      { amountBase = amt; }

      double getAmountBase()

      { return amountBase; }

      public static final String SCHEMA =
      "

      {\n" + " \"namespace\": \"sample\",\n" + " \"type\": \"record\",\n" + " \"name\": \"Transaction\",\n" + " \"fields\": [\n" + " \{\"name\": \"amountBase\", \"type\": \"double\"}

      \n"
      + " ]\n"
      + "}";

      public boolean equals(Object t) {
      if (t instanceof Transaction)

      { return ((Transaction) t).getAmountBase() == amountBase; }

      else

      { return false; }

      }

      public int hashCode()

      { return (int) amountBase; }

      }
      }
       
       

      Attachments

        Activity

          People

            Unassigned Unassigned
            meetapoorvgupta Apoorv Gupta
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: