Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-20073

Unexpected Cartesian product when using eqNullSafe in join with a derived table

    XMLWordPrintableJSON

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.0
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels: None

    Description

      It appears that if you try to join tables A and B when B is derived from A and you use the eqNullSafe / <=> operator for the join condition, Spark performs a Cartesian product.

      However, if you perform the same join on tables that contain the same data but were created independently of each other, the expected non-Cartesian join occurs.

      // Create some fake data.
      
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.Dataset
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.functions
      
      val peopleRowsRDD = sc.parallelize(Seq(
          Row("Fred", 8, 1),
          Row("Fred", 8, 2),
          Row(null, 10, 3),
          Row(null, 10, 4),
          Row("Amy", 12, 5),
          Row("Amy", 12, 6)))
          
      val peopleSchema = StructType(Seq(
          StructField("name", StringType, nullable = true),
          StructField("group", IntegerType, nullable = true),
          StructField("data", IntegerType, nullable = true)))
          
      val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
      
      people.createOrReplaceTempView("people")
      
      scala> people.show
      +----+-----+----+
      |name|group|data|
      +----+-----+----+
      |Fred|    8|   1|
      |Fred|    8|   2|
      |null|   10|   3|
      |null|   10|   4|
      | Amy|   12|   5|
      | Amy|   12|   6|
      +----+-----+----+
      
      // Now create a derived table from that table. The exact query doesn't matter much.
      val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
      
      variantCounts.show
      +----+-------------+                                                            
      |name|variant_count|
      +----+-------------+
      |Fred|            2|
      |null|            2|
      | Amy|            2|
      +----+-------------+
      
      // Now try an inner join using the regular equalTo that drops nulls. This works fine.
      
      val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
      innerJoinEqualTo.show
      
      +----+-------------+----+-----+----+                                            
      |name|variant_count|name|group|data|
      +----+-------------+----+-----+----+
      |Fred|            2|Fred|    8|   1|
      |Fred|            2|Fred|    8|   2|
      | Amy|            2| Amy|   12|   5|
      | Amy|            2| Amy|   12|   6|
      +----+-------------+----+-----+----+
      
      // Okay, now let's switch to the <=> operator
      //
      // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
      // "Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
      //
      // If you have enabled them, you'll get the table below.
      //
      // However, we really don't want or expect a Cartesian product!
      
      val innerJoinSqlNullSafeEqOp = variantCounts.join(people, variantCounts("name")<=>(people("name")))
      innerJoinSqlNullSafeEqOp.show
      
      +----+-------------+----+-----+----+                                            
      |name|variant_count|name|group|data|
      +----+-------------+----+-----+----+
      |Fred|            2|Fred|    8|   1|
      |Fred|            2|Fred|    8|   2|
      |Fred|            2|null|   10|   3|
      |Fred|            2|null|   10|   4|
      |Fred|            2| Amy|   12|   5|
      |Fred|            2| Amy|   12|   6|
      |null|            2|Fred|    8|   1|
      |null|            2|Fred|    8|   2|
      |null|            2|null|   10|   3|
      |null|            2|null|   10|   4|
      |null|            2| Amy|   12|   5|
      |null|            2| Amy|   12|   6|
      | Amy|            2|Fred|    8|   1|
      | Amy|            2|Fred|    8|   2|
      | Amy|            2|null|   10|   3|
      | Amy|            2|null|   10|   4|
      | Amy|            2| Amy|   12|   5|
      | Amy|            2| Amy|   12|   6|
      +----+-------------+----+-----+----+
      
      // Okay, let's try to construct the exact same variantCount table manually
      // so it has no relationship to the original.
      
      val variantCountRowsRDD = sc.parallelize(Seq(
          Row("Fred", 2),
          Row(null, 2),
          Row("Amy", 2)))
          
      val variantCountSchema = StructType(Seq(
          StructField("name", StringType, nullable = true),
          StructField("variant_count", IntegerType, nullable = true)))
          
      val manualVariantCounts = spark.createDataFrame(variantCountRowsRDD, variantCountSchema)
      
      // Now perform the same join with the null-safe equals operator. This works and gives us the expected non-Cartesian product result.
      
      val manualVarCountsInnerJoinSqlNullSafeEqOp = manualVariantCounts.join(people, manualVariantCounts("name")<=>(people("name")))
      manualVarCountsInnerJoinSqlNullSafeEqOp.show
      
      +----+-------------+----+-----+----+
      |name|variant_count|name|group|data|
      +----+-------------+----+-----+----+
      |Fred|            2|Fred|    8|   1|
      |Fred|            2|Fred|    8|   2|
      | Amy|            2| Amy|   12|   5|
      | Amy|            2| Amy|   12|   6|
      |null|            2|null|   10|   3|
      |null|            2|null|   10|   4|
      +----+-------------+----+-----+----+
      
      

      Attachments

        Issue Links

          Activity

            People

              Assignee: maropu Takeshi Yamamuro
              Reporter: everett Everett Toews
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: