Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.8.1
Fix Version/s: None
Component/s: None
Description
I am attempting to read Parquet data whose schema contains a record nested inside a wrapper record, which is itself nested inside an array, e.g.:
{ "type": "record", "name": "record", "fields": [ { "name": "elements", "type": { "type": "array", "items": { "type": "record", "name": "elementWrapper", "fields": [ { "name": "array_element", "type": { "type": "record", "name": "element", "namespace": "test", "fields": [ { "name": "someField", "type": "int" } ] } } ] } } } ] }
When reading a parquet file with the above schema using the ParquetFileReader, I can see the file has the following schema, which appears to be correct:
message record {
  required group elements (LIST) {
    repeated group array {
      required group array_element {
        required int32 someField;
      }
    }
  }
}
However, when attempting to read records from this file with the Avro interface (see below), I get an InvalidRecordException.
final ParquetReader<GenericRecord> parquetReader = AvroParquetReader.<GenericRecord>builder(path).build();
final GenericRecord read = parquetReader.read();
Stepping through the code, it looks like when the record is converted to Avro, the field "someField" isn't in scope; only fields at the top level of the schema are.
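To check whether the data itself is intact, the file can also be read back through the low-level example API, which bypasses the Avro conversion entirely; a minimal sketch, assuming the same path as above:

import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

final ParquetReader<Group> groupReader = ParquetReader.builder(new GroupReadSupport(), path).build();
final Group group = groupReader.read();
// If this prints the full nested structure, including someField, the bytes
// on disk are fine and the failure is isolated to the Avro conversion.
System.out.println(group);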
Is it expected that Avro Parquet does not support this schema? Is this a bug in the AvroRecordConverter?
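In the meantime, one variation that may be worth trying is to supply the expected Avro read schema explicitly instead of letting the reader derive it from the Parquet schema; a minimal, untested sketch, assuming topLevelElement is the Avro schema shown above:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

final Configuration conf = new Configuration();
// Ask the Avro read support to use our schema rather than converting the
// Parquet file schema back to Avro.
AvroReadSupport.setAvroReadSchema(conf, topLevelElement);

final ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(path).withConf(conf).build();
final GenericRecord record = reader.read();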
Thanks,
Iain
Stacktrace:
org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'someField' not found
at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:220)
at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:125)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:274)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:227)
at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:73)
at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:531)
at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:481)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:136)
at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:90)
at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:132)
at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
Below is the full code that creates a Parquet file with this schema, and then fails to read it:
@Test
@SneakyThrows
public void canReadWithNestedArray() {
    final Path path = new Path("test-resources/" + UUID.randomUUID());

    // Construct a record that defines the final nested value we can't read
    final Schema element = Schema.createRecord("element", null, "test", false);
    element.setFields(Arrays.asList(
        new Schema.Field("someField", Schema.create(Schema.Type.INT), null, null)));

    // Create a wrapper for above nested record
    final Schema elementWrapper = Schema.createRecord("elementWrapper", null, null, false);
    elementWrapper.setFields(Arrays.asList(
        new Schema.Field("array_element", element, null, null)));

    // Create top level field that contains array of wrapped records
    final Schema.Field topLevelArrayOfWrappers =
        new Schema.Field("elements", Schema.createArray(elementWrapper), null, null);
    final Schema topLevelElement = Schema.createRecord("record", null, null, false);
    topLevelElement.setFields(Arrays.asList(topLevelArrayOfWrappers));

    final GenericRecord genericRecord = new GenericData.Record(topLevelElement);

    // Create element
    final GenericData.Record recordValue = new GenericData.Record(element);
    recordValue.put("someField", 5);

    // Create element of array, wrapper containing above element
    final GenericData.Record wrapperValue = new GenericData.Record(elementWrapper);
    wrapperValue.put("array_element", recordValue);
    genericRecord.put(topLevelArrayOfWrappers.name(), Arrays.asList(wrapperValue));

    AvroParquetWriter.Builder<GenericRecord> fileWriterBuilder =
        AvroParquetWriter.<GenericRecord>builder(path).withSchema(topLevelElement);
    final ParquetWriter<GenericRecord> fileWriter = fileWriterBuilder.build();
    fileWriter.write(genericRecord);
    fileWriter.close();

    final ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path);
    final FileMetaData fileMetaData = parquetFileReader.getFileMetaData();
    System.out.println(fileMetaData.getSchema().toString());

    final ParquetReader<GenericRecord> parquetReader = AvroParquetReader.<GenericRecord>builder(path).build();
    final GenericRecord read = parquetReader.read();
}