Description
It appears that when you join tables A and B, where B is derived from A, and use the eqNullSafe (<=>) operator in the join condition, Spark performs a Cartesian product.
However, if you perform the same join on tables that contain the same data but have no lineage relationship, the expected (non-Cartesian) join occurs.
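For reference, `<=>` (`eqNullSafe`) differs from `=` (`equalTo`) only in how NULL is handled: `=` yields UNKNOWN when either side is NULL, so a join filter drops the pair, while `<=>` treats NULL <=> NULL as true. A minimal model in plain Scala, assuming SQL NULL is represented as `None` (the object and method names are hypothetical and this is not Spark's implementation):

```scala
// Hypothetical model of SQL comparison semantics; SQL NULL is modelled as None.
object NullSafeEqModel {
  // SQL `=`: the result is None (UNKNOWN) if either operand is NULL.
  def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // SQL `<=>` (eqNullSafe): never UNKNOWN; two NULLs compare as equal.
  def sqlNullSafeEq(a: Option[String], b: Option[String]): Boolean =
    (a, b) match {
      case (None, None)       => true
      case (Some(x), Some(y)) => x == y
      case _                  => false
    }

  def main(args: Array[String]): Unit = {
    println(sqlEq(None, None))                 // None: a join filter drops this pair
    println(sqlNullSafeEq(None, None))         // true: the pair is kept
    println(sqlNullSafeEq(Some("Fred"), None)) // false
  }
}
```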
// Create some fake data.
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions

val peopleRowsRDD = sc.parallelize(Seq(
  Row("Fred", 8, 1),
  Row("Fred", 8, 2),
  Row(null, 10, 3),
  Row(null, 10, 4),
  Row("Amy", 12, 5),
  Row("Amy", 12, 6)))

val peopleSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("group", IntegerType, nullable = true),
  StructField("data", IntegerType, nullable = true)))

val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
people.createOrReplaceTempView("people")

scala> people.show
+----+-----+----+
|name|group|data|
+----+-----+----+
|Fred|    8|   1|
|Fred|    8|   2|
|null|   10|   3|
|null|   10|   4|
| Amy|   12|   5|
| Amy|   12|   6|
+----+-----+----+

// Now create a derived table from that table. It doesn't matter much what it computes.
val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")

variantCounts.show
+----+-------------+
|name|variant_count|
+----+-------------+
|Fred|            2|
|null|            2|
| Amy|            2|
+----+-------------+

// Now try an inner join using the regular equalTo, which drops nulls. This works fine.
val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))

innerJoinEqualTo.show
+----+-------------+----+-----+----+
|name|variant_count|name|group|data|
+----+-------------+----+-----+----+
|Fred|            2|Fred|    8|   1|
|Fred|            2|Fred|    8|   2|
| Amy|            2| Amy|   12|   5|
| Amy|            2| Amy|   12|   6|
+----+-------------+----+-----+----+

// Okay, now let's switch to the <=> operator.
//
// If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
// "Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
//
// If you have enabled them, you'll get the table below.
//
// However, we really don't want or expect a Cartesian product!
val innerJoinSqlNullSafeEqOp = variantCounts.join(people, variantCounts("name") <=> people("name"))

innerJoinSqlNullSafeEqOp.show
+----+-------------+----+-----+----+
|name|variant_count|name|group|data|
+----+-------------+----+-----+----+
|Fred|            2|Fred|    8|   1|
|Fred|            2|Fred|    8|   2|
|Fred|            2|null|   10|   3|
|Fred|            2|null|   10|   4|
|Fred|            2| Amy|   12|   5|
|Fred|            2| Amy|   12|   6|
|null|            2|Fred|    8|   1|
|null|            2|Fred|    8|   2|
|null|            2|null|   10|   3|
|null|            2|null|   10|   4|
|null|            2| Amy|   12|   5|
|null|            2| Amy|   12|   6|
| Amy|            2|Fred|    8|   1|
| Amy|            2|Fred|    8|   2|
| Amy|            2|null|   10|   3|
| Amy|            2|null|   10|   4|
| Amy|            2| Amy|   12|   5|
| Amy|            2| Amy|   12|   6|
+----+-------------+----+-----+----+

// Okay, let's construct the exact same variantCounts table manually
// so it has no relationship to the original.
val variantCountRowsRDD = sc.parallelize(Seq(
  Row("Fred", 2),
  Row(null, 2),
  Row("Amy", 2)))

val variantCountSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("variant_count", IntegerType, nullable = true)))

val manualVariantCounts = spark.createDataFrame(variantCountRowsRDD, variantCountSchema)

// Now perform the same join with the null-safe equals operator.
// This works and gives the expected non-Cartesian result.
val manualVarCountsInnerJoinSqlNullSafeEqOp = manualVariantCounts.join(people, manualVariantCounts("name") <=> people("name"))

manualVarCountsInnerJoinSqlNullSafeEqOp.show
+----+-------------+----+-----+----+
|name|variant_count|name|group|data|
+----+-------------+----+-----+----+
|Fred|            2|Fred|    8|   1|
|Fred|            2|Fred|    8|   2|
| Amy|            2| Amy|   12|   5|
| Amy|            2| Amy|   12|   6|
|null|            2|null|   10|   3|
|null|            2|null|   10|   4|
+----+-------------+----+-----+----+
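A plausible explanation, consistent with the linked SPARK-6459 ("Warn when Column API is constructing trivially true equality"), is that because `variantCounts` is derived from `people`, `variantCounts("name")` and `people("name")` refer to the same underlying attribute, so the join condition effectively compares a column with itself and holds for every pair of rows. That `equalTo` still behaves correctly suggests `Dataset.join` special-cases trivially true `===` conditions but not `<=>`. The sketch below models this in plain Scala, without Spark: the case classes, object, and `join` helper are hypothetical, NULL is represented as `None`, and the nested-loop join is only an illustration, not how Spark plans joins.

```scala
// Hypothetical model of the join in this report; SQL NULL is modelled as None.
// This illustrates the effect of a self-referencing condition, not Spark's planner.
final case class VariantCount(name: Option[String], variantCount: Int)
final case class Person(name: Option[String], group: Int, data: Int)

object TriviallyTrueJoinModel {
  val people: Seq[Person] = Seq(
    Person(Some("Fred"), 8, 1), Person(Some("Fred"), 8, 2),
    Person(None, 10, 3), Person(None, 10, 4),
    Person(Some("Amy"), 12, 5), Person(Some("Amy"), 12, 6))

  val variantCounts: Seq[VariantCount] = Seq(
    VariantCount(Some("Fred"), 2), VariantCount(None, 2), VariantCount(Some("Amy"), 2))

  // Nested-loop join: keep every (left, right) pair for which `cond` holds.
  def join(cond: (VariantCount, Person) => Boolean): Seq[(VariantCount, Person)] =
    for (l <- variantCounts; r <- people if cond(l, r)) yield (l, r)

  // Option equality treats None == None as true, which matches `<=>` semantics.
  val intended: Seq[(VariantCount, Person)] = join((l, r) => l.name == r.name)

  // If both column references resolve to the same (left-side) attribute,
  // the predicate compares a value with itself and is always true.
  val triviallyTrue: Seq[(VariantCount, Person)] = join((l, _) => l.name == l.name)

  def main(args: Array[String]): Unit = {
    println(intended.size)      // 6, like the manualVariantCounts join
    println(triviallyTrue.size) // 18 = 3 x 6, like the Cartesian result
  }
}
```

Under this explanation, a workaround is to break the shared reference before joining: rebuild the derived table as the report does with `manualVariantCounts`, or rename its join column (for example with `withColumnRenamed`) so the two sides no longer share an attribute. This has not been verified against the affected Spark version.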
Issue Links
- is duplicated by
  - SPARK-20804 Join with null safe equality fails with AnalysisException (Closed)
- is related to
  - SPARK-6231 Join on two tables (generated from same one) is broken (Resolved)
  - SPARK-11803 Dataset self join returns incorrect result (Resolved)
  - SPARK-10925 Exception when joining DataFrames (Resolved)
  - SPARK-12052 DataFrame with self-join fails unless toDF() column aliases provided (Resolved)
  - SPARK-15127 Column names are handled incorrectly when they originate from a single Dataframe (Resolved)
  - SPARK-10892 Join with Data Frame returns wrong results (Closed)
  - SPARK-6459 Warn when Column API is constructing trivially true equality (Resolved)
- links to