Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-3282

Fix delete exception for Spark SQL when sync Hive

    XMLWordPrintableJSON

Details

    Description

      ```
      hudi 0.11.0 master build
      spark: 2.4.5
      ```
      ```bash
      hive
      create database test_hudi;
      ```
      ```scala
      spark-shell --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 4G --driver-cores 2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal .. --keytab ..

      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
      import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
      import org.apache.spark.sql.SaveMode._
      import org.apache.spark.sql.{SaveMode, SparkSession}
      import org.apache.spark.sql.functions.lit
      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.hudi.keygen.SimpleKeyGenerator
      import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps}
      import org.apache.hudi.io.HoodieMergeHandle
      import org.apache.hudi.common.table.HoodieTableConfig
      import org.apache.spark.sql.functions._

      import spark.implicits._
      val df = Seq((1, "a1", 10, 1000, "2022-01-19")).toDF("id", "name", "value", "ts", "dt")

      df.write.format("hudi").
      option(HoodieWriteConfig.TBL_NAME.key, "test_hudi_table_sync_hive").
      option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL).
      option(RECORDKEY_FIELD.key, "id").
      option(PRECOMBINE_FIELD.key, "ts").
      option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
      option("hoodie.datasource.write.partitionpath.field", "").
      option("hoodie.metadata.enable", false).
      option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
      option(META_SYNC_ENABLED.key(), true).
      option(HIVE_USE_JDBC.key(), false).
      option(HIVE_DATABASE.key(), "test_hudi").
      option(HIVE_AUTO_CREATE_DATABASE.key(), true).
      option(HIVE_TABLE.key(), "test_hudi_table_sync_hive").
      option(HIVE_PARTITION_EXTRACTOR_CLASS.key(), "org.apache.hudi.hive.MultiPartKeysValueExtractor").
      mode("overwrite").
      save("/test_hudi/test_hudi_table_sync_hive")
      ```
      ```

      1. hoodie.properties
        hoodie.table.precombine.field=ts
        hoodie.table.partition.fields=
        hoodie.table.type=COPY_ON_WRITE
        hoodie.archivelog.folder=archived
        hoodie.populate.meta.fields=true
        hoodie.timeline.layout.version=1
        hoodie.table.version=3
        hoodie.table.recordkey.fields=id
        hoodie.table.base.file.format=PARQUET
        hoodie.table.timeline.timezone=LOCAL
        hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
        hoodie.table.name=test_hudi_table_sync_hive
        hoodie.datasource.write.hive_style_partitioning=false
        ```

      hive
      ```sql
      show create table test_hudi_table_sync_hive;
      ----------------------------------------------------

                        createtab_stmt                  

      ----------------------------------------------------

      CREATE EXTERNAL TABLE `test_hudi_table_sync_hive`(
        `_hoodie_commit_time` string,                    
        `_hoodie_commit_seqno` string,                  
        `_hoodie_record_key` string,                    
        `_hoodie_partition_path` string,                
        `_hoodie_file_name` string,                      
        `id` int,                                        
        `name` string,                                  
        `value` int,                                    
        `ts` int,                                        
        `dt` string)                                    
      ROW FORMAT SERDE                                  
        'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
      WITH SERDEPROPERTIES (                            
        'hoodie.query.as.ro.table'='false',              
        'path'='/test_hudi/test_hudi_table_sync_hive')  
      STORED AS INPUTFORMAT                              
        'org.apache.hudi.hadoop.HoodieParquetInputFormat'  
      OUTPUTFORMAT                                      
        'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      LOCATION                                          
        'hdfs://cluster1/test_hudi/test_hudi_table_sync_hive'
      TBLPROPERTIES (                                    
        'last_commit_time_sync'='20220119110215185',    
        'spark.sql.sources.provider'='hudi',            
        'spark.sql.sources.schema.numParts'='1',        
        'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  
        'transient_lastDdlTime'='1642561355')            

      ----------------------------------------------------
      28 rows selected (0.429 seconds)

      ```

      ```sql
      spark-sql --master yarn --deploy-mode client --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --principal .. --keytab ..

      delete from test_hudi.test_hudi_table_sync_hive where id=1;
      ```

      exception:
      ```
      : org.apache.hudi.exception.HoodieKeyException: recordKey values: "uuid:_null_" for fields: [uuid] cannot be entirely null or empty.
              at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
              at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
              at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
              at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
              at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getRecordKey(SqlKeyGenerator.scala:64)
              at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
              at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:170)
              at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:170)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
              at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
              at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
              at org.apache.spark.scheduler.Task.run(Task.scala:123)
              at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)

              at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
              at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
              at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
              at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
              at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getRecordKey(SqlKeyGenerator.scala:64)
              at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
              at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:170)
              at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:170)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
              at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
              at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
              at org.apache.spark.scheduler.Task.run(Task.scala:123)
              at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)

      Driver stacktrace:
      22/01/19 13:33:39 INFO DAGScheduler: Job 60 failed: countByKey at HoodieJavaPairRDD.java:103, took 0.671469 s
      22/01/19 13:33:39 ERROR SparkSQLDriver: Failed in [delete from test_hudi.test_hudi_table_sync_hive where id = 1]
      org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20220119133338404
              at org.apache.hudi.table.action.commit.SparkDeleteHelper.execute(SparkDeleteHelper.java:120)
              at org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor.execute(SparkDeleteCommitActionExecutor.java:46)
              at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.delete(HoodieSparkCopyOnWriteTable.java:136)
              at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.delete(HoodieSparkCopyOnWriteTable.java:103)
              at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:256)
              at org.apache.hudi.DataSourceUtils.doDeleteOperation(DataSourceUtils.java:226)
              at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:191)
              at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
              at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
              at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
              at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
              at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
              at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
              at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
              at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
              at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
              at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
              at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
              at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
              at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
              at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
              at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
              at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
              at org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand.run(DeleteHoodieTableCommand.scala:51)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
              at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
              at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
              at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
              at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
              at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
              at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
              at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
              at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
              at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
              at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
              at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
              at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
              at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:371)
              at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
              at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:274)
              at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
              at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
              at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
              at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
              at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
              at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
              at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
              at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      ```

      Attachments

        Issue Links

          Activity

            People

              dongkelun 董可伦
              dongkelun 董可伦
              Shiyan Xu, Yann Byron
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: