SPARK-40588

Sorting issue with partitioned-writing and AQE turned on

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.3
    • Fix Version/s: 3.2.3, 3.3.2
    • Component/s: Spark Core
    • Environment: Spark v3.1.3, Scala v2.12.13

    Description

      We are attempting to partition data by a few columns, sort each partition by a particular column (sortCol), and write out one file per partition.

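      import org.apache.spark.sql.functions.{col, monotonically_increasing_id, spark_partition_id}

      df
          // Shuffle so that all rows of a (day, month, year) combination land in the same partition.
          .repartition(col("day"), col("month"), col("year"))
          .withColumn("partitionId", spark_partition_id())
          // Row IDs assigned before the sort, for comparison with the IDs assigned after it.
          .withColumn("monotonicallyIncreasingIdUnsorted", monotonically_increasing_id())
          .sortWithinPartitions("year", "month", "day", "sortCol")
          .withColumn("monotonicallyIncreasingIdSorted", monotonically_increasing_id())
          .write
          .partitionBy("year", "month", "day")
          .parquet(path)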

      When inspecting the results, we observe one file per partition as expected. However, in some files the rows are not sorted by sortCol and instead follow an alternating pattern:

      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}

      Here is a gist to reproduce the issue. 

      Turning off AQE with spark.conf.set("spark.sql.adaptive.enabled", false) works around the issue: with AQE disabled, the written files are sorted as expected.
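
For completeness, here is a sketch of two places where this setting could be applied: when the session is built, or at runtime before the write. The application name and the local master are assumptions for illustration only; the configuration key is the one quoted above.

```scala
import org.apache.spark.sql.SparkSession

// Option 1: disable AQE when the session is created.
// "aqe-workaround" and local[*] are placeholder values for this sketch.
val spark = SparkSession.builder()
  .appName("aqe-workaround")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "false")
  .getOrCreate()

// Option 2: disable AQE on an existing session, before running the write.
spark.conf.set("spark.sql.adaptive.enabled", false)

// Read the setting back to confirm which value is in effect.
println(spark.conf.get("spark.sql.adaptive.enabled"))
```

Note that this turns AQE off for every query in the session, not only for the partitioned write.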

      I'm working on identifying why AQE affects the sort order. Any leads or thoughts would be appreciated!

          People

            Assignee: Enrico Minack (enricomi)
            Reporter: Swetha Baskaran (swebask)