Details

Type: Sub-task
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version/s: 0.14.1
Fix Version/s: None
Component/s: None
Description
We saw an issue in Hudi 0.14 where `HoodieAvroParquetReader` returns null for a list-typed field when reading a parquet file generated by a replacecommit (clustering). This causes a NullPointerException during indexing with the global index using `HoodieMergedReadHandle`; see the stack trace below.
org.apache.hudi.exception.HoodieException: Unable to instantiate payload class
    at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:101) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:154) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:122) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro(HoodieAvroUtils.java:1404) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.wrapIntoHoodieRecordPayloadWithParams(HoodieAvroIndexedRecord.java:159) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.io.HoodieMergedReadHandle.doMergedRead(HoodieMergedReadHandle.java:164) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.io.HoodieMergedReadHandle.getMergedRecords(HoodieMergedReadHandle.java:91) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getExistingRecords$c7e45d15$1(HoodieIndexUtils.java:246) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.19.jar:?]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.19.jar:?]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.19.jar:?]
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) ~[spark-core_2.12-3.2.3.jar:3.2.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
    at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
    ... 23 more
Caused by: java.lang.NullPointerException: null value for (non-nullable) List<tip_history> at triprec.tip_history
    at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:88) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.path.TracingNullPointException.summarize(TracingNullPointException.java:30) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
    at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
    at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
    ... 23 more
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericDatumWriter.getArraySize(GenericDatumWriter.java:315) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:281) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:151) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:189) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:181) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:44) ~[hudi-utilities-bundle_2.12.jar]
    at org.apache.hudi.common.model.DefaultHoodieRecordPayload.<init>(DefaultHoodieRecordPayload.java:53) ~[hudi-utilities-bundle_2.12.jar]
    at sun.reflect.GeneratedConstructorAccessor53.newInstance(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_412]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_412]
    at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:99) ~[hudi-utilities-bundle_2.12.jar]
    ... 23 more
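The innermost NPE is straightforward to reproduce outside Hudi: Avro's `GenericDatumWriter` fails exactly this way when a record holds null for a non-nullable array field, which is what the broken reader hands back here. A minimal sketch (assuming Avro 1.11.x on the classpath; the schema is a stripped-down stand-in for `triprec`):

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;

public class NullListNpeRepro {
  public static void main(String[] args) throws Exception {
    // Non-nullable array field, mirroring tip_history in the report.
    Schema schema = SchemaBuilder.record("triprec").fields()
        .name("tip_history").type().array().items().doubleType().noDefault()
        .endRecord();

    // tip_history is never set, so it stays null -- as returned by the broken reader.
    GenericRecord record = new GenericData.Record(schema);

    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Throws NullPointerException: "null value for (non-nullable) ... tip_history"
    writer.write(record, EncoderFactory.get().binaryEncoder(out, null));
  }
}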
However, the parquet file itself has non-null values for the field (see "tip_history" field):
FROM replacecommit:

parquet.avro.schema: {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avro

Schema:
message triprec {
  ..
  required group tip_history (LIST) = 25 {
    repeated group array = 31 {
      required double amount = 32;
      required binary currency (STRING) = 33;
    }
  }
}

Row group 0:  count: 123  101.62 B records  start: 4  total(compressed): 12.206 kB  total(uncompressed): 19.198 kB
--------------------------------------------------------------------------------
                            type    encodings  count  avg size  nulls  min / max
tip_history.array.amount    DOUBLE  G _        123    8.44 B    0      "1.5205662454434887" / "99.8588537812955"
tip_history.array.currency  BINARY  G _ R      123    0.82 B    0      "USD" / "USD"
Comparing this parquet metadata with that of the parquet files generated by the deltacommit does not reveal any difference in schema:
FROM deltacommit:

parquet.avro.schema: {"type":"record","name":"triprec","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"partition_path","type":["null","string"],"default":null},{"name":"trip_type","type":{"type":"enum","name":"TripType","symbols":["UNKNOWN","UBERX","BLACK"],"default":"UNKNOWN"}},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
writer.model.name: avro

Schema:
message triprec {
  ..
  required group tip_history (LIST) {
    repeated group array {
      required double amount;
      required binary currency (STRING);
    }
  }
}

Row group 0:  count: 12  269.67 B records  start: 4  total(compressed): 3.160 kB  total(uncompressed): 2.990 kB
--------------------------------------------------------------------------------
                            type    encodings  count  avg size  nulls  min / max
tip_history.array.amount    DOUBLE  G _        12     12.83 B   0      "9.152136419558687" / "98.82597551791528"
tip_history.array.currency  BINARY  G _ R      12     8.33 B    0      "USD" / "USD"
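(For reference, the dumps above match the output format of parquet-cli's meta command; the exact invocation used here is an assumption:

parquet meta <base-file>.parquet
)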
Looking into the parquet reader (`org.apache.parquet.hadoop.InternalParquetRecordReader`, used under the hood for reading the records), we see that whenever the list-typed field data is lost, the configuration somehow contains `parquet.avro.write-old-list-structure=false`. This causes `AvroSchemaConverter` to set `writeOldListStructure` to false (the default is true when the config is not set), which controls how the `requestedSchema` used to read the data is generated.
InternalParquetRecordReader:

ReadSupport.ReadContext readContext = this.readSupport.init(
    new InitContext(conf, toSetMultiMap(fileMetadata), this.fileSchema));
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
this.requestedSchema = readContext.getRequestedSchema();
this.columnCount = this.requestedSchema.getPaths().size();
AvroSchemaConverter:

public AvroSchemaConverter(Configuration conf) {
  this.assumeRepeatedIsListElement = conf.getBoolean("parquet.avro.add-list-element-records", true);
  this.writeOldListStructure = conf.getBoolean("parquet.avro.write-old-list-structure", true);
  this.writeParquetUUID = conf.getBoolean("parquet.avro.write-parquet-uuid", false);
  this.readInt96AsFixed = conf.getBoolean("parquet.avro.readInt96AsFixed", false);
}

In private org.apache.parquet.schema.Type convertField(String fieldName, Schema schema, org.apache.parquet.schema.Type.Repetition repetition):

if (type.equals(Type.ARRAY)) {
  if (this.writeOldListStructure) {
    return ConversionPatterns.listType(repetition, fieldName,
        this.convertField("array", schema.getElementType(), Repetition.REPEATED));
  }
  return ConversionPatterns.listOfElements(repetition, fieldName,
      this.convertField("element", schema.getElementType()));
}
When "writeOldListStructure" is set to false, the new list structure with 3 levels is used, which is different from the file schema which uses the 2-level old list structure in the parquet schema.
required group tip_history (LIST) {
repeated group list {
required group element {
required double amount;
required binary currency (STRING);
}
}
}
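The mismatch is easy to demonstrate by converting the same Avro schema under both flag values. A minimal sketch (assuming parquet-avro on the classpath; class and field names are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;

public class ListStructureDemo {
  public static void main(String[] args) {
    Schema avroSchema = SchemaBuilder.record("triprec").fields()
        .name("tip_history").type().array().items().doubleType().noDefault()
        .endRecord();

    // Default (flag unset -> true): old 2-level structure, "repeated group array",
    // matching the file schema in the metadata dumps above.
    Configuration oldConf = new Configuration();
    System.out.println(new AvroSchemaConverter(oldConf).convert(avroSchema));

    // Flag set to false: new 3-level structure, "repeated group list { ... element ... }",
    // which no longer lines up with the columns actually present in the file.
    Configuration newConf = new Configuration();
    newConf.setBoolean("parquet.avro.write-old-list-structure", false);
    System.out.println(new AvroSchemaConverter(newConf).convert(avroSchema));
  }
}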
This means that elsewhere `writeOldListStructure` is left at its default of true, so the old list structure is written to the parquet files. We need to make the write and read paths consistent to avoid such data loss on read.
Explicitly setting `parquet.avro.write-old-list-structure=false` in the Hadoop configuration for all Hudi Spark jobs works around this issue.
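For example (a sketch; the session setup is illustrative, and the `spark.hadoop.` prefix relies on Spark's standard propagation of such keys into the Hadoop Configuration):

import org.apache.spark.sql.SparkSession;

public class WriteOldListStructureWorkaround {
  public static void main(String[] args) {
    // Option 1: via the Spark config; spark.hadoop.* keys are copied into
    // the Hadoop Configuration seen by the parquet readers and writers.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-job")
        .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
        .getOrCreate();

    // Option 2: set it directly on the Hadoop Configuration of the running context.
    spark.sparkContext().hadoopConfiguration()
        .set("parquet.avro.write-old-list-structure", "false");
  }
}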