Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Resolved
Description
Based on Alexey's testing, when
getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true)
is called, the following exception is thrown. However, the method is designed to return null if the field is not found in the schema.
21/11/30 10:22:42 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=R2I675JE64OFU1 partitionPath=default}, currentLocation='null', newLocation='null'}
org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
	at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:462)
	at org.apache.hudi.common.model.DefaultHoodieRecordPayload.updateEventTime(DefaultHoodieRecordPayload.java:90)
	at org.apache.hudi.common.model.DefaultHoodieRecordPayload.getInsertValue(DefaultHoodieRecordPayload.java:84)
	at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
	at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
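For reference, a minimal standalone sketch of the null-return contract the caller relies on (runnable in spark-shell; the helper name getFieldOrNull and the toy schema are made up for illustration, this is not Hudi's actual implementation): when the caller asks for null on a missing field, a lookup of a field absent from the schema should yield null instead of the AvroRuntimeException above. The sketch only handles a top-level field, not nested paths.
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}

// Hypothetical helper: guard against the missing field before calling
// record.get(...), which throws AvroRuntimeException for unknown field names.
def getFieldOrNull(record: GenericRecord, fieldName: String): AnyRef =
  if (record.getSchema.getField(fieldName) == null) null else record.get(fieldName)

// Schema without a "ts" field, mirroring the failing scenario.
val schema = SchemaBuilder.record("rec").fields().requiredString("review_id").endRecord()
val record = new GenericData.Record(schema)
record.put("review_id", "R2I675JE64OFU1")

// Expected: null, since "ts" is not in the schema.
println(getFieldOrNull(record, "ts"))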
Code to reproduce:
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.config.HoodieWriteConfig._

val layoutOptStrategy = "z-order";

val inputPath = s"file:///${System.getProperty("user.home")}/datasets/amazon_reviews_parquet/product_category=Personal_Care_Appliances"
val tableName = s"amazon_reviews_${layoutOptStrategy}"
val outputPath = s"file:///tmp/hudi/$tableName"

val commonOpts = Map(
  "hoodie.compact.inline" -> "false",
  "hoodie.bulk_insert.shuffle.parallelism" -> "10"
)

spark.sparkContext.setLogLevel("DEBUG")

val df = spark.read.parquet(inputPath)

df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
  .option("hoodie.table.name", tableName)
  .option(PRECOMBINE_FIELD.key(), "review_id")
  .option(RECORDKEY_FIELD.key(), "review_id") // TODO obliterate
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy)
  .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(BULK_INSERT_SORT_MODE.key(), "NONE")
  .options(commonOpts)
  .mode(Overwrite)
  .save(outputPath)
Full stacktrace:
https://gist.github.com/alexeykudinkin/8ec70674f23c797ab285fb6d2e2f14ad