Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Duplicate
Affects Version/s: 1.6.1, 2.4.0
Fix Version/s: None
None
Description
The following code snippet reproduces the bug. Joining on the Double columns results in a Cartesian product; when both columns are cast to String, the join works as expected.
Please see the explain plans below.
Plan for the join on the Double columns (Cartesian product):
scala> cartesianJoinErr.explain()
== Physical Plan ==
CartesianProduct
:- ConvertToSafe
:  +- Project [name#143,group#144,data#145,name#143 AS name1#146]
:     +- Filter (name#143 = name#143)
:        +- Scan ExistingRDD[name#143,group#144,data#145]
+- Scan ExistingRDD[name#147,group#148,data#149]
-----------------------------------------------------------
Plan after casting both join columns to String:
scala> stringColJoinWorks.explain()
== Physical Plan ==
SortMergeJoin [name1String#151], [name2String#152]
:- Sort [name1String#151 ASC], false, 0
:  +- TungstenExchange hashpartitioning(name1String#151,200), None
:     +- Project [name#143,group#144,data#145,cast(name#143 as string) AS name1String#151]
:        +- Scan ExistingRDD[name#143,group#144,data#145]
+- Sort [name2String#152 ASC], false, 0
   +- TungstenExchange hashpartitioning(name2String#152,200), None
      +- Project [name#153,group#154,data#155,cast(name#153 as string) AS name2String#152]
         +- Scan ExistingRDD[name#153,group#154,data#155]
// Reproduction for spark-shell (Spark 1.6.x): sc, sqlContext and the $"..." column
// syntax (sqlContext.implicits._) are predefined in the shell
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Four rows whose first column ("name") is a Double
val doubleRDD = sc.parallelize(Seq(
  Row(11111.0, 2, 1),
  Row(22222.0, 8, 2),
  Row(33333.0, 10, 3),
  Row(44444.0, 10, 4)))

val testSchema = StructType(Seq(
  StructField("name", DoubleType, nullable = true),
  StructField("group", IntegerType, nullable = true),
  StructField("data", IntegerType, nullable = true)))

val doubleRDDCartesian = sqlContext.createDataFrame(doubleRDD, testSchema)

// Both DataFrames below are derived from doubleRDDCartesian
val cartNewCol = doubleRDDCartesian.select($"name", $"group", $"data")
val newColName1DF = cartNewCol.withColumn("name1", $"name")

// Join on the Double columns: the physical plan is a CartesianProduct
val cartesianJoinErr = newColName1DF.join(doubleRDDCartesian,
  newColName1DF("name1") === doubleRDDCartesian("name"))
cartesianJoinErr.show()
cartesianJoinErr.explain()

// Workaround: cast both join keys to StringType; the plan becomes a SortMergeJoin
val stringColDF1 = doubleRDDCartesian.withColumn("name1String", $"name".cast("string"))
val stringColDF2 = cartNewCol.withColumn("name2String", $"name".cast("string"))
val stringColJoinWorks = stringColDF1.join(stringColDF2,
  stringColDF1("name1String") === stringColDF2("name2String"))
stringColJoinWorks.show()
stringColJoinWorks.explain()
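Judging from the first plan and the linked duplicate (SPARK-25150), the Double type is probably not the cause; the problem seems to be that both inputs are derived from doubleRDDCartesian. doubleRDDCartesian("name") resolves to name#143, which the left side also exposes (name1 is name#143 AS name1#146), so the condition collapses to Filter (name#143 = name#143) on the left input and the join is left with no key. The String version works because name1String and name2String are new columns, one on each side. A commonly used alternative to casting is to alias each side with Dataset.as and join on qualified column names. The sketch below assumes Spark 2.x or later with a SparkSession (not the 1.6 sqlContext used above); the names base, withName1, l and r are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Sketch assuming Spark 2.x+; in spark-shell, getOrCreate() returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("self-join-alias").getOrCreate()
import spark.implicits._

// Same data as the reproduction, built with toDF instead of an RDD[Row] plus a schema
val base = Seq(
  (11111.0, 2, 1),
  (22222.0, 8, 2),
  (33333.0, 10, 3),
  (44444.0, 10, 4)
).toDF("name", "group", "data")

// Derived from the same source, like newColName1DF in the reproduction
val withName1 = base.select($"name", $"group", $"data").withColumn("name1", $"name")

// Alias each side so the condition names exactly one column from each input
val joined = withName1.as("l").join(base.as("r"), $"l.name1" === $"r.name")
joined.show()

// Regression check: the physical plan should not contain a CartesianProduct node
val hasCartesian = joined.queryExecution.executedPlan.toString.contains("CartesianProduct")
println(s"CartesianProduct in plan: $hasCartesian")
```

Since every name value is unique, the aliased join should return four rows (one match per input row) rather than the 16 rows a Cartesian product of the two four-row inputs would produce.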
Issue Links
duplicates:
- SPARK-25150 Joining DataFrames derived from the same source yields confusing/incorrect results (Resolved)