Description
When optimizing an outer join, Spark passes an empty (all-null) row to both sides of the join condition to see whether nulls would be filtered out (side note: for one-sided outer joins such as left_outer it then ignores the assessment for the preserved side).
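For intuition, the check behaves roughly like the sketch below. This is simplified: the real logic lives in Catalyst's EliminateOuterJoin rule, and the canFilterOutNull signature here is illustrative, not the actual source.

import org.apache.spark.sql.Row

// Sketch: can this predicate ever accept a row whose columns are all null?
// If probing with an all-null row yields null or false, the null-supplying
// side can never contribute rows, so the outer join can be narrowed
// (e.g. left_outer to inner).
def canFilterOutNull(condition: Row => Any, width: Int): Boolean = {
  val allNullRow = Row(Seq.fill[Any](width)(null): _*)
  val result = condition(allNullRow) // a null-hostile UDF throws right here
  result == null || result == false
}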
For some reason, a condition such as xx IS NOT NULL && udf(xx) IS NOT NULL may be probed with the right-hand conjunct first, which raises an exception when the UDF does not expect a null input (had the left-hand conjunct been evaluated first, the null check would have short-circuited the call).
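To make the failure mode concrete, here is a hypothetical null-hostile UDF body of the kind described above (the reporter's actual function differs):

// Unboxing a null java.lang.Long throws a NullPointerException.
val func: Any => Int = value => value.asInstanceOf[java.lang.Long].intValue

val xx: Any = null
// Left-to-right short-circuiting protects the UDF from the null input:
val safe = xx != null && func(xx) > 0 // false, no exception thrown
// Probing the right conjunct in isolation, as the optimizer effectively does:
// func(xx) // throws NullPointerException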
The example below is SIMILAR to the actual code (see the actual query plans separately):
def func(value: Any): Int = ... // returns an AnyVal, which probably causes an IS NOT NULL filter to be added on the result

val df1 = sparkSession
  .table(...)
  .select("col1", "col2") // both LongType

val df11 = df1
  .filter(df1("col1").isNotNull)
  .withColumn("col3", functions.udf(func)(df1("col1")))
  .repartition(df1("col2"))
  .sortWithinPartitions(df1("col2"))

val df2 = ... // load other data containing col2, similarly repartitioned and sorted

val df3 = df11.join(df2, Seq("col2"), "left_outer")
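For reference, a self-contained approximation with hypothetical data (the actual tables, UDF body, and column types come from the reporter's job; this sketch compiles and runs, but whether the optimizer trips on it depends on the Spark version and the exact plan, so treat the attached query plans as the authoritative failing case):

import org.apache.spark.sql.{SparkSession, functions}

val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()
import spark.implicits._

// Stands in for any UDF that assumes a non-null input: unboxing null throws.
val func: java.lang.Long => Int = value => value.intValue

val df1 = Seq((1L, 10L), (2L, 20L)).toDF("col1", "col2")
val df11 = df1
  .filter(df1("col1").isNotNull)
  .withColumn("col3", functions.udf(func)(df1("col1")))
  .repartition(df1("col2"))
  .sortWithinPartitions(df1("col2"))

val df2 = Seq((10L, "x"), (30L, "y")).toDF("col2", "other")
  .repartition($"col2")
  .sortWithinPartitions($"col2")

val df3 = df11.join(df2, Seq("col2"), "left_outer")
df3.explain(true) // inspect how the optimizer rewrites the outer join
df3.show()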
Attachments
Issue Links
- duplicates SPARK-20359: Catalyst EliminateOuterJoin optimization can cause NPE (Resolved)