  Spark / SPARK-3414

Case insensitivity breaks when unresolved relation contains attributes with uppercase letters in their names

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.2
    • Fix Version/s: 1.2.0
    • Component/s: SQL
    • Labels: None

    Description

      Paste the following snippet into spark-shell (Hive support is required) to reproduce this issue:

      import org.apache.spark.sql.hive.HiveContext
      
      val hiveContext = new HiveContext(sc)
      import hiveContext._
      
      case class LogEntry(filename: String, message: String)
      case class LogFile(name: String)
      
      sc.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
      sc.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
      
      val srdd = sql(
        """
          SELECT name, message
          FROM rawLogs
          JOIN (
            SELECT name
            FROM logFiles
          ) files
          ON rawLogs.filename = files.name
        """)
      
      srdd.registerTempTable("boom")
      sql("select * from boom")
      

      Exception thrown:

      SchemaRDD[7] at RDD at SchemaRDD.scala:103
      == Query Plan ==
      == Physical Plan ==
      org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
      Project [*]
       LowerCaseSchema
        Subquery boom
         Project ['name,'message]
          Join Inner, Some(('rawLogs.filename = name#2))
           LowerCaseSchema
            Subquery rawlogs
             SparkLogicalPlan (ExistingRdd [filename#0,message#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:208)
           Subquery files
            Project [name#2]
             LowerCaseSchema
              Subquery logfiles
               SparkLogicalPlan (ExistingRdd [name#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208)
      

      Notice that rawLogs in the join operator is not lowercased.

      The reason is that, during the analysis phase, the CaseInsensitiveAttributeReferences batch is executed only before the Resolution batch. When srdd is registered as the temporary table boom, its original (unanalyzed) logical plan is stored in the catalog:

      Join Inner, Some(('rawLogs.filename = 'files.name))
       UnresolvedRelation None, rawLogs, None
       Subquery files
        Project ['name]
         UnresolvedRelation None, logFiles, None
      

      Notice that the attributes referenced in the join operator (especially rawLogs) are not lowercased yet.

      Then, when select * from boom is analyzed, its input logical plan is:

      Project [*]
       UnresolvedRelation None, boom, None
      

      Here the unresolved relation points to the unanalyzed logical plan of srdd shown above. That plan is only substituted in later, by the ResolveRelations rule, so CaseInsensitiveAttributeReferences never touches it and rawLogs.filename is never lowercased:

      === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
       Project [*]                            Project [*]
      ! UnresolvedRelation None, boom, None    LowerCaseSchema
      !                                         Subquery boom
      !                                          Project ['name,'message]
      !                                           Join Inner, Some(('rawLogs.filename = 'files.name))
      !                                            LowerCaseSchema
      !                                             Subquery rawlogs
      !                                              SparkLogicalPlan (ExistingRdd [filename#0,message#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:208)
      !                                            Subquery files
      !                                             Project ['name]
      !                                              LowerCaseSchema
      !                                               Subquery logfiles
      !                                                SparkLogicalPlan (ExistingRdd [name#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208)
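
      The ordering problem can be reproduced in a toy model that has no Spark dependency. This is only a sketch: Plan, Relation, Scan, Filter and ToyAnalyzer are hypothetical names invented for illustration, not Catalyst classes, and Filter merely stands in for the join condition. It assumes, as in the trace above, that the catalog lookup itself is case insensitive.

```scala
// Toy model only: none of these types exist in Catalyst.
sealed trait Plan
case class Relation(name: String) extends Plan                  // unresolved table reference
case class Scan(table: String) extends Plan                     // resolved leaf
case class Filter(attribute: String, child: Plan) extends Plan  // stands in for the join condition

object ToyAnalyzer {
  // Batch 1: lowercase every name that is in the tree right now.
  def lowerCase(plan: Plan): Plan = plan match {
    case Relation(n)  => Relation(n.toLowerCase)
    case Scan(t)      => Scan(t)
    case Filter(a, c) => Filter(a.toLowerCase, lowerCase(c))
  }

  // Batch 2: splice in whatever plan the catalog stores for each relation.
  // The lookup key is lowercased, so the relation itself is always found.
  def resolve(plan: Plan, catalog: Map[String, Plan]): Plan = plan match {
    case Relation(n) =>
      catalog.get(n.toLowerCase) match {
        case Some(stored) => resolve(stored, catalog)
        case None         => Relation(n)
      }
    case Scan(t)      => Scan(t)
    case Filter(a, c) => Filter(a, resolve(c, catalog))
  }

  // Lowercasing runs only before resolution, as described in this issue.
  def analyze(plan: Plan, catalog: Map[String, Plan]): Plan =
    resolve(lowerCase(plan), catalog)
}

object ToyDemo {
  // "boom" is stored unanalyzed, with a mixed-case attribute reference.
  val boom: Plan = Filter("rawLogs.filename", Relation("rawLogs"))
  val catalog: Map[String, Plan] = Map("rawlogs" -> Scan("rawlogs"), "boom" -> boom)

  // The relation resolves, but the attribute keeps its original case
  // because it entered the tree after the lowercasing batch had run.
  val analyzed: Plan = ToyAnalyzer.analyze(Relation("boom"), catalog)
}
```

      In this model the spliced-in subtree is never revisited by lowerCase, which mirrors how rawLogs.filename survives in the trace above.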
      

      A reasonable fix would be to always register the analyzed logical plan in the catalog when registering temporary tables.
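
      The idea behind this proposal can be sketched with a small stand-alone catalog that runs the analyzer once, at registration time. Everything here is hypothetical (EagerCatalog, its methods, and the list-of-attribute-names stand-in for a plan); it is not the Spark catalog API and not necessarily how the fix was implemented.

```scala
import scala.collection.mutable

// Hypothetical catalog, not Spark's: it stores the analyzed plan
// rather than the plan the caller handed in.
final class EagerCatalog[P](analyze: P => P) {
  private val tables = mutable.Map.empty[String, P]

  // Analyze once, at registration time, and keep only the result.
  def registerTempTable(name: String, plan: P): Unit =
    tables(name.toLowerCase) = analyze(plan)

  def lookup(name: String): Option[P] = tables.get(name.toLowerCase)
}

object EagerCatalogDemo {
  // For illustration, a "plan" is just the attribute names it references.
  type Plan = List[String]
  val lowerCaseAttributes: Plan => Plan = _.map(_.toLowerCase)

  val catalog = new EagerCatalog[Plan](lowerCaseAttributes)
  catalog.registerTempTable("boom", List("rawLogs.filename", "files.name"))

  // Later queries see only names that have already been lowercased.
  val stored: Option[Plan] = catalog.lookup("boom")
}
```

      With this design a later query never sees an unanalyzed subtree, so the ordering of the analyzer batches no longer matters for registered tables.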

            People

              Assignee: Michael Armbrust
              Reporter: Cheng Lian