Spark / SPARK-40588

Sorting issue with partitioned-writing and AQE turned on


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.3
    • Fix Version/s: 3.2.3, 3.3.2
    • Component/s: Spark Core
    • Environment: Spark v3.1.3, Scala v2.12.13

    Description

      We are attempting to partition data by a few columns, sort by a particular sortCol and write out one file per partition. 

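      import org.apache.spark.sql.functions.{col, monotonically_increasing_id, spark_partition_id}

      df
          // shuffle so that all rows of one (day, month, year) combination share a partition
          .repartition(col("day"), col("month"), col("year"))
          .withColumn("partitionId", spark_partition_id())
          // row ids assigned before the sort
          .withColumn("monotonicallyIncreasingIdUnsorted", monotonically_increasing_id())
          .sortWithinPartitions("year", "month", "day", "sortCol")
          // row ids assigned after the sort; these should follow the written row order
          .withColumn("monotonicallyIncreasingIdSorted", monotonically_increasing_id())
          .write
          .partitionBy("year", "month", "day")
          .parquet(path)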

      When inspecting the results, we observe one file per partition as expected. However, in some files the rows are not sorted by sortCol; instead they alternate between low and high values:

      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348}
      {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590}
      {"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}
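      The alternating pattern can be confirmed mechanically. The following is a minimal sketch in plain Scala (no Spark dependency) that reports where ascending order breaks in a sequence of sortCol values read in file order. The object SortCheck and the helper inversions are hypothetical names introduced for illustration, and the input is just the first six sortCol values from the rows above.

```scala
// Hypothetical helper, not part of Spark: checks the ordering of values
// read from a single output file, in the order they appear in the file.
object SortCheck {
  /** Returns every index i where values(i) > values(i + 1),
    * i.e. every position after which ascending order is broken. */
  def inversions(values: Seq[Long]): Seq[Int] =
    values.zip(values.drop(1)).zipWithIndex.collect {
      case ((current, next), i) if current > next => i
    }

  def main(args: Array[String]): Unit = {
    // sortCol of the first six rows shown above, in file order
    val sortCol = Seq(100000L, 1303413L, 100000L, 1303413L, 100000L, 1303413L)
    val breaks = inversions(sortCol)
    println(s"sorted: ${breaks.isEmpty}, order breaks after indices: ${breaks.mkString(", ")}")
  }
}
```

      An empty result means the file is sorted on that column. For the rows above, the order breaks after every second row, which matches the alternating pattern described in this report.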

      Here is a gist to reproduce the issue. 

      Turning off AQE with spark.conf.set("spark.sql.adaptive.enabled", false) fixes the issue.
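      If AQE should stay enabled for the rest of the job, the workaround can be limited to the write itself. The following is a sketch only, assuming that the setting in effect when the write action runs is the one that decides whether AQE applies; the object AqeWorkaround and the method writeWithAqeDisabled are hypothetical names, and the DataFrame passed in is assumed to be the repartitioned and sorted one from the snippet above.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper, not part of Spark.
object AqeWorkaround {
  /** Runs the partitioned write with AQE switched off, then restores
    * the value the session had before the call. */
  def writeWithAqeDisabled(sorted: DataFrame, path: String): Unit = {
    val spark = sorted.sparkSession
    val key = "spark.sql.adaptive.enabled"
    val previous = spark.conf.get(key) // current (or default) value as a string
    spark.conf.set(key, "false")
    try {
      sorted.write.partitionBy("year", "month", "day").parquet(path)
    } finally {
      spark.conf.set(key, previous) // restore even if the write fails
    }
  }
}
```

      Restoring the previous value in a finally block keeps the change from leaking into later queries in the same session.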

      I'm working on identifying why AQE affects the sort order. Any leads or thoughts would be appreciated!


          People

            Assignee: Enrico Minack (enricomi)
            Reporter: Swetha Baskaran (swebask)
            Votes: 0
            Watchers: 7
