SPARK-21165: Fail to write into partitioned Hive table due to attribute reference not working with cast on partition column

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.0, 2.3.0
    • Component/s: SQL
    • Labels:
      None

      Description

      A simple "insert into ... select" involving partitioned Hive tables fails. Here's a simpler repro which doesn't involve Hive at all – it succeeds on 2.1.1, but fails on 2.2.0-rc5:

      spark.sql("""SET hive.exec.dynamic.partition.mode=nonstrict""")
      spark.sql("""DROP TABLE IF EXISTS src""")
      spark.sql("""DROP TABLE IF EXISTS dest""")
      spark.sql("""
      CREATE TABLE src (first string, word string)
        PARTITIONED BY (length int)
      """)
      
      spark.sql("""
      INSERT INTO src PARTITION(length) VALUES
        ('a', 'abc', 3),
        ('b', 'bcde', 4),
        ('c', 'cdefg', 5)
      """)
      
      spark.sql("""
        CREATE TABLE dest (word string, length int)
          PARTITIONED BY (first string)
      """)
      
      spark.sql("""
        INSERT INTO TABLE dest PARTITION(first) SELECT word, length, cast(first as string) as first FROM src
      """)
      

      The exception is

      17/06/21 14:25:53 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 10, localhost, executor driver): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: first#74
              at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
              at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
              at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
              at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
              at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
              at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
              at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
              at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
              at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
              at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$bind$1.apply(GenerateOrdering.scala:49)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
              at scala.collection.AbstractTraversable.map(Traversable.scala:104)
              at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:49)
              at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.bind(GenerateOrdering.scala:43)
              at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:884)
              at org.apache.spark.sql.execution.SparkPlan.newOrdering(SparkPlan.scala:363)
              at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:63)
              at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:102)
              at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
              at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
              at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
              at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
              at org.apache.spark.scheduler.Task.run(Task.scala:108)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.RuntimeException: Couldn't find first#74 in [word#76,length#77,first#75]
              at scala.sys.package$.error(package.scala:27)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
              at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
              at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
              ... 40 more
      

      The key to making this fail is the cast(first as string) as first. The same cast on any other column is harmless; it only triggers the failure on first, which is the partition column of the destination table.
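      The "Couldn't find first#74 in [word#76,length#77,first#75]" line in the stack trace shows what goes wrong: Catalyst binds an attribute reference to an input slot by its expression ID (the #N suffix), never by name alone. The cast/alias produces a new attribute first#75, while the generated sort ordering still references first#74. A minimal, purely illustrative Python sketch of this binding behavior (the Attr class and bind_reference function here are hypothetical stand-ins, not Spark APIs):

      ```python
      from dataclasses import dataclass

      # Hypothetical, simplified model of Catalyst's BindReferences:
      # attributes are matched by expression ID, not by name.
      @dataclass(frozen=True)
      class Attr:
          name: str
          expr_id: int  # the "#74" part of "first#74"

      def bind_reference(attr, input_attrs):
          """Return the ordinal of `attr` in `input_attrs`, matched by expr_id."""
          for ordinal, candidate in enumerate(input_attrs):
              if candidate.expr_id == attr.expr_id:
                  return ordinal
          found = ",".join(f"{a.name}#{a.expr_id}" for a in input_attrs)
          raise RuntimeError(f"Couldn't find {attr.name}#{attr.expr_id} in [{found}]")

      # The input row carries first#75 (produced by the cast/alias), but the
      # ordering still references first#74, so binding fails despite the name match.
      inputs = [Attr("word", 76), Attr("length", 77), Attr("first", 75)]
      bind_reference(Attr("word", 76), inputs)   # ordinal 0: expression IDs match
      try:
          bind_reference(Attr("first", 74), inputs)
      except RuntimeError as e:
          print(e)  # prints: Couldn't find first#74 in [word#76,length#77,first#75]
      ```

      Under this model, renaming the column is irrelevant; the bug is that two different expression IDs for first survive into the physical plan.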

      Here's the explain plan from 2.2.0:

      == Parsed Logical Plan ==
      'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
      +- 'Project ['word, 'length, cast('first as string) AS first#85]
         +- 'UnresolvedRelation `src`
      
      == Analyzed Logical Plan ==
      InsertIntoHiveTable CatalogTable(
      Database: default
      Table: dest
      Owner: irashid
      Created: Wed Jun 21 14:25:13 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`first`]
      Schema: root
      -- word: string (nullable = true)
      -- length: integer (nullable = true)
      -- first: string (nullable = true)
      ), Map(first -> None), false, false
         +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
            +- SubqueryAlias src
               +- CatalogRelation CatalogTable(
      Database: default
      Table: src
      Owner: irashid
      Created: Wed Jun 21 14:25:11 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Statistics: 9223372036854775807 bytes
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`length`]
      Schema: root
      -- first: string (nullable = true)
      -- word: string (nullable = true)
      -- length: integer (nullable = true)
      ), [first#88, word#89], [length#90]
      
      == Optimized Logical Plan ==
      InsertIntoHiveTable CatalogTable(
      Database: default
      Table: dest
      Owner: irashid
      Created: Wed Jun 21 14:25:13 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`first`]
      Schema: root
      -- word: string (nullable = true)
      -- length: integer (nullable = true)
      -- first: string (nullable = true)
      ), Map(first -> None), false, false
         +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
            +- SubqueryAlias src
               +- CatalogRelation CatalogTable(
      Database: default
      Table: src
      Owner: irashid
      Created: Wed Jun 21 14:25:11 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Statistics: 9223372036854775807 bytes
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`length`]
      Schema: root
      -- first: string (nullable = true)
      -- word: string (nullable = true)
      -- length: integer (nullable = true)
      ), [first#88, word#89], [length#90]
      
      == Physical Plan ==
      ExecutedCommand
         +- InsertIntoHiveTable CatalogTable(
      Database: default
      Table: dest
      Owner: irashid
      Created: Wed Jun 21 14:25:13 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/dest
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`first`]
      Schema: root
      -- word: string (nullable = true)
      -- length: integer (nullable = true)
      -- first: string (nullable = true)
      ), Map(first -> None), false, false
               +- Project [word#89, length#90, cast(first#88 as string) AS first#85]
                  +- SubqueryAlias src
                     +- CatalogRelation CatalogTable(
      Database: default
      Table: src
      Owner: irashid
      Created: Wed Jun 21 14:25:11 CDT 2017
      Last Access: Wed Dec 31 18:00:00 CST 1969
      Type: MANAGED
      Provider: hive
      Properties: [serialization.format=1]
      Statistics: 9223372036854775807 bytes
      Location: file:/Users/irashid/github/pub/spark/spark-warehouse/src
      Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      InputFormat: org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      Partition Provider: Catalog
      Partition Columns: [`length`]
      Schema: root
      -- first: string (nullable = true)
      -- word: string (nullable = true)
      -- length: integer (nullable = true)
      ), [first#88, word#89], [length#90]
      

      And from 2.1.1:

      == Parsed Logical Plan ==
      'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), OverwriteOptions(false,Map()), false
      +- 'Project ['word, 'length, cast('first as string) AS first#55]
         +- 'UnresolvedRelation `src`
      
      == Analyzed Logical Plan ==
      InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
      +- Project [word#60, length#58, cast(first#59 as string) AS first#55]
         +- MetastoreRelation default, src
      
      == Optimized Logical Plan ==
      InsertIntoTable MetastoreRelation default, dest, Map(first -> None), OverwriteOptions(false,Map()), false
      +- Project [word#60, length#58, first#59]
         +- MetastoreRelation default, src
      
      == Physical Plan ==
      InsertIntoHiveTable MetastoreRelation default, dest, Map(first -> None), false, false
      +- HiveTableScan [word#60, length#58, first#59], MetastoreRelation default, src
      

      While this example query is somewhat contrived, this is really important: if you try to do the same thing where src was created by Hive, the query fails with the same error. In that case, the explain plan looks like:

      == Parsed Logical Plan ==
      'InsertIntoTable 'UnresolvedRelation `dest`, Map(first -> None), false, false
      +- 'Project ['word, 'length, 'first]
         +- 'UnresolvedRelation `src`
      
      == Analyzed Logical Plan ==
      InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
         +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
            +- Project [word#49, length#50, first#48]
               +- SubqueryAlias src
                  +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
      
      == Optimized Logical Plan ==
      InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
         +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
            +- Project [word#49, length#50, first#48]
               +- SubqueryAlias src
                  +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
      
      == Physical Plan ==
      ExecutedCommand
         +- InsertIntoHiveTable `my_test`.`dest`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Map(first -> None), false, false
               +- Project [cast(word#49 as string) AS word#54, cast(length#50 as int) AS length#55, cast(first#48 as string) AS first#56]
                  +- Project [word#49, length#50, first#48]
                     +- SubqueryAlias src
                        +- CatalogRelation `my_test`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [first#48, word#49], [length#50]
      

            People

            • Assignee: Xiao Li (smilegator)
            • Reporter: Imran Rashid (irashid)
            • Votes: 0
            • Watchers: 8
