  1. Spark
  2. SPARK-36932

Misuse "merge schema" when mapGroups

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: Java API
    • Labels: None

    Description

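      // Test case for this bug
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.sql.catalyst.encoders.RowEncoder
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

      val spark = SparkSession.builder().master("local[*]").getOrCreate()

      // df1 and df2 both contain a column named col0, but with different types:
      // string in df1, int in df2.
      val data1 = Seq(
        Row("0", 1),
        Row("0", 2))
      val schema1 = StructType(List(
        StructField("col0", StringType),
        StructField("col1", IntegerType))
      )
      val data2 = Seq(
        Row("0", 1),
        Row("0", 2))
      val schema2 = StructType(List(
        StructField("str0", StringType),
        StructField("col0", IntegerType))
      )

      val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
      val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2)

      // Joined columns, by index: 0 = df1.col0, 1 = df1.col1, 2 = df2.str0, 3 = df2.col0
      val joined = df1.join(df2, df1("col0") === df2("str0"), "left")

      import spark.implicits._
      // Group by df1.col1 and keep, per group, the row with the largest df2.col0.
      val distinct = joined
        .groupByKey {
          row => row.getInt(1)
        }
        .mapGroups {
          case (_, iter) =>
            iter.maxBy(row => row.getInt(3))
        }(RowEncoder(joined.schema))

      // The exception below is thrown here.
      distinct.show()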
      // Part of the error output
      Exception in thread "main" org.apache.spark.SparkException: Failed to merge fields 'col0' and 'col0'. Failed to merge incompatible data types string and int 
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
      at scala.Option.map(Option.scala:163)
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585)
      at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:582)
      at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
      at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) 
      at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582) 
      at org.apache.spark.sql.types.StructType.merge(StructType.scala:492)
      at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$anonfun$pruneDataSchema$2(SchemaPruning.scala:36)
      at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
      at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) 
      at scala.collection.immutable.List.foldLeft(List.scala:89) 
      at scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140) 
      at scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138) 
      at scala.collection.immutable.List.reduceLeft(List.scala:89)
      

      After left-joining two DataFrames whose schemas contain a column with the same name but different types (here col0, a string in df1 and an int in df2), we use groupByKey and mapGroups to compute the result. The call then fails with the exception above. Is this a mistake in my code? If not, I think it may be related to the schema merge at StructType.scala:593. How can I turn off schema merging?
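
      A possible workaround, sketched under assumptions rather than confirmed as a fix: the stack trace shows StructType.merge being called from SchemaPruning.pruneDataSchema, and merge matches fields by name, so the two col0 columns with different types collide. Giving the right-hand column a unique name before the join should keep the joined schema free of duplicate field names. The name col0_right is hypothetical; any unused name would do. RowEncoder(schema) is used as in the test case above and matches the Spark 3.0.x API.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val rows = Seq(Row("0", 1), Row("0", 2))
val schema1 = StructType(List(
  StructField("col0", StringType),
  StructField("col1", IntegerType)))
val schema2 = StructType(List(
  StructField("str0", StringType),
  StructField("col0", IntegerType)))

val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema1)
// Assumption: renaming the conflicting column avoids the name clash.
// "col0_right" is a hypothetical name chosen for this sketch.
val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema2)
  .withColumnRenamed("col0", "col0_right")

val joined = df1.join(df2, df1("col0") === df2("str0"), "left")

// Diagnostic: collect field names that occur more than once in the joined schema.
val duplicates = joined.schema.fieldNames.groupBy(identity).collect {
  case (name, occurrences) if occurrences.length > 1 => name
}
require(duplicates.isEmpty, s"duplicate field names: ${duplicates.mkString(", ")}")

import spark.implicits._
// Same grouping as the test case: key on df1.col1 (index 1) and keep the row
// with the largest value in the renamed int column (index 3).
val distinct = joined
  .groupByKey(row => row.getInt(1))
  .mapGroups {
    case (_, iter) => iter.maxBy(row => row.getInt(3))
  }(RowEncoder(joined.schema))

distinct.show()
```

      The duplicate-name check is independent of the rename and can be run on the original joined DataFrame to confirm that col0 appears twice there.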

          People

            Assignee: Unassigned
            Reporter: Wang Zekai (Kane27)
            Votes: 0
            Watchers: 2
