Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 3.4.0
Description
Spark 3.4.0 introduced the option spark.sql.optimizer.plannedWrite.enabled, which is enabled by default. When it is disabled, partitioned writing loses in-partition order when spilling occurs.
This is related to SPARK-40885, where setting the option spark.sql.optimizer.plannedWrite.enabled to true removes the existing sort (for day and id) entirely.
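For reference, the option's current value can be checked from the shell before toggling it in the reproduction below (a minimal sketch; per the description above it defaults to true on 3.4.0):

// Minimal sketch: check the option's value in a Spark 3.4.0 shell.
// The reproduction below sets it to false to trigger the issue.
spark.conf.get("spark.sql.optimizer.plannedWrite.enabled")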
Run this with 512m of memory and a single executor, e.g.:
spark-shell --driver-memory 512m --master "local[1]"
import org.apache.spark.sql.SaveMode

// Disable planned write to reproduce the issue described above
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)

val ids = 2000000
val days = 2
val parts = 2

// 2 days x 2,000,000 ids per day
val ds = spark.range(0, days, 1, parts)
  .withColumnRenamed("id", "day")
  .join(spark.range(0, ids, 1, parts))

// Sort each "day" partition by (day, id), then write partitioned by "day"
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")
Check that the written files are sorted (md5sum -c prints OK for each file whose contents are already sorted):
for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1) $file"
done | md5sum -c
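The same check can also be done without the shell pipeline; a minimal sketch in plain Scala, assuming the local interleaved.csv output from the reproduction above:

// Minimal sketch: verify that each written part file is sorted.
// Assumes the local output directory "interleaved.csv" from the reproduction above.
import java.io.File
import scala.io.Source

val partFiles = new File("interleaved.csv").listFiles()
  .filter(_.isDirectory)                 // day=0, day=1, ...
  .flatMap(_.listFiles())
  .filter(_.getName.startsWith("part-"))

partFiles.foreach { f =>
  val source = Source.fromFile(f)
  val ids = try source.getLines().map(_.trim.toLong).toVector finally source.close()
  val isSorted = ids.zip(ids.drop(1)).forall { case (a, b) => a <= b }
  println(s"${f.getPath}: ${if (isSorted) "OK" else "NOT SORTED"}")
}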
The files should look like this (one value per line):
0 1 2 ... 1048576 1048577 1048578 ...
But instead they look like this:
0 1048576 1 1048577 2 1048578 ...
The cause of this issue is the same as in SPARK-40588: a sort (for day) is added on top of the existing sort (for day and id), and spilling interleaves the sorted spill files.
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
         +- BroadcastNestedLoopJoin BuildLeft, Inner
            :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
            :  +- Project [id#0L AS day#2L]
            :     +- Range (0, 2, step=1, splits=2)
            +- Range (0, 2000000, step=1, splits=2)
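To confirm which sort belongs to the user query, one can explain the same query without the write (a minimal sketch reusing ds from the reproduction above). Its plan contains only the sort for (day, id), so the top-level sort for day in the plan above is the one added on top for the partitioned write:

// Minimal sketch: explain the user query without the write.
// Its plan has only the Sort on (day, id); the additional Sort on day
// shown above is the one added on top for the partitioned write.
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .explain()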
Issue Links
- causes: SPARK-49091 Some broadcasts cannot be cleared from memory storage (Resolved)
- links to