Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
Description
As reported by the user:
https://github.com/apache/hudi/issues/6354
Currently, setting {{hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'}} will result in the following exception:
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
    at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    at java.lang.Long.compareTo(Long.java:54)
    at org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:139)
    at org.apache.hudi.common.model.DefaultHoodieRecordPayload.combineAndGetUpdateValue(DefaultHoodieRecordPayload.java:62)
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
    ... 8 more
3071575 [task-result-getter-2] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 996.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 996.0 failed 4 times, most recent failure: Lost task 0.3 in stage 996.0 (TID 27262) (szzb-bg-uat-sdp-hadoop-05 executor 8): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
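This occurs because the configured DefaultHoodieRecordPayload somehow overrides ExpressionPayload, which has to be used to handle MERGE INTO statements correctly. The innermost cause in the trace is a ClassCastException (java.lang.Integer cannot be cast to java.lang.Long) thrown from DefaultHoodieRecordPayload.needUpdatingPersistedRecord.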
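To illustrate the innermost ClassCastException, here is a minimal sketch, not Hudi code. It assumes the persisted ts comes back as a Long (the column is bigint) while the incoming ts arrives as an Integer, for example from an untyped literal such as `2000 as ts` in the MERGE INTO source. The class and method names (OrderingValueCastSketch, compareRaw, compareWidened) are hypothetical.

```java
public class OrderingValueCastSketch {

    // Unchecked comparison of two ordering values through the raw Comparable
    // interface. Hypothetical helper that only mimics the failing call shape
    // (Long.compareTo invoked with a non-Long argument); not Hudi's code.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareRaw(Object persisted, Object incoming) {
        return ((Comparable) persisted).compareTo(incoming);
    }

    // Hypothetical guard: widen both integral values to long before comparing,
    // so an Integer and a Long can be ordered without a cast failure.
    static int compareWidened(Number persisted, Number incoming) {
        return Long.compare(persisted.longValue(), incoming.longValue());
    }

    public static void main(String[] args) {
        Object persistedTs = 1000L; // assumed: ts read back from the bigint column
        Object incomingTs = 2000;   // assumed: ts from an untyped integer literal

        try {
            compareRaw(persistedTs, incomingTs);
        } catch (ClassCastException e) {
            // Long.compareTo casts its argument to Long, which fails for an Integer.
            System.out.println("ClassCastException: " + e.getMessage());
        }

        // Widening first avoids the cast: 1000 < 2000, so the result is negative.
        System.out.println(compareWidened(1000L, 2000));
    }
}
```

If that assumption holds, casting the source column explicitly in the MERGE INTO source (e.g. to bigint) might sidestep the error, but this has not been verified against the fix for this ticket.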
Steps to reproduce:
-- step 1: create table
drop table hudi_cow_pk_cbfield_tbl;
create table hudi_cow_pk_cbfield_tbl (
  id bigint,
  name string,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  hoodie.datasource.write.hive_style_partitioning = false,
  hoodie.datasource.write.operation = 'upsert',
  hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
);

-- step 2: insert a record with primaryKey=1, preCombineField=1000
insert into hudi_cow_pk_cbfield_tbl select 1, 'a0', 1000;

-- step 3: insert with the same primaryKey but a smaller preCombineField value (100);
-- as expected, the insert has no effect (this is normal when
-- hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' is set)
insert into hudi_cow_pk_cbfield_tbl select 1, 'a0_new', 100;
select * from hudi_cow_pk_cbfield_tbl;

-- step 4: update with the same primaryKey but a smaller preCombineField value (20);
-- as expected, the update has no effect (this is normal when
-- hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' is set)
update hudi_cow_pk_cbfield_tbl set name='a1_new', ts=20 where id = 1;
select * from hudi_cow_pk_cbfield_tbl;

-- step 5: merge into with the same primaryKey fails with:
-- org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
merge into hudi_cow_pk_cbfield_tbl as target
using (select 1 as id, 'a1_merge' as name, 2000 as ts) as source
on target.id = source.id
when matched then update set *
when not matched then insert *;