Apache Hudi / HUDI-8724 Bug fixes - Phase 1 / HUDI-8299

Different parquet reader config on list-typed fields is used to read parquet file generated by clustering


Details

    • Type: Sub-task
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.14.1
    • Fix Version/s: 1.0.1
    • Component/s: None
    • Labels: None

    Description

      We saw an issue in Hudi 0.14 where `HoodieAvroParquetReader` returns null for a list-typed field when reading a parquet file generated by a replacecommit (clustering); see the screenshot below. This causes a NullPointerException during indexing with a global index that uses HoodieMergedReadHandle:

      org.apache.hudi.exception.HoodieException: Unable to instantiate payload class 
          at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:101) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:154) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:122) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro(HoodieAvroUtils.java:1404) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.HoodieAvroIndexedRecord.wrapIntoHoodieRecordPayloadWithParams(HoodieAvroIndexedRecord.java:159) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.io.HoodieMergedReadHandle.doMergedRead(HoodieMergedReadHandle.java:164) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.io.HoodieMergedReadHandle.getMergedRecords(HoodieMergedReadHandle.java:91) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.index.HoodieIndexUtils.lambda$getExistingRecords$c7e45d15$1(HoodieIndexUtils.java:246) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.19.jar:?]
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.19.jar:?]
          at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.19.jar:?]
          at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) ~[spark-core_2.12-3.2.3.jar:3.2.3]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
          at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
      Caused by: java.lang.reflect.InvocationTargetException
          at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
          at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
          at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
          at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
          ... 23 more
      Caused by: java.lang.NullPointerException: null value for (non-nullable) List<tip_history> at triprec.tip_history
          at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
          at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
          at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
          at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
          at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
          ... 23 more
      Caused by: java.lang.NullPointerException
          at org.apache.avro.generic.GenericDatumWriter.getArraySize(GenericDatumWriter.java:315) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:281) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:151) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) ~[avro-1.11.3.jar:1.11.3]
          at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44) ~[hudi-utilities-bundle_2.12.jar]
          at org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
          at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
          at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
          at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
          at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
          ... 23 more
       

      However, the parquet file itself has non-null values for the field (see "tip_history" field):

      FROM replacecommit:
      parquet.avro.schema: {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
      writer.model.name: avro
      Schema:
      message triprec {
        ..
        required group tip_history (LIST) = 25 {
          repeated group array = 31 {
            required double amount = 32;
            required binary currency (STRING) = 33;
          }
        }
      }
      Row group 0:  count: 123  101.62 B records  start: 4  total(compressed): 12.206 kB total(uncompressed):19.198 kB 
      --------------------------------------------------------------------------------
                                     type      encodings count     avg size   nulls   min / max
      tip_history.array.amount       DOUBLE    G   _     123       8.44 B     0       "1.5205662454434887" / "99.8588537812955"
      tip_history.array.currency     BINARY    G _ R     123       0.82 B     0       "USD" / "USD" 

       

      Comparing this parquet metadata with that of the parquet files generated by the deltacommit does not reveal any difference in schema:

      FROM deltacommit:
      parquet.avro.schema: {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
      writer.model.name: avro
      Schema:
      message triprec {
        ..
        required group tip_history (LIST) {
          repeated group array {
            required double amount;
            required binary currency (STRING);
          }
        }
      }
      Row group 0:  count: 12  269.67 B records  start: 4  total(compressed): 3.160 kB total(uncompressed):2.990 kB 
      --------------------------------------------------------------------------------
                                     type      encodings count     avg size   nulls   min / max
      tip_history.array.amount       DOUBLE    G   _     12        12.83 B    0       "9.152136419558687" / "98.82597551791528"
      tip_history.array.currency     BINARY    G _ R     12        8.33 B     0       "USD" / "USD" 

       

      When looking into the parquet reader ("org.apache.parquet.hadoop.InternalParquetRecordReader", used under the hood for reading the records), we see that whenever the list-typed field data is lost, the configuration somehow contains "parquet.avro.write-old-list-structure=false". This causes AvroSchemaConverter to set "writeOldListStructure" to false (the default is true when the config is not set), which controls how the requestedSchema used to read the data is generated.
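
      The mismatch can be reproduced outside of Hudi with a plain parquet-avro reader. The following is only a sketch: the trimmed-down projection schema, the choice of "_row_key" as the extra column, the class name, and the file path argument are all illustrative. Setting a requested projection is what routes the Avro schema through AvroSchemaConverter with the problematic flag:

      import java.io.IOException;

      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.parquet.avro.AvroParquetReader;
      import org.apache.parquet.avro.AvroReadSupport;
      import org.apache.parquet.hadoop.ParquetReader;

      public class ClusteredFileReadRepro {
        public static void main(String[] args) throws IOException {
          // Base file produced by the replacecommit (path passed in for illustration).
          Path baseFile = new Path(args[0]);

          // Trimmed-down projection: one plain column plus the list-typed field.
          Schema projection = new Schema.Parser().parse(
              "{\"type\":\"record\",\"name\":\"triprec\",\"fields\":["
                  + "{\"name\":\"_row_key\",\"type\":\"string\"},"
                  + "{\"name\":\"tip_history\",\"type\":{\"type\":\"array\",\"items\":"
                  + "{\"type\":\"record\",\"name\":\"tip_history\",\"fields\":["
                  + "{\"name\":\"amount\",\"type\":\"double\"},"
                  + "{\"name\":\"currency\",\"type\":\"string\"}]}}}]}");

          // Reader conf carrying the problematic flag. Because a requested projection is
          // set, AvroReadSupport converts it via AvroSchemaConverter(conf), so the
          // requested schema uses the 3-level list layout while the file uses 2 levels.
          Configuration conf = new Configuration();
          conf.setBoolean("parquet.avro.write-old-list-structure", false);
          AvroReadSupport.setRequestedProjection(conf, projection);
          AvroReadSupport.setAvroReadSchema(conf, projection);

          try (ParquetReader<GenericRecord> reader =
              AvroParquetReader.<GenericRecord>builder(baseFile).withConf(conf).build()) {
            GenericRecord record = reader.read();
            // _row_key is read fine, but tip_history is expected to come back null,
            // mirroring the data loss described above; with the default conf it is populated.
            System.out.println(record.get("_row_key") + " -> " + record.get("tip_history"));
          }
        }
      }

      The relevant parquet-mr code paths are quoted below for reference.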

       

      InternalParquetRecordReader:
      
      ReadSupport.ReadContext readContext = this.readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
      this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
      this.requestedSchema = readContext.getRequestedSchema();
      this.columnCount = this.requestedSchema.getPaths().size(); 

       

      public AvroSchemaConverter(Configuration conf) {
        this.assumeRepeatedIsListElement = conf.getBoolean("parquet.avro.add-list-element-records", true);
        this.writeOldListStructure = conf.getBoolean("parquet.avro.write-old-list-structure", true);
        this.writeParquetUUID = conf.getBoolean("parquet.avro.write-parquet-uuid", false);
        this.readInt96AsFixed = conf.getBoolean("parquet.avro.readInt96AsFixed", false);
      }
      
      In private org.apache.parquet.schema.Type convertField(String fieldName, Schema schema, org.apache.parquet.schema.Type.Repetition repetition):
      
      if (type.equals(Type.ARRAY)) {
        if (this.writeOldListStructure) {
          return ConversionPatterns.listType(repetition, fieldName, this.convertField("array", schema.getElementType(), Repetition.REPEATED));
        }
      
        return ConversionPatterns.listOfElements(repetition, fieldName, this.convertField("element", schema.getElementType()));
      } 

       

      When "writeOldListStructure" is set to false, the new list structure with 3 levels is used, which is different from the file schema which uses the 2-level old list structure in the parquet schema.

        required group tip_history (LIST) {
          repeated group list {
            required group element {
              required double amount;
              required binary currency (STRING);
            }
          }
        } 
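
      For reference, the two layouts can be reproduced directly against AvroSchemaConverter. This is a minimal sketch; the class name is a placeholder and the schema is a trimmed-down version of the triprec schema above, keeping only the tip_history field:

      import org.apache.avro.Schema;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.parquet.avro.AvroSchemaConverter;

      public class ListStructureConversionDemo {
        public static void main(String[] args) {
          // Only the list-typed field from the triprec schema shown above.
          Schema avroSchema = new Schema.Parser().parse(
              "{\"type\":\"record\",\"name\":\"triprec\",\"fields\":["
                  + "{\"name\":\"tip_history\",\"type\":{\"type\":\"array\",\"items\":"
                  + "{\"type\":\"record\",\"name\":\"tip_history\",\"fields\":["
                  + "{\"name\":\"amount\",\"type\":\"double\"},"
                  + "{\"name\":\"currency\",\"type\":\"string\"}]}}}]}");

          // Default (flag unset, i.e. true): 2-level old layout with "repeated group array",
          // matching the file schema written out by the writer.
          System.out.println(new AvroSchemaConverter(new Configuration()).convert(avroSchema));

          // Flag set to false: 3-level new layout with "repeated group list { required group
          // element { ... } }", which no longer lines up with the columns in the file.
          Configuration newListConf = new Configuration();
          newListConf.setBoolean("parquet.avro.write-old-list-structure", false);
          System.out.println(new AvroSchemaConverter(newListConf).convert(avroSchema));
        }
      }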

       

      This means that in other places, notably on the write path, "writeOldListStructure" is left at its default of true, so the old list structure is written to the parquet files. We need to make this configuration consistent between the write and read paths to avoid such data loss on read.

      Explicitly setting "parquet.avro.write-old-list-structure=false" in the Hadoop config for all Hudi Spark jobs can work around this issue.
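
      A sketch of how that could be wired into a Spark job; the class and application name are placeholders, and the equivalent spark-submit form is shown in the trailing comment:

      import org.apache.spark.sql.SparkSession;

      public class HudiJobConfigSketch {
        public static void main(String[] args) {
          SparkSession spark = SparkSession.builder()
              .appName("hudi-write-old-list-structure-workaround") // placeholder app name
              .getOrCreate();

          // Propagate the flag through the Hadoop configuration so the clustering writer
          // and all subsequent readers agree on the same (new, 3-level) list structure.
          spark.sparkContext().hadoopConfiguration()
              .setBoolean("parquet.avro.write-old-list-structure", false);

          // Equivalent spark-submit form:
          //   --conf spark.hadoop.parquet.avro.write-old-list-structure=false
        }
      }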

       

          People

            Assignee: Jonathan Vexler (jonvex)
            Reporter: Y Ethan Guo (yihua)
