Apache Hudi / HUDI-2896

Exception is thrown from HoodieAvroUtils.getNestedFieldVal() with returnNullIfNotFound as true


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Resolved
    • Affects Version/s: None
    • Fix Version/s: 0.11.0
    • Component/s: None
    • Labels: None

    Description

      Based on Alexey's testing, when

      getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true)

      is called, the following exception is thrown. However, the method is designed to return null if the field is not found in the schema, since returnNullIfNotFound is passed as true.

      21/11/30 10:22:42 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=R2I675JE64OFU1 partitionPath=default}, currentLocation='null', newLocation='null'}
      org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
              at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
              at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:462)
              at org.apache.hudi.common.model.DefaultHoodieRecordPayload.updateEventTime(DefaultHoodieRecordPayload.java:90)
              at org.apache.hudi.common.model.DefaultHoodieRecordPayload.getInsertValue(DefaultHoodieRecordPayload.java:84)
              at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
              at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
              at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
              at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
              at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748) 
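
      The throw originates in Avro's GenericData.Record.get(String), which raises AvroRuntimeException for a field name absent from the schema instead of returning null (top frame of the trace above). A minimal sketch of a guard that would honor returnNullIfNotFound, simplified to a single-level field lookup and assuming the three-argument overload shown above (a sketch only, not the committed fix):

      import org.apache.avro.AvroRuntimeException;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericRecord;

      public class NestedFieldLookupSketch {
        public static Object getNestedFieldVal(GenericRecord record, String fieldName,
                                               boolean returnNullIfNotFound) {
          // Check the schema first: GenericData.Record.get(String) throws
          // "Not a valid schema field: ..." for an unknown field name.
          Schema.Field field = record.getSchema().getField(fieldName);
          if (field == null) {
            if (returnNullIfNotFound) {
              // Honor the flag instead of propagating the AvroRuntimeException.
              return null;
            }
            throw new AvroRuntimeException("Not a valid schema field: " + fieldName);
          }
          return record.get(field.pos());
        }
      }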


      Code to reproduce:


      import scala.collection.JavaConversions._
      import org.apache.spark.sql.SaveMode._
      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieClusteringConfig
      import org.apache.hudi.config.HoodieWriteConfig._

      // Layout optimization strategy applied by inline clustering
      val layoutOptStrategy = "z-order"

      val inputPath = s"file:///${System.getProperty("user.home")}/datasets/amazon_reviews_parquet/product_category=Personal_Care_Appliances"
      val tableName = s"amazon_reviews_${layoutOptStrategy}"
      val outputPath = s"file:///tmp/hudi/$tableName"

      val commonOpts =
        Map(
          "hoodie.compact.inline" -> "false",
          "hoodie.bulk_insert.shuffle.parallelism" -> "10"
        )

      spark.sparkContext.setLogLevel("DEBUG")

      val df = spark.read.parquet(inputPath)

      // Bulk-insert with inline clustering (z-order sort on product_id, customer_id).
      // No event-time field is configured, so the payload's event-time lookup falls
      // back to "ts" (see the exception above), which this dataset's schema lacks.
      df.write.format("hudi")
        .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
        .option("hoodie.table.name", tableName)
        .option(PRECOMBINE_FIELD.key(), "review_id")
        .option(RECORDKEY_FIELD.key(), "review_id")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category")
        .option("hoodie.clustering.inline", "true")
        .option("hoodie.clustering.inline.max.commits", "1")
        .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
        .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy)
        .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id")
        .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
        .option(BULK_INSERT_SORT_MODE.key(), "NONE")
        .options(commonOpts)
        .mode(Overwrite)
        .save(outputPath)
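
      The same failure can be reproduced without Spark. A minimal sketch, assuming the three-argument HoodieAvroUtils.getNestedFieldVal overload from the description (the class and record below are illustrative):

      import org.apache.avro.Schema;
      import org.apache.avro.SchemaBuilder;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.hudi.avro.HoodieAvroUtils;

      public class GetNestedFieldValRepro {
        public static void main(String[] args) {
          // Schema deliberately has no "ts" field, mirroring the dataset above.
          Schema schema = SchemaBuilder.record("rec").fields()
              .requiredString("review_id")
              .endRecord();
          GenericRecord record = new GenericData.Record(schema);
          record.put("review_id", "R2I675JE64OFU1");

          // Expected: null, since returnNullIfNotFound is true.
          // Actual (before the fix): AvroRuntimeException "Not a valid schema field: ts".
          Object eventTime = HoodieAvroUtils.getNestedFieldVal(record, "ts", true);
          System.out.println("event time = " + eventTime);
        }
      }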


      Full stacktrace:

      https://gist.github.com/alexeykudinkin/8ec70674f23c797ab285fb6d2e2f14ad


People

    Assignee: alexey.kudinkin Alexey Kudinkin
    Reporter: guoyihua Ethan Guo
