Parquet / PARQUET-1407

Data loss on duplicate values with AvroParquetWriter/Reader

Details

    Description

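      import java.io.IOException;
      import java.nio.ByteBuffer;

      import org.apache.avro.Schema;
      import org.apache.avro.SchemaBuilder;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.avro.generic.GenericRecordBuilder;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.parquet.avro.AvroParquetReader;
      import org.apache.parquet.avro.AvroParquetWriter;
      import org.apache.parquet.hadoop.ParquetReader;
      import org.apache.parquet.hadoop.ParquetWriter;

      public class Blah {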
      
        private static Path parquetFile = new Path("oops");
        private static Schema schema = SchemaBuilder.record("spark_schema")
            .fields().optionalBytes("value").endRecord();
      
        private static GenericData.Record recordFor(String value) {
          return new GenericRecordBuilder(schema)
              .set("value", value.getBytes()).build();
        }
      
        public static void main(String ... args) throws IOException {
          try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
                .<GenericData.Record>builder(parquetFile)
                .withSchema(schema)
                .build()) {
            writer.write(recordFor("one"));
            writer.write(recordFor("two"));
            writer.write(recordFor("three"));
            writer.write(recordFor("three"));
            writer.write(recordFor("two"));
            writer.write(recordFor("one"));
            writer.write(recordFor("zero"));
          }
      
          try (ParquetReader<GenericRecord> reader = AvroParquetReader
              .<GenericRecord>builder(parquetFile)
              .withConf(new Configuration()).build()) {
            GenericRecord rec;
            int i = 0;
            while ((rec = reader.read()) != null) {
              ByteBuffer buf = (ByteBuffer) rec.get("value");
              byte[] bytes = new byte[buf.remaining()];
              buf.get(bytes);
              System.out.println("rec " + i++ + ": " + new String(bytes));
            }
          }
        }
      }
      

      Expected output:

      rec 0: one
      rec 1: two
      rec 2: three
      rec 3: three
      rec 4: two
      rec 5: one
      rec 6: zero

      Actual:

      rec 0: one
      rec 1: two
      rec 2: three
      rec 3: 
      rec 4: 
      rec 5: 
      rec 6: zero

       

      This was found when we unexpectedly started getting empty byte[] values back in Spark (Spark 2.3.1 and Parquet 1.8.3). I have not tried to reproduce it with Parquet 1.9.0, but it is a bad enough bug that I would like a 1.8.4 release that I can use as a drop-in replacement for 1.8.3 without any binary compatibility issues.

       Duplicate byte[] values are lost.

       

      A few clues: 

      If I do not call ByteBuffer.get, the value of ByteBuffer.remaining() does not go to zero. I suspect that a ByteBuffer is being recycled for repeated values, and that the call to ByteBuffer.get mutates it by advancing its position, so later reads of the same buffer see nothing left. I wonder if an appropriately placed ByteBuffer.duplicate() would fix it.
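The suspicion above can be illustrated with plain java.nio, independent of Parquet. This is only a sketch of the suspected mechanism, not a confirmed diagnosis: the class name SharedBufferSketch and its helper methods are hypothetical, and the assumption that the reader hands back one shared ByteBuffer instance for repeated values has not been verified against the Parquet source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SharedBufferSketch {

  // Reads the buffer the same way the repro's reader loop does.
  // The relative bulk get() advances the buffer's position to its limit.
  static String drain(ByteBuffer buf) {
    byte[] bytes = new byte[buf.remaining()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Reads through a duplicate: it shares the content but has its own
  // position and limit, so the original buffer is left untouched.
  static String drainDuplicate(ByteBuffer buf) {
    return drain(buf.duplicate());
  }

  public static void main(String[] args) {
    // Assumption: one buffer instance is returned for every repeated value.
    ByteBuffer shared = ByteBuffer.wrap("three".getBytes(StandardCharsets.UTF_8));
    System.out.println("direct 1: [" + drain(shared) + "]");
    System.out.println("direct 2: [" + drain(shared) + "]");

    ByteBuffer shared2 = ByteBuffer.wrap("three".getBytes(StandardCharsets.UTF_8));
    System.out.println("duplicate 1: [" + drainDuplicate(shared2) + "]");
    System.out.println("duplicate 2: [" + drainDuplicate(shared2) + "]");
  }
}
```

Reading the same instance directly yields "three" and then an empty string, which matches the shape of the actual output above. Reading through duplicate() yields "three" both times. If the shared-instance assumption holds, calling duplicate() in the reader loop before get() might also serve as a caller-side workaround.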

       

          People

            nkollar Nándor Kollár
            scottcarey Scott Carey