SPARK-43514

Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.2, 3.4.0
    • Fix Version/s: None
    • Component/s: ML, SQL
    • Environment: Scala version 2.12.17

      Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.

      Spark 3.3.2 deployed on a cluster was used to check the issue on real data.

    Description

      We designed a function that joins two DataFrames on a common column using a similarity measure. All the code below is in Scala 2.12.

      I've added show calls for demonstration purposes.

      import org.apache.spark.ml.Pipeline
      import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, MinHashLSHModel, NGram, RegexTokenizer}
      import org.apache.spark.sql.{Column, DataFrame}
      import org.apache.spark.sql.functions.{col, struct}
      
      /**
       * Joins two dataframes on a string column using LSH algorithm for similarity computation.
       *
       * The output dataframe has three columns:
       *
       *   - `datasetA`, data from the left dataframe.
       *   - `datasetB`, data from the right dataframe.
       *   - `distance`, abstract distance between values of `joinColumn` from datasets;
       *     the lower `distance` value, the more similar `joinColumn` values are.
       */
      def similarityJoin(
        leftDf: DataFrame,
        rightDf: DataFrame,
        joinColumn: String,
        threshold: Double = 0.8,
      ): DataFrame = {
        leftDf.show(false)
        rightDf.show(false)
      
        val pipeline = new Pipeline().setStages(Array(
          new RegexTokenizer()
            .setPattern("")
            .setMinTokenLength(1)
            .setInputCol(joinColumn)
            .setOutputCol("tokens"),
          new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
          new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
          new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
        ))
      
        val model = pipeline.fit(leftDf)
      
        val storedHashed = model.transform(leftDf)
        val landedHashed = model.transform(rightDf)
      
        def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as c
      
        val result = model
          .stages
          .last
          .asInstanceOf[MinHashLSHModel]
          .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
          .withColumn("datasetA", struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
          .withColumn("datasetB", struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
      
        result.show(false)
      
        result
      }
      

      Now consider this simple example:

      val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
      val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
      
      similarityJoin(inputDF1, inputDF2, "name", 0.6)
      

      This example runs with no errors and outputs 3 empty DFs. Now let's add the distinct method to one data frame:

      val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2) as "df1"
      val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
      
      similarityJoin(inputDF1, inputDF2, "name", 0.6)
      

      This example outputs two empty DFs and then fails at result.show(false). Error:

      org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3769/0x0000000101804840: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
        ... many elided
      Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
        at scala.Predef$.require(Predef.scala:281)
        at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
        at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
        ... many more
      

      Now let's take a look at an example which is close to our application code. Define some helper functions:

      import org.apache.spark.sql.functions._
      
      
      def process1(df: DataFrame): Unit = {
          val companies = df.select($"id", $"name")
      
          val directors = df
                  .select(explode($"directors"))
                  .select($"col.name", $"col.id")
                  .dropDuplicates("id")
      
          val toBeMatched1 = companies
                  .filter(length($"name") > 2)
                  .select(
                      $"name",
                      $"id" as "sourceLegalEntityId",
                  )
      
          val toBeMatched2 = directors
                  .filter(length($"name") > 2)
                  .select(
                      $"name",
                      $"id" as "directorId",
                  )
      
          similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
      }
      
      def process2(df: DataFrame): Unit = {
          def process_financials(column: Column): Column = {
              transform(
                  column,
                  x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
              )
          }
      
          val companies = df.select(
              $"id",
              $"name",
              struct(
                  process_financials($"financials.balanceSheet") as "balanceSheet",
                  process_financials($"financials.capitalAndReserves") as "capitalAndReserves",
              ) as "financials",
          )
      
          val directors = df
                  .select(explode($"directors"))
                  .select($"col.name", $"col.id")
                  .dropDuplicates("id")
      
          val toBeMatched1 = companies
                  .filter(length($"name") > 2)
                  .select(
                      $"name",
                      $"id" as "sourceLegalEntityId",
                  )
      
          val toBeMatched2 = directors
                  .filter(length($"name") > 2)
                  .select(
                      $"name",
                      $"id" as "directorId",
                  )
      
          similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
      }
      

      Function process2 does the same job as process1, but also applies some transforms to the financials column before executing the similarity join.
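
      For reference, here is a minimal, self-contained sketch of what process_financials does (the toy data and literal values below are made up for illustration, and the notebook's spark session and implicits are assumed): transform rewrites every array element, and withField replaces the string date field with a parsed timestamp.

      import org.apache.spark.sql.functions._

      // Toy array<struct<date:string,value:string>> column; the values are assumptions for this sketch.
      val demo = spark.range(1).select(
        array(
          struct(lit("31 Dec 2020") as "date", lit("100") as "value"),
          struct(lit("31 Dec 2021") as "date", lit("200") as "value"),
        ) as "balanceSheet",
      )

      demo
        .select(
          transform(
            $"balanceSheet",
            x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
          ) as "balanceSheet",
        )
        .printSchema()
      // The date field inside each array element is now a timestamp instead of a string.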

      Example data frame and its schema:

      import org.apache.spark.sql.types._
      
      val schema = StructType(
          Seq(
              StructField("id", StringType),
              StructField("name", StringType),
              StructField(
                  "directors",
                  ArrayType(
                      StructType(Seq(StructField("id", StringType), StructField("name", StringType))),
                      containsNull = true,
                  ),
              ),
              StructField(
                  "financials",
                  StructType(
                      Seq(
                          StructField(
                              "balanceSheet",
                              ArrayType(
                                  StructType(Seq(
                                      StructField("date", StringType),
                                      StructField("value", StringType),
                                  )
                                  ),
                                  containsNull = true,
                              ),
                          ),
                          StructField(
                              "capitalAndReserves",
                              ArrayType(
                                  StructType(Seq(
                                      StructField("date", StringType),
                                      StructField("value", StringType),
                                  )
                                  ),
                                  containsNull = true,
                              ),
                          ),
                      ),
                  ),
              ),
          )
      )
      
      val mainDF = (1 to 10)
              .toDF("data")
              .withColumn("data", lit(null) cast schema)
              .select("data.*")
      

      This code just makes a data frame with 10 rows of a null column cast to the specified schema, so every field in every row is null.
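
      As a quick sanity check (the expected output below is inferred from the schema definition above, not copied from a run):

      mainDF.printSchema()
      // root
      //  |-- id: string (nullable = true)
      //  |-- name: string (nullable = true)
      //  |-- directors: array (nullable = true)
      //  |-- financials: struct (nullable = true)
      //  (nested fields elided)

      mainDF.count()  // 10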

      Now let's pass mainDF to the previously defined functions and observe the results.

      Example 1:

      process1(mainDF)
      

      Outputs three empty DFs, no errors.

      Example 2:

      process1(mainDF.distinct())
      

      Outputs two empty DFs and then fails at result.show(false). Error:

      org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
        ... many elided
      Caused by: java.lang.NullPointerException
        at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
        ... many more
      

      Example 3:

      process2(mainDF)
      

      Outputs two empty DFs and then fails at result.show(false). Error:

      org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
        ... many elided
      Caused by: java.lang.NullPointerException
        at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
        ... many more
      

      Somehow the presence of the distinct DataFrame method or the transform (or to_timestamp) SQL function before the similarity join causes it to fail even on empty input data frames. If these operations are done after the join, no errors are emitted.

      In the examples above I used trivial data frames for testing, and by design of these examples similarityJoin gets empty DFs. They are empty because the join column is preventively cleared of null values by filter(length($"name") > 2). We had the same issue with RegexTokenizer raising NullPointerException on real data where the data frames were not empty after an identical filter statement. It is really strange to get a NullPointerException for DFs which do not have null values in the join column.
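
      To illustrate why that filter should guarantee the absence of nulls (a standalone sketch with made-up values): length(null) evaluates to null, and a null predicate is treated as false, so both empty and null names are dropped.

      Seq("", null, "abc").toDF("name").filter(length($"name") > 2).show()
      // +----+
      // |name|
      // +----+
      // | abc|
      // +----+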

      Current workaround: call the distinct DataFrame method and the transform SQL function after the similarity join. Apparently, adding `.cache()` to the source DF also resolves the issue.
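
      A sketch of both workarounds applied to the small failing example above (the exact placement of cache() is our reading of "the source DF" and is an assumption, not a verified fix):

      // Workaround 1: apply distinct to the join result instead of to the input DF.
      val left  = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
      val right = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
      val joined = similarityJoin(left, right, "name", 0.6).distinct()

      // Workaround 2 (assumed placement): cache the source DF before calling distinct and filter.
      val cachedInput = Seq("", null).toDF("name").cache().distinct().filter(length($"name") > 2) as "df1"
      similarityJoin(cachedInput, right, "name", 0.6)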

      Attachments

        1. Plan.png
          477 kB
          Ritika Maheshwari
        2. Screen Shot 2023-05-31 at 11.14.24 PM.png
          301 kB
          Ritika Maheshwari
        3. Test.scala
          6 kB
          Svyatoslav Semenyuk
        4. 2023-05-30 13-47-04.mp4
          31.37 MB
          Svyatoslav Semenyuk
        5. Screen Shot 2023-05-22 at 5.39.55 PM.png
          309 kB
          Ritika Maheshwari

          People

            Assignee: Unassigned
            Reporter: Svyatoslav Semenyuk (prometheus3375)