Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version: 3.1.3
Environment: Spark v3.1.3, Scala v2.12.13
Description
We are attempting to partition data by a few columns, sort each partition by a particular column (sortCol), and write out one file per partition.
df
  .repartition(col("day"), col("month"), col("year"))
  .withColumn("partitionId", spark_partition_id)
  .withColumn("monotonicallyIncreasingIdUnsorted", monotonicallyIncreasingId)
  .sortWithinPartitions("year", "month", "day", "sortCol")
  .withColumn("monotonicallyIncreasingIdSorted", monotonicallyIncreasingId)
  .write
  .partitionBy("year", "month", "day")
  .parquet(path)
When inspecting the results, we observe one file per partition as expected. However, some files contain an alternating pattern of unsorted rows:
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348}
{"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590}
{"sortCol":100000,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}
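The sample rows can be checked mechanically without a cluster. The following is a minimal sketch in plain Scala (no Spark dependency) that tests whether sortCol is ordered in file order and decodes each monotonicallyIncreasingIdSorted value. It relies on the documented layout of monotonically_increasing_id (partition ID in the upper 31 bits, record number within the partition in the lower 33 bits). The object name SortCheck and the helpers isSorted and decode are hypothetical names introduced here for illustration; they are not part of the original reproduction.

```scala
object SortCheck {
  // (sortCol, monotonicallyIncreasingIdSorted) copied from the sample rows, in file order.
  val rows: Seq[(Long, Long)] = Seq(
    (100000L, 6287832121344L),
    (1303413L, 6287876860586L),
    (100000L, 6287832121345L),
    (1303413L, 6287876860587L),
    (100000L, 6287832121346L),
    (1303413L, 6287876860588L),
    (100000L, 6287832121347L),
    (1303413L, 6287876860589L),
    (100000L, 6287832121348L),
    (1303413L, 6287876860590L),
    (100000L, 6287832121349L)
  )

  // Hypothetical helper: true when every value is <= its successor.
  def isSorted(xs: Seq[Long]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // Hypothetical helper: split an ID into (partition ID, record number).
  // Upper 31 bits hold the partition ID, lower 33 bits the record number.
  def decode(id: Long): (Long, Long) =
    (id >>> 33, id & ((1L << 33) - 1))

  def main(args: Array[String]): Unit = {
    println(s"sortCol sorted in file order: ${isSorted(rows.map(_._1))}")
    rows.foreach { case (sortCol, id) =>
      val (partition, record) = decode(id)
      println(s"sortCol=$sortCol partition=$partition record=$record")
    }
  }
}
```

Decoded this way, every ID belongs to partition 732, matching the partitionId column. The rows with sortCol 100000 carry consecutive record numbers starting at 0, while the rows with sortCol 1303413 carry a second consecutive range starting at 44739242. That would be consistent with the sort itself having ordered the rows and the interleaving being introduced somewhere between the sort and the file write, though the sample alone does not prove this.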
Here is a gist to reproduce the issue.
Turning off adaptive query execution (AQE) with spark.conf.set("spark.sql.adaptive.enabled", false) fixes the issue.
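For anyone who needs the workaround in the meantime, here is a minimal sketch of disabling AQE either when the session is built or at runtime. It assumes spark-sql is on the classpath; the application name and the local master are placeholders chosen for illustration, not values from the original job.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder app name and master; adjust for the real deployment.
val spark = SparkSession.builder()
  .appName("aqe-sort-workaround")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "false") // disable AQE for the whole session
  .getOrCreate()

// Equivalent toggle on an already running session:
spark.conf.set("spark.sql.adaptive.enabled", false)

// Confirm the effective value before running the partitioned write.
println(spark.conf.get("spark.sql.adaptive.enabled"))
```

Disabling AQE session-wide also gives up its other optimizations, so it may be preferable to toggle the setting only around the affected write and restore it afterwards.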
I'm working on identifying why AQE affects the sort order. Any leads or thoughts would be appreciated!