Apache Hudi / HUDI-1021

[Bug] Unable to update bootstrapped table using rows from the written bootstrapped table


Details

    • Type: Task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.9.0
    • Affects Version/s: 0.6.0
    • Component/s: bootstrap
    • Labels: None

    Description

      Reproduction Steps:

       

      import spark.implicits._
      import org.apache.spark.sql.functions.lit
      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.DataSourceReadOptions
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.hudi.HoodieDataSourceHelpers
      import org.apache.hudi.common.model.HoodieTableType
      import org.apache.spark.sql.SaveMode
      
      
      val sourcePath = "s3://uditme-iad/hudi/tables/events/events_data_partitioned_non_null"
      val sourceDf = spark.read.parquet(sourcePath + "/*")
      var tableName = "events_data_partitioned_non_null_00"
      var tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName
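      // Initial insert: write the source data out as a new Hudi copy-on-write table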
      
      sourceDf.write.format("org.apache.hudi")
       .option(HoodieWriteConfig.TABLE_NAME, tableName)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
       .mode(SaveMode.Overwrite)
       .save(tablePath)
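      // Optional sanity check (not part of the original report): HoodieDataSourceHelpers
      // is already imported above and can confirm the initial insert committed on the
      // timeline before the failing upsert is attempted.
      import org.apache.hadoop.fs.Path
      val fs = new Path(tablePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
      println("latest commit: " + HoodieDataSourceHelpers.latestCommit(fs, tablePath))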
      
      val readDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")
      
      val updateDf = readDf.filter($"event_id" === "106")
                       .withColumn("event_name", lit("udit_event_106"))
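      // Upsert the modified rows (read back from the Hudi table itself) into the same table path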
                       
      updateDf.write.format("org.apache.hudi")
       .option(HoodieWriteConfig.TABLE_NAME, tableName)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
       .mode(SaveMode.Append)
       .save(tablePath)
      
      

       

      Full Stack trace:

      Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
       at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:276)
       at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
       at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
       at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
       at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
       at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
       at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
       at org.apache.spark.scheduler.Task.run(Task.scala:123)
       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
       Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
       at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:134)
       at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:90)
       at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
       at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:269)
       ... 30 more
       Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
       at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
       at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:132)
       ... 33 more
       Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
       at java.util.concurrent.FutureTask.report(FutureTask.java:122)
       at java.util.concurrent.FutureTask.get(FutureTask.java:192)
       at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
       ... 34 more
       Caused by: java.lang.NullPointerException
       at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:222)
       at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:159)
       at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:149)
       at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
       at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       ... 3 more
      
      

       

      As shown above, updateDf is formed by reading rows back from the bootstrapped Hudi table itself. If, however, we form updateDf from the source data instead, the upsert works fine:

      val readDf = spark.read.parquet(sourcePath + "/*")
      val updateDf = readDf.filter($"event_id" === "106").withColumn("event_name", lit("udit_event_106"))
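      One observable difference between the failing and working inputs (an observation, not a confirmed root cause) is that a DataFrame read back through the Hudi datasource carries the table's _hoodie_* metadata columns, which the raw source parquet does not. A minimal sketch to see that difference, reusing sourcePath and tablePath from above:

      // Compare the schema of the Hudi read against the raw source parquet;
      // the extra columns are typically _hoodie_commit_time, _hoodie_record_key, etc.
      val hudiReadDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")
      val rawSourceDf = spark.read.parquet(sourcePath + "/*")
      println((hudiReadDf.columns.toSet -- rawSourceDf.columns.toSet).mkString(", "))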

       

       


            People

              Assignee: wenningd (Wenning Ding)
              Reporter: uditme (Udit Mehrotra)
              Votes: 0
              Watchers: 2
