Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 3.0.1
None
Spark version - 3.0.1
OS - macOS 10.15.7
Description
Code to reproduce the issue:
import org.apache.spark.sql.SparkSession

object ColumnNameWithDot {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Simple Application")
      .config("spark.master", "local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("OFF")

    import spark.implicits._

    val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
    df.na.fill(Map("`ColWith.Dot`" -> "na"))
      .show()
  }
}
Analysis
-----------------------------PART-I----------------------------------
I debugged the Spark code. The failure is caused by a bug in the spark-catalyst code at
The function in question resolves the column by trying the following patterns in order (as described in its code comments) until one matches:
- Consider pattern dbName.tableName.columnName
- Consider tableName.columnName
- Consider everything as columnName
However, in the third step the implementation looks up only the first name part and returns the remaining parts as nested-field names. It should instead join all parts with a dot (.) before the lookup.
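To make the third step concrete, here is a minimal pure-Scala model of that lookup with no Spark dependency. It is a simplified sketch, not the spark-catalyst code: `direct` is a hypothetical stand-in for the map of lower-cased column names, and `lookupHead` / `lookupJoined` are hypothetical helpers that mimic the current behaviour (`nameParts.head`) and the proposed one (`nameParts.mkString(".")`). Qualifiers and multiple matches are ignored.

```scala
import java.util.Locale

// Hypothetical stand-in for the attribute map: lower-cased name -> column name.
val direct: Map[String, String] =
  Seq("ColWith.Dot", "Col").map(n => n.toLowerCase(Locale.ROOT) -> n).toMap

// Current behaviour (simplified): only the first part is looked up and the
// remaining parts are returned as nested-field names.
def lookupHead(nameParts: Seq[String]): (Option[String], Seq[String]) =
  (direct.get(nameParts.head.toLowerCase(Locale.ROOT)), nameParts.tail)

// Proposed behaviour (simplified): all parts are joined with "." first.
def lookupJoined(nameParts: Seq[String]): (Option[String], Seq[String]) =
  (direct.get(nameParts.mkString(".").toLowerCase(Locale.ROOT)), Seq.empty)

// An unquoted "ColWith.Dot" arrives at this step already split into two parts.
val parts = Seq("ColWith", "Dot")
println(lookupHead(parts))   // (None,List(Dot))
println(lookupJoined(parts)) // (Some(ColWith.Dot),List())
```

In this model the head-only lookup searches for a column named "ColWith", which does not exist, so resolution fails even though a column literally named "ColWith.Dot" is present.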
-----------------------------PART-II----------------------------------
If the column name is not quoted with backticks, it fails at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400
If it is quoted, the condition at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413 evaluates to false, because k holds the backtick-quoted name whereas f.name is unquoted. It then fails at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422
In both cases the failure comes from the issue described in PART-I.
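The two code paths above can be modelled without Spark as well. This sketch rests on assumptions: `parseName` is a hypothetical simplified parser that splits on dots outside backticks (Spark's own attribute-name parser also handles escaped backticks, which this one ignores), and `columnEquals` mimics Spark's case-insensitive resolver, which compares the strings as-is without removing backticks.

```scala
// Hypothetical simplified parser: splits on '.' unless inside backticks.
def parseName(name: String): Seq[String] = {
  val parts = scala.collection.mutable.ArrayBuffer.empty[String]
  val current = new StringBuilder
  var inQuotes = false
  name.foreach {
    case '`' => inQuotes = !inQuotes
    case '.' if !inQuotes =>
      parts += current.toString
      current.clear()
    case c => current += c
  }
  parts += current.toString
  parts.toList
}

// Mimics a case-insensitive resolver: plain comparison, no unquoting.
def columnEquals(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

val fieldName = "ColWith.Dot"   // f.name as stored in the schema
val quotedKey = "`ColWith.Dot`" // k as passed to na.fill

println(parseName(fieldName))               // List(ColWith, Dot)
println(parseName(quotedKey))               // List(ColWith.Dot)
println(columnEquals(quotedKey, fieldName)) // false
```

The unquoted name is split into two parts (the path that fails in PART-I), while the quoted key never equals the unquoted schema name, so the code falls through to `df.col(f.name)`, which again receives the unquoted, splittable name.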
Solution
Make the following change in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268:
-    val name = nameParts.head
+    val name = nameParts.mkString(".") // join all parts using "."
     val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
-    (attributes, nameParts.tail)
+    (attributes, Seq.empty)
Workaround
Alternatively, we can change the code at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396
While resolving the input columns, build a new map from each resolved column's name to its replacement value, as below.
The idea is to use the resolved name instead of the input name when filling null values.
private def fillMap(values: Seq[(String, Any)]): DataFrame = {
  // Error handling
  var resolved: Map[String, Any] = Map()
  values.foreach { case (colName, replaceValue) =>
    // Check column name exists
    val resolvedColumn = df.resolve(colName)

    // Check data type
    replaceValue match {
      case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String =>
        // This is good
      case _ => throw new IllegalArgumentException(
        s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).")
    }
    resolved += (resolvedColumn.name -> replaceValue)
  }

  val columnEquals = df.sparkSession.sessionState.analyzer.resolver
  val projections = df.schema.fields.map { f =>
    resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
      v match {
        case v: jl.Float => fillCol[Float](f, v)
        case v: jl.Double => fillCol[Double](f, v)
        case v: jl.Long => fillCol[Long](f, v)
        case v: jl.Integer => fillCol[Integer](f, v)
        case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
        case v: String => fillCol[String](f, v)
      }
    }.getOrElse(df.col(f.name))
  }
  df.select(projections : _*)
}
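The key idea of the workaround (keying the replacement map by the resolved name) can be checked in a small pure-Scala model. This is a sketch under stated assumptions, not Spark code: rows are modelled as `Map[String, Option[Any]]`, and `resolve` is a hypothetical stand-in for `df.resolve` that strips surrounding backticks and matches schema names case-insensitively. Unlike Spark, this toy `resolve` does not split unquoted names on dots.

```scala
// Hypothetical schema and data mirroring the reproduction above.
val schema: Seq[String] = Seq("ColWith.Dot", "Col")
val rows: Seq[Map[String, Option[Any]]] = Seq(
  Map("ColWith.Dot" -> Some("abc"), "Col" -> Some(23)),
  Map("ColWith.Dot" -> Some("def"), "Col" -> Some(44)),
  Map("ColWith.Dot" -> None, "Col" -> Some(0))
)

// Mimics a case-insensitive resolver.
def columnEquals(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

// Hypothetical stand-in for df.resolve: unquote, then find the schema field.
def resolve(colName: String): String = {
  val unquoted = colName.stripPrefix("`").stripSuffix("`")
  schema.find(columnEquals(_, unquoted)).getOrElse(
    throw new IllegalArgumentException(s"Cannot resolve column name '$colName'"))
}

def fillMap(values: Seq[(String, Any)]): Seq[Map[String, Option[Any]]] = {
  // Key the replacements by the resolved (schema) name, not the input name.
  val resolved: Map[String, Any] = values.map { case (k, v) => resolve(k) -> v }.toMap
  rows.map { row =>
    schema.map { f =>
      val filled = resolved.find { case (k, _) => columnEquals(k, f) } match {
        case Some((_, v)) => row(f).orElse(Some(v)) // replace nulls only
        case None         => row(f)
      }
      f -> filled
    }.toMap
  }
}

fillMap(Seq("`ColWith.Dot`" -> "na")).foreach(println)
```

Because the map key is the resolved name "ColWith.Dot" rather than the quoted input, the comparison against each schema field name succeeds and only the null in that column is replaced.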