  1. Apache Hudi
  2. HUDI-1658 [UMBRELLA] Spark Sql Support For Hudi
  3. HUDI-2357

MERGE INTO doesn't work for tables created using CTAS


Details

    Description

      The MERGE INTO command doesn't pick up the correct primary key for tables created using CTAS, whereas it works for tables created using the CREATE TABLE command.

      I guess we are hitting this issue because the key generator class is set to SqlKeyGenerator for tables created using CTAS:

      working use-case:

      create table h5 (id bigint, name string, ts bigint) using hudi
      options (type = "cow" , primaryKey="id" , preCombineField="ts" );
      
      merge into h5 as t0
      using (
          select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
      ) t1
      on t1.s_id = t0.id
      when matched then update set * 
      when not matched then insert *;
      

      hoodie.properties for working use-case:

      ➜  analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
      #Properties saved on Wed Aug 25 04:10:33 UTC 2021
      #Wed Aug 25 04:10:33 UTC 2021
      hoodie.table.name=h5
      hoodie.table.recordkey.fields=id
      hoodie.table.type=COPY_ON_WRITE
      hoodie.table.precombine.field=ts
      hoodie.table.partition.fields=
      hoodie.archivelog.folder=archived
      hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
      hoodie.timeline.layout.version=1
      hoodie.table.version=1

       

      Whereas this doesn't work:

      create table h4 using hudi options (type = "cow" , primaryKey="id" , preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name, current_timestamp();
      
      merge into h3 as t0 using (select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update set * when not matched then insert *;
      
      ========ERROR LOG====================
      544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver  - Failed in [merge into analytics.h3 as t0
      using (
          select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
      ) t1
      on t1.s_id = t0.id
      when matched then update set *
      when not matched then insert *]
      java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3
          at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
          at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
          at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
          at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
          at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
          at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
          at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
          at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
          at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
          at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
          at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
          at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
          at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
          at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
          at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
          at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
          at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
          at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
          at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
          at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
          at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
          at scala.collection.Iterator.foreach(Iterator.scala:941)
          at scala.collection.Iterator.foreach$(Iterator.scala:941)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
          at java.base/java.lang.reflect.Method.invoke(Unknown Source)
          at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
          at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
          at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
          at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
          at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
          at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
          at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
          at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3
          at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
          at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
          at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
          at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
          at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
          at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
          at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
          at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
          at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
          at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
          at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
          at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
          at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
          at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
          at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
          at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
          at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
          at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
          at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
          at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
          at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
          at scala.collection.Iterator.foreach(Iterator.scala:941)
          at scala.collection.Iterator.foreach$(Iterator.scala:941)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
          at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
          at java.base/java.lang.reflect.Method.invoke(Unknown Source)
          at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
          at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
          at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
          at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
          at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
          at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
          at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
          at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

      hoodie.properties for the non-working use-case (CTAS table):

      ➜  analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
      #Properties saved on Wed Aug 25 04:09:37 UTC 2021
      #Wed Aug 25 04:09:37 UTC 2021
      hoodie.table.name=h4
      hoodie.table.recordkey.fields=id
      hoodie.table.type=COPY_ON_WRITE
      hoodie.table.precombine.field=ts
      hoodie.table.partition.fields=
      hoodie.archivelog.folder=archived
      hoodie.populate.meta.fields=true
      hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
      hoodie.table.base.file.format=PARQUET
      hoodie.timeline.layout.version=1
      hoodie.table.version=1
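
To make the difference between the two hoodie.properties files easier to see, here is a minimal Python sketch that parses both dumps and lists the keys that differ. The helper names (parse_properties, diff_properties) are made up for this illustration and are not part of Hudi; the property values are copied from the two dumps in this report, except that the long hoodie.table.create.schema value is replaced by a placeholder for readability.

```python
def parse_properties(text):
    """Parse a simple key=value properties dump, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def diff_properties(a, b):
    """Return (keys only in a, keys only in b, shared keys with different values)."""
    only_a = sorted(set(a) - set(b))
    only_b = sorted(set(b) - set(a))
    changed = sorted(k for k in set(a) & set(b) if a[k] != b[k])
    return only_a, only_b, changed


# Table created with CREATE TABLE (working case).
H5_PROPS = """
hoodie.table.name=h5
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.table.create.schema=<schema JSON omitted here for readability>
hoodie.timeline.layout.version=1
hoodie.table.version=1
"""

# Table created with CTAS (failing case).
H4_PROPS = """
hoodie.table.name=h4
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.timeline.layout.version=1
hoodie.table.version=1
"""

h5 = parse_properties(H5_PROPS)
h4 = parse_properties(H4_PROPS)
only_h5, only_h4, changed = diff_properties(h5, h4)

print("only in h5:", only_h5)
print("only in h4:", only_h4)
print("different values:", changed)
```

Apart from the table name, the CTAS table adds hoodie.populate.meta.fields, hoodie.table.keygenerator.class and hoodie.table.base.file.format, and lacks hoodie.table.create.schema. Note that hoodie.table.recordkey.fields is id in both files, so the empty primary key reported in the error is not explained by that property alone.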

      This is a blocker for the dbt integration https://issues.apache.org/jira/browse/HUDI-2319

      Please try to fix it as part of the 0.9.0 release so that the dbt integration can be unblocked.

       


            People

              pzw2018 pengzhiwei
              vino Vinoth Govindarajan