Spark / SPARK-47061

Wrong result from flatMapGroups using Scala 2.13.x


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None
    • Environment: Tested on Windows with OpenJDK 17 and on Ubuntu with OpenJDK 19

    Description

      Using Scala 2.13, `KeyValueGroupedDataset.flatMapGroups` produces wrong results: every row produced by `flatMapGroups` carries the value of the last entry in the returned iterator.

      • Downgrading to Scala 2.12 fixes the issue.
      • Using `mapGroups` followed by `flatMap` also fixes the issue.

       

      Test-Setup:

      import org.apache.spark.sql.SparkSession

      object Main {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").getOrCreate()
          import spark.implicits._

          // using flatMapGroups
          spark.createDataset(Seq(1, 2))
            .groupByKey(x => x)
            .flatMapGroups((x, _) => Seq(10 + x, 20 + x, 30 + x)).show()

          // same computation using mapGroups, then flatMap ~> should yield the same result
          spark.createDataset(Seq(1, 2))
            .groupByKey(x => x)
            .mapGroups((x, _) => Seq(10 + x, 20 + x, 30 + x))
            .flatMap(x => x).show()
        }
      }

      We map the key 1 to the sequence (11, 21, 31); analogously, the key 2 is mapped to (12, 22, 32). Both pipelines should therefore produce the following (identical) result:

      +-----+
      |value|
      +-----+
      |   11|
      |   21|
      |   31|
      |   12|
      |   22|
      |   32|
      +-----+ 
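      For reference, the same grouping logic expressed in plain Scala collections (no Spark involved; a minimal sketch with a hypothetical `ExpectedValues` object) yields exactly these values:

      ```scala
      // Plain Scala 2.13 collections version of the same computation:
      // group the inputs by key, then flat-map each group to three derived values.
      object ExpectedValues extends App {
        val result = Seq(1, 2)
          .groupBy(identity)   // Map(1 -> Seq(1), 2 -> Seq(2))
          .toSeq
          .sortBy(_._1)        // fix group order: key 1 first, then key 2
          .flatMap { case (k, _) => Seq(10 + k, 20 + k, 30 + k) }

        println(result.mkString(", "))   // 11, 21, 31, 12, 22, 32
        assert(result == Seq(11, 21, 31, 12, 22, 32))
      }
      ```

      This shows the bug is not in the grouping logic itself: the plain-collections pipeline works identically on 2.12 and 2.13.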

      This was run with Scala 2.12 and Spark 3.5.0, using the following `build.sbt`:

      ThisBuild / scalaVersion := "2.12.18"
      libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.5.0"
      

       

      Problem: after upgrading to Scala 2.13, we instead get the following result:

      +-----+
      |value|
      +-----+
      |   31|
      |   31|
      |   31|
      |   32|
      |   32|
      |   32|
      +-----+ 

      This run used the following new `build.sbt`; the code itself was not modified:

      ThisBuild / scalaVersion := "2.13.10"
      libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "3.5.0" 

      The test-case is inspired by this StackOverflow question.


      People

        Assignee: Unassigned
        Reporter: Magnus Kühn
