Description
A simple "insert into ... select" involving partitioned hive tables fails. Here's a simpler repro which doesn't involve hive at all – this succeeds on 2.1.1, but fails on 2.2.0-rc5:
spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""") spark.sql("""DROP TABLE IF EXISTS src""") spark.sql("""DROP TABLE IF EXISTS dest""") spark.sql(""" CREATE TABLE src (first string, word string) PARTITIONED BY (length int) """) spark.sql(""" INSERT INTO src PARTITION(length) VALUES ('a', 'abc', 3), ('b', 'bcde', 4), ('c', 'cdefg', 5) """) spark.sql(""" CREATE TABLE dest (word string, length int) PARTITIONED BY (first string) """) spark.sql(""" INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src """)
The exception is:
17/06/21 14:25:53 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 10, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: first#74
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:49)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:43)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:884)
	at org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:63)
	at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:102)
	at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Couldn't find first#74 in [word#76,length#77,first#75]
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 40 more
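To make the root-cause message concrete: catalyst binds attribute references by exprId, not by name, so a reference whose exprId matches nothing in the child's output fails even though a column with the same name is present. A minimal sketch of that behavior using catalyst internals (illustrative only; the attribute names and type are made up to mirror the trace):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BindReferences}
import org.apache.spark.sql.types.StringType

// Two attributes that share the name "first" but get distinct exprIds,
// mimicking first#74 (the dangling reference) vs first#75 (the child output).
val stale  = AttributeReference("first", StringType)()
val actual = AttributeReference("first", StringType)()

// Binding matches on exprId, so this throws
// java.lang.RuntimeException: Couldn't find first#... in [first#...]
BindReferences.bindReference(stale, Seq(actual))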
The key to making this fail is the cast(first as string) as first. Doing the same cast on any other column doesn't matter; it only matters on first, which is the dynamic partition column of the destination table. The stack trace suggests where things go wrong: while preparing the write, the sort on the dynamic partition column (SortExec / GenerateOrdering) tries to bind a reference first#74 that is not among the child's output attributes [word#76, length#77, first#75].
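To illustrate, here is a minimal pair against the same src and dest tables as above (the working variant follows from the observation that casting any non-partition column is harmless):

// Fails on 2.2.0-rc5: the cast/alias targets the dynamic partition column.
spark.sql("""
  INSERT INTO TABLE dest PARTITION(first)
  SELECT word, length, cast(first as string) as first FROM src
""")

// Works on 2.2.0-rc5: the same cast applied to a non-partition column instead.
spark.sql("""
  INSERT INTO TABLE dest PARTITION(first)
  SELECT cast(word as string) as word, length, first FROM src
""")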
Here's the explain plan from 2.2.0:
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
+- 'Project ['word, 'length, cast('first as string) AS first#85]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoHiveTable CatalogTable(
   Database: default
   Table: dest
   Owner: irashid
   Created: Wed Jun 21 14:25:13 CDT 2017
   Last Access: Wed Dec 31 18:00:00 CST 1969
   Type: MANAGED
   Provider: hive
   Properties: [serialization.format=1]
   Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
   Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
   InputFormat: org.apache.hadoop.mapred.TextInputFormat
   OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
   Partition Provider: Catalog
   Partition Columns: [`first`]
   Schema: root
    |-- word: string (nullable = true)
    |-- length: integer (nullable = true)
    |-- first: string (nullable = true)
), Map(first -> None), false, false
+- Project [word#89, length#90, cast(first#88 as string) AS first#85]
   +- SubqueryAlias src
      +- CatalogRelation CatalogTable(
            Database: default
            Table: src
            Owner: irashid
            Created: Wed Jun 21 14:25:11 CDT 2017
            Last Access: Wed Dec 31 18:00:00 CST 1969
            Type: MANAGED
            Provider: hive
            Properties: [serialization.format=1]
            Statistics: 9223372036854775807 bytes
            Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
            Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            InputFormat: org.apache.hadoop.mapred.TextInputFormat
            OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            Partition Provider: Catalog
            Partition Columns: [`length`]
            Schema: root
             |-- first: string (nullable = true)
             |-- word: string (nullable = true)
             |-- length: integer (nullable = true)
         ), [first#88, word#89], [length#90]

== Optimized Logical Plan ==
InsertIntoHiveTable CatalogTable(
   Database: default
   Table: dest
   Owner: irashid
   Created: Wed Jun 21 14:25:13 CDT 2017
   Last Access: Wed Dec 31 18:00:00 CST 1969
   Type: MANAGED
   Provider: hive
   Properties: [serialization.format=1]
   Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
   Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
   InputFormat: org.apache.hadoop.mapred.TextInputFormat
   OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
   Partition Provider: Catalog
   Partition Columns: [`first`]
   Schema: root
    |-- word: string (nullable = true)
    |-- length: integer (nullable = true)
    |-- first: string (nullable = true)
), Map(first -> None), false, false
+- Project [word#89, length#90, cast(first#88 as string) AS first#85]
   +- SubqueryAlias src
      +- CatalogRelation CatalogTable(
            Database: default
            Table: src
            Owner: irashid
            Created: Wed Jun 21 14:25:11 CDT 2017
            Last Access: Wed Dec 31 18:00:00 CST 1969
            Type: MANAGED
            Provider: hive
            Properties: [serialization.format=1]
            Statistics: 9223372036854775807 bytes
            Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
            Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            InputFormat: org.apache.hadoop.mapred.TextInputFormat
            OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            Partition Provider: Catalog
            Partition Columns: [`length`]
            Schema: root
             |-- first: string (nullable = true)
             |-- word: string (nullable = true)
             |-- length: integer (nullable = true)
         ), [first#88, word#89], [length#90]

== Physical Plan ==
ExecutedCommand
+- InsertIntoHiveTable CatalogTable(
      Database: default
      Table: dest
      Owner: irashid
      Created: Wed Jun 21 14:25:13 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`first`]
      Schema: root
       |-- word: string (nullable = true)
       |-- length: integer (nullable = true)
       |-- first: string (nullable = true)
   ), Map(first -> None), false, false
   +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
      +- SubqueryAlias src
         +- CatalogRelation CatalogTable(
               Database: default
               Table: src
               Owner: irashid
               Created: Wed Jun 21 14:25:11 CDT 2017
               Last Access: Wed Dec 31 18:00:00 CST 1969
               Type: MANAGED
               Provider: hive
               Properties: [serialization.format=1]
               Statistics: 9223372036854775807 bytes
               Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
               Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               InputFormat: org.apache.hadoop.mapred.TextInputFormat
               OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
               Partition Provider: Catalog
               Partition Columns: [`length`]
               Schema: root
                |-- first: string (nullable = true)
                |-- word: string (nullable = true)
                |-- length: integer (nullable = true)
            ), [first#88, word#89], [length#90]
And from 2.1.1:
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), OverwriteOptions(false,Map()), false
+- 'Project ['word, 'length, cast('first as string) AS first#55]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
+- Project [word#60, length#58, cast(first#59 as string) AS first#55]
   +- MetastoreRelation default, src

== Optimized Logical Plan ==
InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
+- Project [word#60, length#58, first#59]
   +- MetastoreRelation default, src

== Physical Plan ==
InsertIntoHiveTable MetastoreRelation default, dest, Map(first -> None), false, false
+- HiveTableScan [word#60, length#58, first#59], MetastoreRelation default, src
Comparing the two, 2.1.1's optimizer eliminates the redundant cast (the optimized Project is [word#60, length#58, first#59]), while 2.2.0 carries the cast, and the fresh attribute first#85 it introduces, all the way into the physical plan. While this example query is somewhat contrived, this is really important because if you try to do the same thing where src was created by Hive, then the query fails with the same error even without an explicit cast, since the analyzer inserts the casts itself (a hypothetical sketch of that setup follows the plan below). In that case, the explain plan looks like:
== Parsed Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
+- 'Project ['word, 'length, 'first]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
+- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
   +- Project [word#49, length#50, first#48]
      +- SubqueryAlias src
         +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]

== Optimized Logical Plan ==
InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
+- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
   +- Project [word#49, length#50, first#48]
      +- SubqueryAlias src
         +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]

== Physical Plan ==
ExecutedCommand
+- InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
   +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
      +- Project [word#49, length#50, first#48]
         +- SubqueryAlias src
            +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
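The Hive-side DDL isn't shown above, but a plausible minimal setup can be inferred from the plan (the `my_test` database and the same schemas as before). This is a hypothetical sketch to run from the Hive CLI, not a verbatim reproduction of the original setup:

-- Hypothetical Hive-side setup, inferred from the plan above.
CREATE DATABASE IF NOT EXISTS my_test;
USE my_test;
CREATE TABLE src (first string, word string) PARTITIONED BY (length int);
CREATE TABLE dest (word string, length int) PARTITIONED BY (first string);

With src and dest created (and src populated) this way, issuing the plain INSERT INTO TABLE dest PARTITION(first) SELECT word, length, first FROM src from Spark hits the same binding error: the analyzer's inserted cast(first#48 as string) AS first#56 plays the role of the explicit cast in the contrived repro.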