Description
We wrote a function that joins two DFs on a common string column by approximate similarity. All code below is Scala 2.12.
I've added show calls for demonstration purposes.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, RegexTokenizer, MinHashLSHModel}
import org.apache.spark.sql.{DataFrame, Column}
// Needed for `col` and `struct` below (spark-shell imports these automatically).
import org.apache.spark.sql.functions.{col, struct}

/**
 * Joins two dataframes on a string column using LSH algorithm for similarity computation.
 *
 * The output dataframe has three columns:
 *
 *   - `datasetA`, data from the left dataframe.
 *   - `datasetB`, data from the right dataframe.
 *   - `distance`, abstract distance between values of `joinColumn` from datasets;
 *     the lower `distance` value, the more similar `joinColumn` values are.
 */
def similarityJoin(
  leftDf: DataFrame,
  rightDf: DataFrame,
  joinColumn: String,
  threshold: Double = 0.8,
): DataFrame = {
  leftDf.show(false)
  rightDf.show(false)

  // Characters -> character 3-grams -> hashed term vectors -> MinHash signatures.
  val pipeline = new Pipeline().setStages(Array(
    new RegexTokenizer()
      .setPattern("")
      .setMinTokenLength(1)
      .setInputCol(joinColumn)
      .setOutputCol("tokens"),
    new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
    new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
    new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
  ))

  val model = pipeline.fit(leftDf)
  val storedHashed = model.transform(leftDf)
  val landedHashed = model.transform(rightDf)

  def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as c

  val result = model
    .stages
    .last
    .asInstanceOf[MinHashLSHModel]
    .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
    .withColumn("datasetA", struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
    .withColumn("datasetB", struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))

  result.show(false)
  result
}
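As a rough illustration of what `distance` measures: Spark's MinHashLSH targets Jaccard distance, so the pipeline above approximates the Jaccard distance between the sets of character 3-grams of the two names. The plain-Scala sketch below computes the exact value without Spark. The helper names `charTrigrams` and `jaccardDistance` are hypothetical and not part of our code, and the result can differ from Spark's output because HashingTF may introduce hash collisions.

```scala
// Illustrative only: exact Jaccard distance over character 3-grams.
// `charTrigrams` and `jaccardDistance` are hypothetical helpers, not Spark APIs.

// Lower-case the input (RegexTokenizer does this by default) and collect
// the set of all 3-character windows. Strings shorter than 3 yield an empty set.
def charTrigrams(s: String): Set[String] =
  s.toLowerCase.sliding(3).filter(_.length == 3).toSet

// Returns None when either input has no 3-grams: an empty feature vector is
// exactly what MinHashLSH rejects with "Must have at least 1 non zero entry".
def jaccardDistance(a: String, b: String): Option[Double] = {
  val x = charTrigrams(a)
  val y = charTrigrams(b)
  if (x.isEmpty || y.isEmpty) None
  else Some(1.0 - (x intersect y).size.toDouble / (x union y).size)
}
```

With this definition, identical names have distance 0.0, and a pair is reported by the join only if its distance is below `threshold`. Names shorter than three characters produce no 3-grams, which is why the examples filter on `length($"name") > 2` before joining.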
Now consider this simple example:
val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
This example runs without errors and outputs three empty DFs. Now let's add a distinct call to one of the data frames:
val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
This example outputs two empty DFs and then fails at result.show(false). Error:
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3769/0x0000000101804840: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
  ... many elided
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
  at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
  ... many more
Now let's look at an example that is close to our application code. First, define some helper functions:
import org.apache.spark.sql.functions._

def process1(df: DataFrame): Unit = {
  val companies = df.select($"id", $"name")

  val directors = df
    .select(explode($"directors"))
    .select($"col.name", $"col.id")
    .dropDuplicates("id")

  val toBeMatched1 = companies
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "sourceLegalEntityId",
    )

  val toBeMatched2 = directors
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "directorId",
    )

  similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}

def process2(df: DataFrame): Unit = {
  def process_financials(column: Column): Column = {
    transform(
      column,
      x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
    )
  }

  val companies = df.select(
    $"id",
    $"name",
    struct(
      process_financials($"financials.balanceSheet") as "balanceSheet",
      process_financials($"financials.capitalAndReserves") as "capitalAndReserves",
    ) as "financials",
  )

  val directors = df
    .select(explode($"directors"))
    .select($"col.name", $"col.id")
    .dropDuplicates("id")

  val toBeMatched1 = companies
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "sourceLegalEntityId",
    )

  val toBeMatched2 = directors
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "directorId",
    )

  similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}
Function process2 does the same job as process1, but also applies some transforms to the financials column before executing the similarity join.
Example data frame and its schema:
import org.apache.spark.sql.types._

val schema = StructType(
  Seq(
    StructField("id", StringType),
    StructField("name", StringType),
    StructField(
      "directors",
      ArrayType(
        StructType(Seq(StructField("id", StringType), StructField("name", StringType))),
        containsNull = true,
      ),
    ),
    StructField(
      "financials",
      StructType(
        Seq(
          StructField(
            "balanceSheet",
            ArrayType(
              StructType(Seq(
                StructField("date", StringType),
                StructField("value", StringType),
              )),
              containsNull = true,
            ),
          ),
          StructField(
            "capitalAndReserves",
            ArrayType(
              StructType(Seq(
                StructField("date", StringType),
                StructField("value", StringType),
              )),
              containsNull = true,
            ),
          ),
        ),
      ),
    ),
  )
)

val mainDF = (1 to 10)
  .toDF("data")
  .withColumn("data", lit(null) cast schema)
  .select("data.*")
This code just makes a data frame with 10 rows from a null column cast to the specified schema.
Now let's pass mainDF to the previously defined functions and observe the results.
Example 1:
process1(mainDF)
Outputs three empty DFs, no errors.
Example 2:
process1(mainDF.distinct())
Outputs two empty DFs and then fails at result.show(false). Error:
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
Example 3:
process2(mainDF)
Outputs two empty DFs and then fails at result.show(false). Error:
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
Somehow, the presence of the distinct DF method or the transform (or to_timestamp) SQL function before the similarity join causes it to fail on empty input data frames. If these operations are applied after the join, no errors are raised.
—
In the examples above I used trivial data frames for testing, and by design similarityJoin receives empty DFs. They are empty because the join column is cleared of null values beforehand by filter(length($"name") > 2): length of a null value is null, so the predicate drops those rows along with names of two characters or fewer. We hit the same issue, RegexTokenizer raising a NullPointerException, with real data, where the data frames were not empty after the identical filter statement. It is really strange to get a NullPointerException for DFs that have no null values in the join column.
Current workaround: call the distinct DF method and the transform SQL function after the similarity join. Apparently, adding `.cache()` to the source DF also resolves the issue.
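A minimal sketch of the `.cache()` variant of the workaround, under stated assumptions: the helper name `prepareForSimilarityJoin` is hypothetical, the explicit `isNotNull` check is an extra guard that was not in our original code, and caching the already-filtered inputs (rather than some earlier DF) is a guess. We have not confirmed which position of `.cache()` is actually required.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length}

// Hypothetical helper, not part of the original code.
// Filters out null and too-short join values, then caches the result so that
// the ML pipeline stages are applied on top of the cached, filtered data.
// Assumption: caching at this point is enough to avoid the failure.
def prepareForSimilarityJoin(df: DataFrame, joinColumn: String): DataFrame =
  df
    .filter(col(joinColumn).isNotNull && length(col(joinColumn)) > 2)
    .cache()
```

In process1 or process2 this would replace the plain filter calls, for example `similarityJoin(prepareForSimilarityJoin(companies, "name"), prepareForSimilarityJoin(directors, "name"), "name", 0.6)`, with the column selection done as before. Caching keeps data in memory or on disk, so call `unpersist()` on the prepared DFs once the join result is no longer needed.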