ParquetIO cannot read back files it wrote using a reflection-generated Avro schema


      Apache Beam is unable to read Parquet files that were written using an Avro Schema generated by reflection (ReflectData). However, it is able to read Parquet files that were written using a hardcoded Schema.
      The following test passes as written. However, it fails when every use of 'SCHEMA' in the test is replaced with 'SCHEMA_FAILS'.
      package com.example;

      import java.io.Serializable;
      import java.lang.reflect.Field;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.avro.reflect.ReflectData;
      import org.apache.beam.sdk.coders.AvroCoder;
      import org.apache.beam.sdk.io.FileIO;
      import org.apache.beam.sdk.io.GenerateSequence;
      import org.apache.beam.sdk.io.parquet.ParquetIO;
      import org.apache.beam.sdk.testing.PAssert;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
      import org.apache.beam.sdk.transforms.Filter;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.Values;
      import org.apache.beam.sdk.values.PCollection;
      import org.junit.Rule;
      import org.junit.Test;
      import org.junit.rules.TemporaryFolder;
      import org.junit.runner.RunWith;
      import org.junit.runners.JUnit4;

      public final class ReflectionTest {
      @Rule public transient TestPipeline pipeline = TestPipeline.create();

      @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();

      private static final Schema SCHEMA_FAILS = ReflectData.get().getSchema(Transaction.class);
      private static final Schema SCHEMA = new Schema.Parser().parse(Transaction.SCHEMA);


      • This test creates GenericRecord objects, writes them to Parquet files and reads them back.
      • <p>However, it is able to read Parquet files only when they are written using a hardcoded
      • Schema (see ReflectionTest.SCHEMA defined above).
      • <p>It is unable to read Parquet files when they are written using a Schema generated by
      • reflection (see ReflectionTest.SCHEMA_FAILS defined above).
        public void genericRecordToTableRow_convertsGenericRecordToTableRow() { PCollection<GenericRecord> pgr = pipeline .apply(GenerateSequence.from(0).to(2)) .apply("translateToGeneric", ParDo.of(new LongToGenericRecord())) .setCoder(AvroCoder.of(SCHEMA)); PCollection<GenericRecord> writeThenRead = pgr.apply( FileIO.<GenericRecord>write() .via(ParquetIO.sink(SCHEMA)) .to(temporaryFolder.getRoot().getAbsolutePath())) .getPerDestinationOutputFilenames() .apply(Values.create()) .apply(FileIO.matchAll()) .apply(FileIO.readMatches()) .apply(ParquetIO.readFiles(SCHEMA)) .apply(Filter.by(x -> false)); PAssert.that(writeThenRead).empty(); pipeline.run().waitUntilFinish(); }

      static class LongToGenericRecord extends DoFn<Long, GenericRecord> {
      public void processElement(ProcessContext context) {
      Transaction tr = new Transaction(context.element());
      GenericRecord result = new GenericData.Record(SCHEMA);
      for (Schema.Field r : SCHEMA.getFields()) {
      String name = r.name();

      { Field f = Transaction.class.getDeclaredField(name); f.setAccessible(true); result.put(name, f.get(tr)); }

      catch (NoSuchFieldException nsfe)

      { throw new RuntimeException("no such field: " + name, nsfe); }

      catch (IllegalAccessException iae)

      { throw new RuntimeException("no access to field: " + name, iae); }


      /** represents a row in our generated data */
      public static final class Transaction implements Serializable {

      double amountBase = 0.0;

      public Transaction(double amt)

      { amountBase = amt; }

      double getAmountBase()

      { return amountBase; }

      public static final String SCHEMA =

      {\n" + " \"namespace\": \"sample\",\n" + " \"type\": \"record\",\n" + " \"name\": \"Transaction\",\n" + " \"fields\": [\n" + " \{\"name\": \"amountBase\", \"type\": \"double\"}

      + " ]\n"
      + "}";

      public boolean equals(Object t) {
      if (t instanceof Transaction)

      { return ((Transaction) t).getAmountBase() == amountBase; }


      { return false; }


      public int hashCode()

      { return (int) amountBase; }





