Details
-
Bug
-
Status: Resolved
-
P3
-
Resolution: Won't Fix
-
2.15.0, 2.16.0
-
None
-
Java 8, JUnit 4
Description
Apache Beam is unable to read Parquet files when they are written using a Schema generated by reflection. However, it is able to read Parquet files when they are written using a hardcoded Schema.
The following test passes right now. However, it fails when 'SCHEMA' is replaced with 'SCHEMA_FAILS' in this test.
package com.example;
import java.io.Serializable;
import java.lang.reflect.Field;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Reproduction test that writes Avro GenericRecords to Parquet through Beam's FileIO/ParquetIO
 * and reads them back.
 *
 * <p>The test as written uses {@code SCHEMA}, which is parsed from the hardcoded JSON in
 * {@code Transaction.SCHEMA}. {@code SCHEMA_FAILS} is derived from {@code Transaction} by Avro
 * reflection and is not referenced by the test body; per the accompanying report, substituting
 * it for {@code SCHEMA} makes the test fail.
 *
 * <p>NOTE(review): parts of this listing appear to have been lost in extraction (the try body in
 * {@code LongToGenericRecord.processElement}, most of the {@code Transaction.SCHEMA} literal, and
 * the {@code if} branch of {@code Transaction.equals}). Restore them from the original source
 * before attempting to compile.
 */
@RunWith(JUnit4.class)
public final class ReflectionTest {
// JUnit rules: the Beam test pipeline and a temporary directory used as the Parquet output root.
@Rule public transient TestPipeline pipeline = TestPipeline.create();
@Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
// Schema generated by reflection over Transaction; reported to break the read step when used.
private static final Schema SCHEMA_FAILS = ReflectData.get().getSchema(Transaction.class);
// Schema parsed from the hardcoded JSON string; this is the one the test currently uses.
private static final Schema SCHEMA = new Schema.Parser().parse(Transaction.SCHEMA);
/**
 * This test creates GenericRecord objects, writes them to Parquet files and reads them back.
 *
 * <p>However, it is able to read Parquet files only when they are written using a hardcoded
 * Schema (see ReflectionTest.SCHEMA defined above).
 *
 * <p>It is unable to read Parquet files when they are written using a Schema generated by
 * reflection (see ReflectionTest.SCHEMA_FAILS defined above).
 *
 * <p>The pipeline ends with a filter that rejects every element, so the assertion that the
 * result is empty does not inspect record contents; the test passes when the write and read
 * steps run to completion.
 */
@Test
public void genericRecordToTableRow_convertsGenericRecordToTableRow() { PCollection<GenericRecord> pgr = pipeline .apply(GenerateSequence.from(0).to(2)) .apply("translateToGeneric", ParDo.of(new LongToGenericRecord())) .setCoder(AvroCoder.of(SCHEMA)); PCollection<GenericRecord> writeThenRead = pgr.apply( FileIO.<GenericRecord>write() .via(ParquetIO.sink(SCHEMA)) .to(temporaryFolder.getRoot().getAbsolutePath())) .getPerDestinationOutputFilenames() .apply(Values.create()) .apply(FileIO.matchAll()) .apply(FileIO.readMatches()) .apply(ParquetIO.readFiles(SCHEMA)) .apply(Filter.by(x -> false)); PAssert.that(writeThenRead).empty(); pipeline.run().waitUntilFinish(); }
/**
 * DoFn that builds a Transaction from each input Long and emits a GenericRecord created against
 * {@code SCHEMA}, visiting every field declared in that schema.
 */
static class LongToGenericRecord extends DoFn<Long, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
Transaction tr = new Transaction(context.element());
GenericRecord result = new GenericData.Record(SCHEMA);
for (Schema.Field r : SCHEMA.getFields()) {
String name = r.name();
// NOTE(review): the try body is missing from this listing. Given the caught exception types
// and the java.lang.reflect.Field import, it presumably looks up the field called `name` on
// `tr` reflectively and stores its value in `result` -- confirm against the original source.
try
catch (NoSuchFieldException nsfe)
{ throw new RuntimeException("no such field: " + name, nsfe); }catch (IllegalAccessException iae)
{ throw new RuntimeException("no access to field: " + name, iae); } }
context.output(result);
}
}
/**
 * Represents a row in our generated data: a single {@code double} amount.
 *
 * <p>{@code SCHEMA} holds the hardcoded Avro schema JSON that {@code ReflectionTest.SCHEMA} is
 * parsed from. NOTE(review): most of that string literal is missing from this listing, so the
 * exact record name, namespace and field definitions cannot be determined here.
 */
public static final class Transaction implements Serializable {
double amountBase = 0.0;
public Transaction(double amt)
{ amountBase = amt; }double getAmountBase()
{ return amountBase; }public static final String SCHEMA =
"
\n"
+ " ]\n"
+ "}";
// NOTE(review): the statement for the `instanceof Transaction` branch is missing from this
// listing; presumably it compares amountBase values -- confirm. Non-Transaction arguments
// yield false.
public boolean equals(Object t) {
if (t instanceof Transaction)
else
{ return false; }}
// Hash is amountBase cast to int.
public int hashCode()
{ return (int) amountBase; } }
}