Apache Hudi / HUDI-4879

MERGE INTO fails when setting "hoodie.datasource.write.payload.class"


Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.1
    • Component/s: None
    • Labels: None

    Description

      As reported by the user:

      https://github.com/apache/hudi/issues/6354

       

      Currently, setting hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' will result in the following exception:

      org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
      at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
      at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
      at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
      at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
      at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
      at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
      at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
      at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
      at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:131)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
      at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
      ... 28 more
      Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
      at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
      at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
      ... 31 more
      Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
      ... 32 more
      Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
      at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
      at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
      at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
      at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
      at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      ... 3 more
      Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
      at java.lang.Long.compareTo(Long.java:54)
      at org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:139)
      at org.apache.hudi.common.model.DefaultHoodieRecordPayload.combineAndGetUpdateValue(DefaultHoodieRecordPayload.java:62)
      at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
      ... 8 more
      3071575 [task-result-getter-2] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 996.0 failed 4 times; aborting job
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 996.0 failed 4 times, most recent failure: Lost task 0.3 in stage 996.0 (TID 27262) (szzb-bg-uat-sdp-hadoop-05 executor 8): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
      at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
      at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
      at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
      at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
      at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
      at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
      at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
      at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
      at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:131)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
      at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
      at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
      ... 28 more 

      This occurs because an explicitly configured DefaultHoodieRecordPayload overrides ExpressionPayload, which has to be used for a MERGE INTO statement to be handled correctly.
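
      The root-cause frame in the trace (java.lang.Integer cannot be cast to java.lang.Long, at Long.compareTo inside DefaultHoodieRecordPayload.needUpdatingPersistedRecord) shows the persisted ordering value (ts is a bigint, read back as a Long) being compared against the incoming ordering value produced by the MERGE source (an Integer). Below is a minimal standalone sketch of that clash; it is not Hudi's code, and which of the two values ends up as the receiver inside Hudi is an implementation detail.

      // Standalone sketch of the root-cause ClassCastException (not Hudi source).
      // Long's erased Comparable#compareTo(Object) bridge method casts its
      // argument to Long, so handing it an Integer fails exactly as in the trace.
      public class OrderingValClash {
          public static void main(String[] args) {
              Object persisted = 1000L; // "ts" read back from parquet (bigint -> Long)
              Object incoming = 2000;   // "ts" from the MERGE source (int literal -> Integer)

              // Throws: java.lang.ClassCastException:
              //   java.lang.Integer cannot be cast to java.lang.Long
              @SuppressWarnings("unchecked")
              int cmp = ((Comparable<Object>) persisted).compareTo(incoming);
              System.out.println(cmp);
          }
      }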

       

      Steps to reproduce:

      --step 1: create the table
      drop table if exists hudi_cow_pk_cbfield_tbl;
      create table hudi_cow_pk_cbfield_tbl (
        id bigint,
        name string,
        ts bigint
      ) using hudi
      tblproperties (
        type = 'cow',
        primaryKey = 'id',
        preCombineField = 'ts',
        hoodie.datasource.write.hive_style_partitioning = false,
        hoodie.datasource.write.operation = 'upsert',
        hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
       )
      ;
      
      --step 2: insert a record with primaryKey=1, preCombineField=1000
      insert into hudi_cow_pk_cbfield_tbl select 1, 'a0', 1000;
      --step 3: 'insert' with the same primaryKey but a smaller preCombineField value (100); the write has no effect, which is the expected behavior when hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' is set
      insert into hudi_cow_pk_cbfield_tbl select 1, 'a0_new', 100;
      select * from hudi_cow_pk_cbfield_tbl;
      
      --step 4: 'update' with the same primaryKey but a smaller preCombineField value (20); again the write has no effect, which is the expected behavior with this payload class
      update hudi_cow_pk_cbfield_tbl set name = 'a1_new', ts = 20 where id = 1;
      select * from hudi_cow_pk_cbfield_tbl;
      
      --step 5: 'merge into' with the same primaryKey fails with: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
      merge into hudi_cow_pk_cbfield_tbl as target
      using (select 1 as id,'a1_merge' as name,2000 as ts) as source
      on target.id = source.id
      when matched then update set *
      when not matched then insert *
      ; 
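
      For context on steps 3 through 5: DefaultHoodieRecordPayload only lets an incoming record replace the persisted one when its preCombine ("ts") value is at least as large, which is why the smaller values in steps 3 and 4 are silently dropped, and why step 5 (ts=2000 > 1000) would be expected to win if the comparison did not crash first. A rough sketch of that rule follows; the ">=" tie-breaking is an assumption here, not a statement of Hudi's exact boundary behavior.

      // Rough sketch (not Hudi source) of the preCombine ordering rule observed
      // in the repro; the ">=" tie-breaking is an assumption.
      public class PreCombineRule {
          static boolean shouldUpdatePersisted(long persistedTs, long incomingTs) {
              return incomingTs >= persistedTs;
          }

          public static void main(String[] args) {
              System.out.println(shouldUpdatePersisted(1000, 100));  // step 3: false -> write dropped
              System.out.println(shouldUpdatePersisted(1000, 20));   // step 4: false -> write dropped
              System.out.println(shouldUpdatePersisted(1000, 2000)); // step 5: true -> update expected,
              // but the real comparison crashes first on the Integer/Long clash sketched above
          }
      }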


          People

            Assignee: fengjian_428 (Jian Feng)
            Reporter: alexey.kudinkin (Alexey Kudinkin)
            Votes: 0
            Watchers: 1
