Details
- Type: Task
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Fix Version/s: 0.9.0
- Component/s: None
Description
Reproduction Steps:
import spark.implicits._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit // needed for lit() below; missing from the original snippet
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.common.model.HoodieTableType

val sourcePath = "s3://uditme-iad/hudi/tables/events/events_data_partitioned_non_null"
val sourceDf = spark.read.parquet(sourcePath + "/*")

val tableName = "events_data_partitioned_non_null_00"
val tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName

// Bootstrap the COW table with a plain insert.
sourceDf.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// Read the row to update back from the Hudi table itself, then upsert it.
val readDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")
val updateDf = readDf.filter($"event_id" === "106")
  .withColumn("event_name", lit("udit_event_106"))

updateDf.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .mode(SaveMode.Append)
  .save(tablePath)
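For completeness, the commit timeline can be inspected between the two writes. This is a minimal sketch using the HoodieDataSourceHelpers import already present above; the FileSystem setup is an assumption added here, not part of the original report:

import org.apache.hadoop.fs.{FileSystem, Path}

// Assumed setup: obtain the FileSystem backing the table path.
val fs = FileSystem.get(new Path(tablePath).toUri, spark.sparkContext.hadoopConfiguration)

// Latest completed commit on the timeline; this should show the bootstrap
// insert commit before the failing upsert is attempted.
val lastCommit = HoodieDataSourceHelpers.latestCommit(fs, tablePath)
println("Latest commit on " + tableName + ": " + lastCommit)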
Full Stack trace:
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:276)
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:134)
	at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:90)
	at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
	at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:269)
	... 30 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
	at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:132)
	... 33 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
	... 34 more
Caused by: java.lang.NullPointerException
	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:222)
	at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:159)
	at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:149)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
As shown above, updateDf is formed by reading the row back from the bootstrapped Hudi table itself. If, however, we form updateDf from the source data instead, the upsert works fine:
val readDf = spark.read.parquet(sourcePath + "/*")
val updateDf = readDf.filter($"event_id" === "106").withColumn("event_name", lit("udit_event_106"))
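A quick way to confirm the workaround took effect, sketched with the same read pattern as the repro above (the column selection is illustrative):

// Read the table back and confirm the update was applied to event 106.
val verifyDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")
verifyDf.filter($"event_id" === "106")
  .select("event_id", "event_type", "event_name")
  .show(false)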
Issue Links
- is a child of HUDI-991 Bootstrap Implementation Bugs (Resolved)