Details
-
Task
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
Description
MERGE INTO command doesn't select the correct primary key for tables created using CTAS, whereas it works for tables created using CREATE TABLE command.
I guess we are hitting this issue because the key generator class is set to SqlKeyGenerator for tables created using CTAS:
working use-case:
create table h5 (id bigint, name string, ts bigint) using hudi options (type = "cow" , primaryKey="id" , preCombineField="ts" ); merge into h5 as t0 using ( select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts ) t1 on t1.s_id = t0.id when matched then update set * when not matched then insert *;
hoodie.properties for working use-case:
➜ analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties #Properties saved on Wed Aug 25 04:10:33 UTC 2021 #Wed Aug 25 04:10:33 UTC 2021 hoodie.table.name=h5 hoodie.table.recordkey.fields=id hoodie.table.type=COPY_ON_WRITE hoodie.table.precombine.field=ts hoodie.table.partition.fields= hoodie.archivelog.folder=archived hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]} hoodie.timeline.layout.version=1 hoodie.table.version=1
Whereas this doesn't work:
create table h4 using hudi options (type = "cow" , primaryKey="id" , preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name, current_timestamp(); merge into h3 as t0u sing (select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update set * when not matched then insert *; ========ERROR LOG==================== 544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver - Failed in [merge into analytics.h3 as t0using ( select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen matched then update set *when not matched then insert *]java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3 at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3 at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
hoodie.properties for not working use-case (CTAS table):
➜ analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:09:37 UTC 2021
#Wed Aug 25 04:09:37 UTC 2021
hoodie.table.name=h4
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.timeline.layout.version=1
hoodie.table.version=1
This is a blocker for the dbt integration https://issues.apache.org/jira/browse/HUDI-2319
Please try to fix it as part of the 0.9.0 release so that the dbt integration can be unblocked.
Attachments
Issue Links
- links to