  1. Spark
  2. SPARK-26231

Dataframes inner join on double datatype columns resulting in Cartesian product


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.6.1, 2.4.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      The following code snippet reproduces the bug. An inner join on the Double columns results in a Cartesian product; when both columns are cast to String, the join works as expected.

      Please see the explain plans below.

      Error: scala> cartesianJoinErr.explain()
      == Physical Plan ==
      CartesianProduct
      :- ConvertToSafe
      :  +- Project name#143,group#144,data#145,name#143 AS name1#146
      :     +- Filter (name#143 = name#143)
      :        +- Scan ExistingRDDname#143,group#144,data#145
      +- Scan ExistingRDDname#147,group#148,data#149

      -----------------------------------------------------------

      Explain plan after casting both columns to String:

      stringColJoinWorks.explain()
      == Physical Plan ==
      SortMergeJoin name1String#151, name2String#152
      :- Sort name1String#151 ASC, false, 0
      :  +- TungstenExchange hashpartitioning(name1String#151,200), None
      :     +- Project name#143,group#144,data#145,cast(name#143 as string) AS name1String#151
      :        +- Scan ExistingRDDname#143,group#144,data#145
      +- Sort name2String#152 ASC, false, 0
         +- TungstenExchange hashpartitioning(name2String#152,200), None
            +- Project name#153,group#154,data#155,cast(name#153 as string) AS name2String#152
               +- Scan ExistingRDDname#153,group#154,data#155
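
      The two plans above differ in their top node: CartesianProduct in the first, SortMergeJoin in the second. As a rough sketch, the same check can be done programmatically instead of reading the explain output. The helper name hasCartesianProduct is hypothetical (it is not a Spark API); it only relies on queryExecution.executedPlan and the plan node's nodeName, and it assumes a spark-shell session like the one used in this report.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper, not part of Spark: returns true if any node in the
// physical plan of df is named "CartesianProduct".
def hasCartesianProduct(df: DataFrame): Boolean =
  df.queryExecution.executedPlan
    .collect { case node if node.nodeName == "CartesianProduct" => node }
    .nonEmpty
```

      Calling hasCartesianProduct(cartesianJoinErr) and hasCartesianProduct(stringColJoinWorks) on the DataFrames from the reproduction should reflect the difference between the two plans shown above.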

       

       

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types._
      // spark-shell 1.6 provides sc and sqlContext and already imports these implicits;
      // outside the shell the import is needed for the $"col" syntax.
      import sqlContext.implicits._

      // Four rows whose first field is a Double
      val doubleRDD = sc.parallelize(Seq(
          Row(11111.0, 2, 1),
          Row(22222.0, 8, 2),
          Row(33333.0, 10, 3),
          Row(44444.0, 10, 4)))

      val testSchema = StructType(Seq(
          StructField("name", DoubleType, nullable = true),
          StructField("group", IntegerType, nullable = true),
          StructField("data", IntegerType, nullable = true)))

      val doubleRDDCartesian = sqlContext.createDataFrame(doubleRDD, testSchema)

      // Derived from doubleRDDCartesian, so it shares the same underlying columns
      val cartNewCol = doubleRDDCartesian.select($"name", $"group", $"data")

      // name1 is a copy of the Double column name
      val newColName1DF = cartNewCol.withColumn("name1", $"name")
      // Inner join on the Double columns: planned as CartesianProduct
      val cartesianJoinErr = newColName1DF.join(doubleRDDCartesian, newColName1DF("name1") === doubleRDDCartesian("name"))
      cartesianJoinErr.show()
      cartesianJoinErr.explain()

      // Cast the join column to StringType on both sides
      val stringColDF1 = doubleRDDCartesian.withColumn("name1String", $"name".cast("String"))
      val stringColDF2 = cartNewCol.withColumn("name2String", $"name".cast("String"))

      // Inner join on the String columns: planned as SortMergeJoin
      val stringColJoinWorks = stringColDF1.join(stringColDF2, stringColDF1("name1String") === stringColDF2("name2String"))
      stringColJoinWorks.show()
      stringColJoinWorks.explain()
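
      A possible workaround, sketched under assumptions rather than confirmed by this issue: the failing plan contains Filter (name#143 = name#143), which suggests that both sides of the join condition resolved to the same attribute because the two DataFrames share lineage, rather than anything specific to the Double type. Aliasing each side and referring to the columns by qualified name is a common way to avoid that ambiguity. The helper joinByAlias and the aliases "l" and "r" are hypothetical names introduced for illustration; the snippet assumes the spark-shell session and the DataFrames defined above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: alias both sides so the join condition is resolved
// by qualified column name ("l.<col>" and "r.<col>") instead of by the
// column references taken from the original DataFrames.
def joinByAlias(left: DataFrame, leftCol: String,
                right: DataFrame, rightCol: String): DataFrame =
  left.as("l").join(right.as("r"), col(s"l.$leftCol") === col(s"r.$rightCol"))

// Same join as cartesianJoinErr, expressed through aliases
val aliasedJoin = joinByAlias(newColName1DF, "name1", doubleRDDCartesian, "name")
aliasedJoin.explain()
```

      If the assumption holds, the explain output should show an equi-join on the two name columns instead of CartesianProduct, without casting to String.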

       

            People

              Assignee: Unassigned
              Reporter: Shrikant (shrikant_khupat@yahoo.com)
              Votes: 0
              Watchers: 3
