SPARK-41914

Sorting issue with partitioned-writing and planned write optimization disabled

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: SQL

    Description

      Spark 3.4.0 introduced the option spark.sql.optimizer.plannedWrite.enabled, which is enabled by default. When it is disabled, a partitioned write loses its in-partition order once spilling occurs.

      This is related to SPARK-40885, where setting the option spark.sql.optimizer.plannedWrite.enabled to true removes the existing sort (by day and id) entirely.

      Run this with 512m memory and one executor, e.g.:

      spark-shell --driver-memory 512m --master "local[1]"
      
      import org.apache.spark.sql.SaveMode
      
      // Disable planned writes to trigger the issue
      spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)
      
      val ids = 2000000
      val days = 2
      val parts = 2
      
      // Cross join: every day is paired with every id
      val ds = spark.range(0, days, 1, parts).withColumnRenamed("id", "day").join(spark.range(0, ids, 1, parts))
      
      // Sort each partition by day and id, then write one directory per day
      ds.repartition($"day")
        .sortWithinPartitions($"day", $"id")
        .write
        .partitionBy("day")
        .mode(SaveMode.Overwrite)
        .csv("interleaved.csv")
      

      Check that the written files are sorted (md5sum -c reports OK for each file that is sorted):

      for file in interleaved.csv/day\=*/part-*
      do
        echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
      done | md5sum -c
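
The same check can be sketched without checksums, assuming a POSIX `sort`: the `-c` flag only verifies the order and exits non-zero at the first out-of-order line, so no file has to be sorted in full. The helper name `check_sorted` is hypothetical and not part of the original report.

```shell
# Print OK or FAILED for one file, depending on whether it is in ascending numeric order.
check_sorted() {
  if sort -n -c "$1" 2>/dev/null; then
    echo "OK      $1"
  else
    echo "FAILED  $1"
  fi
}

for file in interleaved.csv/day\=*/part-*
do
  [ -e "$file" ] || continue   # skip the unexpanded pattern when nothing matches
  check_sorted "$file"
done
```

Dropping `2>/dev/null` makes `sort` also name the first line that is out of order.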
      

      Files should look like this:

      0
      1
      2
      ...
      1048576
      1048577
      1048578
      ...
      

      But they look like this:

      0
      1048576
      1
      1048577
      2
      1048578
      ...
      

      The cause of this issue is the same as in SPARK-40588: a sort (by day only) is added on top of the existing sort (by day and id). When spilling occurs, the sorted spill files are interleaved.

      Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
      +- AdaptiveSparkPlan isFinalPlan=false
         +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
               +- BroadcastNestedLoopJoin BuildLeft, Inner
                  :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
                  :  +- Project [id#0L AS day#2L]
                  :     +- Range (0, 2, step=1, splits=2)
                  +- Range (0, 2000000, step=1, splits=2)
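
The effect can be reproduced in miniature. The following is a toy model, not Spark's sorter: two temporary files stand in for spill files of the same day, each already sorted by id. Because the outer sort compares on day alone, every pair of rows is a tie; alternating between the two runs is assumed here as one possible tie-break, chosen because it matches the output shown above. The file names are hypothetical, and `seq` is assumed to be available.

```shell
# Toy model: two "spill runs" for the same day, each already sorted by id.
dir=$(mktemp -d)
seq 0 2 > "$dir/run1"
seq 1048576 1048578 > "$dir/run2"

# A merge that compares on day alone sees every pair of rows as a tie.
# Alternating between the runs (an assumed tie-break) interleaves the ids.
paste -d '\n' "$dir/run1" "$dir/run2" > "$dir/merged"
cat "$dir/merged"

# The merged output is no longer sorted by id.
sort -n -c "$dir/merged" 2>/dev/null || echo "not sorted by id"
```

Each run is sorted on its own, yet the merged file fails the order check, which is the pattern observed in the written part files.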
      

          People

            Assignee: Enrico Minack (enricomi)
            Reporter: Enrico Minack (enricomi)