Description
Paste the following snippet into spark-shell (Hive support is required) to reproduce this issue:
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext._

case class LogEntry(filename: String, message: String)
case class LogFile(name: String)

sc.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
sc.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")

val srdd = sql(
  """
    SELECT name, message
    FROM rawLogs
    JOIN (
      SELECT name
      FROM logFiles
    ) files
    ON rawLogs.filename = files.name
  """)

srdd.registerTempTable("boom")
sql("select * from boom")
Exception thrown:
SchemaRDD[7] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
Project [*]
 LowerCaseSchema
  Subquery boom
   Project ['name,'message]
    Join Inner, Some(('rawLogs.filename = name#2))
     LowerCaseSchema
      Subquery rawlogs
       SparkLogicalPlan (ExistingRdd [filename#0,message#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:208)
     Subquery files
      Project [name#2]
       LowerCaseSchema
        Subquery logfiles
         SparkLogicalPlan (ExistingRdd [name#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208)
Notice that rawLogs in the join operator is not lowercased.
The reason is that, during the analysis phase, the CaseInsensitiveAttributeReferences batch is executed only before the Resolution batch. When srdd is registered as the temporary table boom, its original (unanalyzed) logical plan is stored in the catalog:
Join Inner, Some(('rawLogs.filename = 'files.name))
 UnresolvedRelation None, rawLogs, None
 Subquery files
  Project ['name]
   UnresolvedRelation None, logFiles, None
Notice that the attributes referenced in the join operator (especially rawLogs) are not lowercased yet.
Then, when select * from boom is analyzed, its input logical plan is:
Project [*]
 UnresolvedRelation None, boom, None
Here the unresolved relation points to the unanalyzed logical plan of srdd above. That plan is only pulled in later by the ResolveRelations rule, so CaseInsensitiveAttributeReferences never touches it and rawLogs.filename is not lowercased:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 Project [*]                             Project [*]
! UnresolvedRelation None, boom, None     LowerCaseSchema
!                                          Subquery boom
!                                           Project ['name,'message]
!                                            Join Inner, Some(('rawLogs.filename = 'files.name))
!                                             LowerCaseSchema
!                                              Subquery rawlogs
!                                               SparkLogicalPlan (ExistingRdd [filename#0,message#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:208)
!                                             Subquery files
!                                              Project ['name]
!                                               LowerCaseSchema
!                                                Subquery logfiles
!                                                 SparkLogicalPlan (ExistingRdd [name#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208)
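The ordering problem can be illustrated with a toy model that does not depend on Spark. This is only a sketch: Plan, Filter, ToyAnalyzer and ToyDemo are hypothetical names, and the two functions merely stand in for the CaseInsensitiveAttributeReferences and Resolution batches. Lowercasing runs once over the query, then relation resolution splices in the raw plan stored in the catalog, which the lowercasing step never sees.

```scala
// Toy model, NOT Spark's Catalyst: every name below is hypothetical.
sealed trait Plan
case class UnresolvedRelation(name: String) extends Plan
case class Relation(name: String) extends Plan
case class Filter(attribute: String, child: Plan) extends Plan

object ToyAnalyzer {
  // Stand-in for the lowercasing batch: touches only nodes already in the tree.
  def lowerCase(plan: Plan): Plan = plan match {
    case UnresolvedRelation(n) => UnresolvedRelation(n.toLowerCase)
    case Relation(n)           => Relation(n.toLowerCase)
    case Filter(a, child)      => Filter(a.toLowerCase, lowerCase(child))
  }

  // Stand-in for relation resolution: splices in whatever plan the catalog
  // holds (looked up case-insensitively) and keeps resolving inside it.
  def resolveRelations(catalog: Map[String, Plan])(plan: Plan): Plan = plan match {
    case UnresolvedRelation(n) =>
      catalog.get(n.toLowerCase)
        .map(p => resolveRelations(catalog)(p))
        .getOrElse(Relation(n.toLowerCase))
    case Filter(a, child) => Filter(a, resolveRelations(catalog)(child))
    case other            => other
  }

  // Lowercasing runs exactly once, strictly before resolution.
  def analyze(catalog: Map[String, Plan])(plan: Plan): Plan =
    resolveRelations(catalog)(lowerCase(plan))
}

object ToyDemo {
  // "boom" is registered with its raw, mixed-case plan, as in this report.
  val catalog: Map[String, Plan] = Map(
    "rawlogs" -> Relation("rawlogs"),
    "boom"    -> Filter("rawLogs.filename", UnresolvedRelation("rawLogs"))
  )

  // The relation name ends up lowercased, but the attribute does not.
  val analyzed: Plan = ToyAnalyzer.analyze(catalog)(UnresolvedRelation("boom"))
}
```

In this sketch ToyDemo.analyzed still carries the mixed-case attribute rawLogs.filename next to the lowercased relation rawlogs, which mirrors the mismatch visible in the rule trace.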
A reasonable fix would be to always register the analyzed logical plan in the catalog when registering temporary tables.
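A minimal sketch of that idea follows, again as a toy model rather than Spark's API. MiniPlan, MiniAnalyzer, MiniCatalog and both register methods are hypothetical, and the analyze function only lowercases attribute names as a stand-in for the full analysis pipeline.

```scala
// Toy sketch, NOT Spark's API: all names below are hypothetical.
final case class MiniPlan(attributes: Seq[String], analyzed: Boolean = false)

object MiniAnalyzer {
  // Stand-in for full analysis: lowercases attribute references once.
  def analyze(plan: MiniPlan): MiniPlan =
    if (plan.analyzed) plan
    else MiniPlan(plan.attributes.map(_.toLowerCase), analyzed = true)
}

final class MiniCatalog {
  private val tables = scala.collection.mutable.Map.empty[String, MiniPlan]

  // Behaviour described in this report: the raw plan is stored as is.
  def registerRaw(name: String, plan: MiniPlan): Unit =
    tables(name.toLowerCase) = plan

  // Proposed behaviour: analyze first, so lookups never return raw attributes.
  def registerAnalyzed(name: String, plan: MiniPlan): Unit =
    tables(name.toLowerCase) = MiniAnalyzer.analyze(plan)

  def lookup(name: String): Option[MiniPlan] = tables.get(name.toLowerCase)
}
```

With registerAnalyzed, a later query that resolves the table gets a plan whose attributes have already been normalized, so it no longer matters that the lowercasing step runs before relation resolution.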
Issue Links
- is duplicated by SPARK-2063 Creating a SchemaRDD via sql() does not correctly resolve nested types (Resolved)