org.apache.spark.sql.DataFrameNaFunctions.fillMap(values: Seq[(String, Any)]) fails for a column name containing a dot



    • Spark version - 3.0.1

      OS - macOS 10.15.7


      Code to reproduce the issue:

      import org.apache.spark.sql.SparkSession

      object ColumnNameWithDot {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder.appName("Simple Application")
            .config("spark.master", "local").getOrCreate()
          import spark.implicits._
          // One column name contains a dot; the third row has a null in that column.
          val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
          // Fails even though the column name is quoted with backticks.
          df.na.fill(Map("`ColWith.Dot`" -> "na"))
        }
      }



      I debugged the Spark code. The failure is due to a bug in the spark-catalyst code at


      According to its code comments, the function in question resolves a column name by trying the following patterns in order until one matches:

      • Consider pattern dbName.tableName.columnName
      • Consider tableName.columnName
      • Consider everything as columnName

      However, in the third step the implementation uses only the first name part for resolution. It should instead join all parts with a dot (.).
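
      The difference between the two behaviours in the third step can be illustrated with a small standalone sketch. It is a simplified model, not Spark's actual code: the object name, the `columns` map, and both helper functions are hypothetical and only mimic a case-insensitive lookup by column name.

```scala
import java.util.Locale

// Simplified, hypothetical model of the final fallback step; not Spark's actual code.
object ResolveSketch {
  // Known columns, keyed by lower-cased name to mimic a case-insensitive lookup.
  val columns: Map[String, String] =
    Seq("ColWith.Dot", "Col").map(c => c.toLowerCase(Locale.ROOT) -> c).toMap

  // Behaviour as described in this report: only the first name part is looked up.
  def resolveFirstPart(nameParts: Seq[String]): Option[String] =
    columns.get(nameParts.head.toLowerCase(Locale.ROOT))

  // Proposed behaviour: treat all parts, joined with a dot, as one column name.
  def resolveJoined(nameParts: Seq[String]): Option[String] =
    columns.get(nameParts.mkString(".").toLowerCase(Locale.ROOT))

  def main(args: Array[String]): Unit = {
    // An unquoted "ColWith.Dot" is assumed to be split into two name parts.
    val parts = Seq("ColWith", "Dot")
    println(resolveFirstPart(parts)) // None
    println(resolveJoined(parts))    // Some(ColWith.Dot)
  }
}
```

      In this model, looking up only the head ("ColWith") finds nothing, while the joined name matches the existing column.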


      If the column name is not quoted with backticks, it fails at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400

      If it is quoted, the condition at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413 evaluates to false, because k still carries the backticks whereas f.name does not. It then fails at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422

      It fails for the reason described in PART-I.
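
      The mismatch in the quoted case can be reproduced without Spark. The sketch below assumes the resolver behaves like a plain case-insensitive string comparison; `columnEquals` here is a local stand-in, and the object and value names are hypothetical.

```scala
// Standalone illustration of the name comparison; does not use Spark.
object QuotedKeySketch {
  // Stand-in for a case-insensitive resolver (assumption: simple equalsIgnoreCase).
  val columnEquals: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

  def main(args: Array[String]): Unit = {
    val key = "`ColWith.Dot`"        // key exactly as passed to na.fill, with backticks
    val fieldName = "ColWith.Dot"    // schema field name, which is never quoted
    val resolvedName = "ColWith.Dot" // name a successful resolution is assumed to yield

    println(columnEquals(key, fieldName))          // false
    println(columnEquals(resolvedName, fieldName)) // true
  }
}
```

      The backticks are part of the key string, so it can never equal the unquoted field name; comparing against a resolved, unquoted name avoids this.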


      Change the code at https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268 as follows:

      - val name = nameParts.head
      + val name = nameParts.mkString(".") // join all parts with a dot
        val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
      - (attributes, nameParts.tail)
      + (attributes, Seq.empty)


      We can make a change in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396

      While resolving the input columns, build a new map from each resolved column name to its replacement value, as shown below.

      The idea is to use the resolved name instead of the input name when filling null values.

      private def fillMap(values: Seq[(String, Any)]): DataFrame = {
        // Error handling
        var resolved: Map[String, Any] = Map()
        values.foreach { case (colName, replaceValue) =>
          // Check column name exists
          val resolvedColumn = df.resolve(colName)
          // Check data type
          replaceValue match {
            case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String =>
            // This is good
            case _ => throw new IllegalArgumentException(
              s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).")
          }
          // Key the replacement by the resolved (unquoted) column name
          resolved += (resolvedColumn.name -> replaceValue)
        }

        val columnEquals = df.sparkSession.sessionState.analyzer.resolver
        val projections = df.schema.fields.map { f =>
          resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
            v match {
              case v: jl.Float => fillCol[Float](f, v)
              case v: jl.Double => fillCol[Double](f, v)
              case v: jl.Long => fillCol[Long](f, v)
              case v: jl.Integer => fillCol[Integer](f, v)
              case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
              case v: String => fillCol[String](f, v)
            }
          // Columns without a replacement are kept unchanged, as in the existing implementation
          }.getOrElse(df.col(f.name))
        }
        df.select(projections : _*)
      }




