Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-34417

org.apache.spark.sql.DataFrameNaFunctions.fillMap(values: Seq[(String, Any)]) fails for column name having a dot

Log workAgile BoardRank to TopRank to BottomAttach filesAttach ScreenshotVotersStop watchingWatchersCreate sub-taskConvert to sub-taskLinkCloneLabelsUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.1
    • Fix Version/s: 3.1.2, 3.2.0
    • Component/s: Spark Core
    • Labels:
      None
    • Environment:

      Spark version - 3.0.1

      OS - macOS 10.15.7

      Description

      Code to reproduce the issue:

      import org.apache.spark.sql.SparkSession
      
      object ColumnNameWithDot {
      
        def main(args: Array[String]): Unit = {
      
          val spark = SparkSession.builder.appName("Simple Application")
            .config("spark.master", "local").getOrCreate()
      
          spark.sparkContext.setLogLevel("OFF")
      
          import spark.implicits._
          val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
          df.na.fill(Map("`ColWith.Dot`" -> "na"))
            .show()
      
        }
      }
      

      Analysis

      -----------------------------PART-I----------------------------------

      Debugged the spark code. It is due to a bug in the spark-catalyst code at

      https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268

      Function in question resolves the column as per the code-comments in the following order until a match is found.

      • Consider pattern dbName.tableName.columnName
      • Consider tableName.columnName
      • Consider everything as columnName

      But implementation considers only the first part for the resolution in the third step. It should join all parts using dot(.).

      -----------------------------PART-II----------------------------------

      If we don’t use column name with back-tick them it fails at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400

       If it is quoted, the condition at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413 becomes false as k has value quoted with back-tick whereas f.name is not. Then it fails at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422

      It is failing due to the reason mentioned in the PART-I.

      Solution

      Make changes in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268 as below:

      -val name = nameParts.head
      + val name = nameParts.mkString(".") // join all part using .
      val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
      - (attributes, nameParts.tail)
      + (attributes, Seq.empty)

      Workaround

      We can make change in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396

      While we are resolving the input columns, create a new map with the name of the resolved column and replace value as below.

      Idea is to use resolved named instead of input name while filling null values.

      private def fillMap(values: Seq[(String, Any)]): DataFrame = {
        // Error handling
        var resolved: Map[String, Any] = Map()
        values.foreach { case (colName, replaceValue) =>
          // Check column name exists
          val resolvedColumn = df.resolve(colName)
      
          // Check data type
          replaceValue match {
            case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String =>
            // This is good
            case _ => throw new IllegalArgumentException(
              s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).")
          }
          resolved += (resolvedColumn.name -> replaceValue)
        }
      
        val columnEquals = df.sparkSession.sessionState.analyzer.resolver
        val projections = df.schema.fields.map { f =>
          resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
            v match {
              case v: jl.Float => fillCol[Float](f, v)
              case v: jl.Double => fillCol[Double](f, v)
              case v: jl.Long => fillCol[Long](f, v)
              case v: jl.Integer => fillCol[Integer](f, v)
              case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
              case v: String => fillCol[String](f, v)
            }
          }.getOrElse(df.col(f.name))
        }
        df.select(projections : _*)
      }
      

        Attachments

          Activity

          $i18n.getText('security.level.explanation', $currentSelection) Viewable by All Users
          Cancel

            People

            • Assignee:
              amsharma Amandeep Sharma Assign to me
              Reporter:
              amsharma Amandeep Sharma
              Shepherd:
              Sean Owen

              Dates

              • Created:
                Updated:
                Resolved:

                Issue deployment