Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
3.0.0
None
None
Description
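// Test case for this bug
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// df1: (col0: string, col1: int)
val data1 = Seq(Row("0", 1), Row("0", 2))
val schema1 = StructType(List(
  StructField("col0", StringType),
  StructField("col1", IntegerType)))

// df2: (str0: string, col0: int), so col0 has a different type than in df1
val data2 = Seq(Row("0", 1), Row("0", 2))
val schema2 = StructType(List(
  StructField("str0", StringType),
  StructField("col0", IntegerType)))

val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2)

// Joined columns: col0 (string), col1 (int), str0 (string), col0 (int)
val joined = df1.join(df2, df1("col0") === df2("str0"), "left")

import spark.implicits._ // provides the Encoder[Int] used by groupByKey

// For each col1 value, keep the row with the largest df2.col0 (index 3)
val distinct = joined
  .groupByKey { row => row.getInt(1) }
  .mapGroups { case (_, iter) => iter.maxBy(_.getInt(3)) }(RowEncoder(joined.schema))

distinct.show()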
// Part of the error output:
Exception in thread "main" org.apache.spark.SparkException: Failed to merge fields 'col0' and 'col0'. Failed to merge incompatible data types string and int
    at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
    at scala.Option.map(Option.scala:163)
    at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585)
    at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:582)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582)
    at org.apache.spark.sql.types.StructType.merge(StructType.scala:492)
    at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$anonfun$pruneDataSchema$2(SchemaPruning.scala:36)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140)
    at scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138)
    at scala.collection.immutable.List.reduceLeft(List.scala:89)
I left-join two DataFrames whose schemas each contain a column named col0, but with different types (string in df1, int in df2), and then call groupByKey and mapGroups on the result. The job fails with the exception above. Is there a mistake in my code? If not, I suspect the problem is related to the schema merge in StructType.scala:593, which the trace reaches from SchemaPruning.pruneDataSchema. How can I turn off this schema merging?
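One possible workaround, offered as an untested assumption rather than a confirmed fix: the error complains about two fields that are both named col0, so giving every column of the joined DataFrame a unique name before calling groupByKey may avoid the conflicting merge. The helper below, dedupColumnNames, is hypothetical (it is not part of Spark). It appends a numeric suffix to each repeated name, skips suffixes that are already taken, and compares names case-sensitively.

```scala
import scala.collection.mutable

object DedupColumns {
  // Hypothetical helper: returns `names` with every repeated name replaced by
  // name_1, name_2, ... choosing the first suffix not already in use.
  // Comparison is case-sensitive.
  def dedupColumnNames(names: Seq[String]): Seq[String] = {
    val used = mutable.Set.empty[String]
    names.map { name =>
      var candidate = name
      var i = 1
      while (used.contains(candidate)) {
        candidate = s"${name}_$i"
        i += 1
      }
      used += candidate
      candidate
    }
  }

  def main(args: Array[String]): Unit = {
    // Column order of the joined DataFrame in the report:
    // df1 (col0, col1) followed by df2 (str0, col0)
    val joinedColumns = Seq("col0", "col1", "str0", "col0")
    println(dedupColumnNames(joinedColumns).mkString(", "))
  }
}
```

Applied to the test case, the renamed DataFrame would be `val renamed = joined.toDF(DedupColumns.dedupColumnNames(joined.columns.toSeq): _*)`, with `renamed` and `RowEncoder(renamed.schema)` used in place of `joined` and `RowEncoder(joined.schema)`. `Dataset.toDF(colNames: String*)` renames columns by position, so the data and types are unchanged. Whether this actually sidesteps the merge failure in 3.0.0 has not been verified here.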