PARQUET-1254

Unable to read deeply nested records from Parquet file with Avro interface.


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.8.1
    • Fix Version/s: None
    • Component/s: parquet-avro, parquet-mr
    • Labels: None

    Description

      I am attempting to read Parquet data whose schema contains a record nested in a wrapper record, which is in turn nested in an array, e.g.:

      {
        "type": "record",
        "name": "record",
        "fields": [
          {
            "name": "elements",
            "type": {
              "type": "array",
              "items": {
                "type": "record",
                "name": "elementWrapper",
                "fields": [
                  {
                    "name": "array_element",
                    "type": {
                      "type": "record",
                      "name": "element",
                      "namespace": "test",
                      "fields": [
                        {
                          "name": "someField",
                          "type": "int"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        ]
      }
      

      When reading a Parquet file with the above schema using ParquetFileReader, I can see that the file has the following schema, which appears to be correct:

      message record {
        required group elements (LIST) {
          repeated group array {
            required group array_element {
              required int32 someField;
            }
          }
        }
      }
      

      However, when attempting to read records from this file with the Avro interface (see below), I get an InvalidRecordException.

      final ParquetReader<GenericRecord> parquetReader = AvroParquetReader.<GenericRecord>builder(path).build();
      final GenericRecord read = parquetReader.read();
      

      Stepping through the code, it looks like the field "someField" isn't in scope when the record is converted to Avro: only fields at the top level of the schema are in scope.
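      To make the scoping problem concrete, here is a minimal sketch in plain Avro (the class name is hypothetical; this is not parquet-mr source): "someField" is defined on the nested "element" record, so looking it up on the wrapper record returns null, and a failed field lookup is exactly what AvroRecordConverter.getAvroField reports in the stacktrace below.

      import org.apache.avro.Schema;
      import org.apache.avro.SchemaBuilder;

      public class FieldScopeIllustration {
          public static void main(String[] args) {
              // Rebuild the two inner records from the Avro schema above
              final Schema element = SchemaBuilder.record("element").namespace("test")
                  .fields().requiredInt("someField").endRecord();
              final Schema elementWrapper = SchemaBuilder.record("elementWrapper")
                  .fields().name("array_element").type(element).noDefault().endRecord();

              // Resolving a column name against the wrong record level yields null
              System.out.println(elementWrapper.getField("array_element")); // non-null
              System.out.println(elementWrapper.getField("someField"));     // null -> "Avro field 'someField' not found"
          }
      }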

      Is it expected that Avro Parquet does not support this schema? Is this a bug in the AvroRecordConverter?

      Thanks, Iain

      Stacktrace:

      org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'someField' not found
      
      	at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:220)
      	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:125)
      	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:274)
      	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:227)
      	at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:73)
      	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:531)
      	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:481)
      	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
      	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:136)
      	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:90)
      	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
      	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:132)
      	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
      	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
      	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
      

      Below is the full code that creates a Parquet file with this schema, and then fails to read it:

      // Imports required by the test (JUnit 4 + Lombok @SneakyThrows):
      import java.util.Arrays;
      import java.util.UUID;

      import lombok.SneakyThrows;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.parquet.avro.AvroParquetReader;
      import org.apache.parquet.avro.AvroParquetWriter;
      import org.apache.parquet.hadoop.ParquetFileReader;
      import org.apache.parquet.hadoop.ParquetReader;
      import org.apache.parquet.hadoop.ParquetWriter;
      import org.apache.parquet.hadoop.metadata.FileMetaData;
      import org.junit.Test;

      @Test
      @SneakyThrows
      public void canReadWithNestedArray() {
          final Path path = new Path("test-resources/" + UUID.randomUUID());

          // Construct the record that defines the final nested value we can't read
          final Schema element = Schema.createRecord("element", null, "test", false);
          element.setFields(Arrays.asList(new Schema.Field("someField", Schema.create(Schema.Type.INT), null, null)));

          // Create a wrapper record for the nested record above
          final Schema elementWrapper = Schema.createRecord("elementWrapper", null, null, false);
          elementWrapper.setFields(Arrays.asList(new Schema.Field("array_element", element, null, null)));

          // Create the top-level field that holds an array of wrapped records
          final Schema.Field topLevelArrayOfWrappers = new Schema.Field("elements", Schema.createArray(elementWrapper), null, null);

          final Schema topLevelElement = Schema.createRecord("record", null, null, false);
          topLevelElement.setFields(Arrays.asList(topLevelArrayOfWrappers));
          final GenericRecord genericRecord = new GenericData.Record(topLevelElement);

          // Create the innermost element value
          final GenericData.Record recordValue = new GenericData.Record(element);
          recordValue.put("someField", 5);

          // Create the array element: a wrapper containing the value above
          final GenericData.Record wrapperValue = new GenericData.Record(elementWrapper);
          wrapperValue.put("array_element", recordValue);

          genericRecord.put(topLevelArrayOfWrappers.name(), Arrays.asList(wrapperValue));

          // Write a single record with this schema
          final AvroParquetWriter.Builder<GenericRecord> fileWriterBuilder =
              AvroParquetWriter.<GenericRecord>builder(path).withSchema(topLevelElement);
          final ParquetWriter<GenericRecord> fileWriter = fileWriterBuilder.build();
          fileWriter.write(genericRecord);
          fileWriter.close();

          // Print the file schema as seen by ParquetFileReader
          final ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path);
          final FileMetaData fileMetaData = parquetFileReader.getFileMetaData();
          System.out.println(fileMetaData.getSchema().toString());

          // Fails here with InvalidRecordException
          final ParquetReader<GenericRecord> parquetReader = AvroParquetReader.<GenericRecord>builder(path).build();
          final GenericRecord read = parquetReader.read();
      }
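
      A variation I have not verified (whether it sidesteps this converter path is an assumption on my part, though AvroReadSupport.setAvroReadSchema itself is a real parquet-avro hook) would be to hand the writer's Avro schema to the reader explicitly, instead of letting parquet-avro derive one from the file:

          // Untested: pin the read schema to the schema used for writing
          // (requires org.apache.parquet.avro.AvroReadSupport)
          final Configuration conf = new Configuration();
          AvroReadSupport.setAvroReadSchema(conf, topLevelElement);
          final ParquetReader<GenericRecord> reader = AvroParquetReader
              .<GenericRecord>builder(path)
              .withConf(conf)
              .build();
          final GenericRecord maybeRead = reader.read();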
      


    People

      Assignee: Unassigned
      Reporter: iainlogan (Bob smith)
      Votes: 1
      Watchers: 4