Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
Affects Version: 3.5.0
Environment:
Tested in 3.5.0
Reproduced on, so far:
- Kubernetes deployment
- Docker cluster deployment
Local cluster:
- master
- worker1 (2/2G)
- worker2 (1/1G)
Description
It seems like we have encountered an issue with Spark AQE's dynamic cache partitioning, which causes incorrect count output values and data loss.
A similar issue could not be found, so I am creating this ticket to raise awareness.
Preconditions:
- Set up a cluster as per the environment specification
- Prepare test data (or any dataset large enough to trigger a read by both executors); a data-generation sketch follows the steps below
Steps to reproduce:
- Read parent
- Self join parent
- Cache + materialize parent
- Join parent with child
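For reference, the sketch below shows one way such test data could be generated. The schemas (PID, PARENT_ID, a random payload column), the row counts, and the output paths are assumptions made up to mirror the minimal example further down, not taken from the actual dataset.

// Hypothetical generator for parent/child Avro test data (requires spark-avro on the classpath,
// as in the spark-submit below). Schemas, row counts and paths are assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object GenerateTestData {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("GenerateTestData").getOrCreate()

    // Parent table: one PID per row plus a payload column to inflate the file size
    // past the point where both executors participate in the read.
    val parent = session.range(0, 1000000)
      .withColumnRenamed("id", "PID")
      .withColumn("payload", rand())

    // Child table: each row references a parent via PARENT_ID.
    val child = session.range(0, 5000000)
      .withColumn("PARENT_ID", col("id") % 1000000)
      .withColumn("payload", rand())

    parent.write.format("avro").mode("overwrite").save("/data/shared/test/parent")
    child.write.format("avro").mode("overwrite").save("/data/shared/test/child")
  }
}

Any dataset with a parent/child key relationship and a file size above the threshold mentioned below should work just as well.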
Performing a self-join over a parentDF, then caching + materialising the DF, and then joining it with a childDF results in an incorrect count value and missing data.
Performing a repartition seems to fix the issue, most probably due to the rearrangement of the underlying partitions and a statistics update.
This behaviour is observed on a multi-worker cluster with a job running 2 executors (1 per worker), when the data file is large enough to be read by both executors.
Not reproducible in local mode.
Circumvention:
So far, disabling spark.sql.optimizer.canChangeCachedPlanOutputPartitioning or performing a repartition alleviates the issue (see the sketch below), but neither addresses the root cause.
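A minimal sketch of how these circumventions could be wired into the job itself, assuming it is acceptable to set the property at session build time (it can equally be passed to spark-submit via --conf):

// Sketch of the two circumventions described above; which one is appropriate
// depends on the job, and neither fixes the underlying bug.
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .appName("ExampleApp")
  // Workaround 1: stop AQE from re-partitioning the cached plan's output.
  .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
  .getOrCreate()

// Workaround 2: repartition the cached side before the downstream join, e.g.
// child.join(parent.repartition(), parent("PID") === child("PARENT_ID"))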
This issue is dangerous because the data loss occurs silently and, in the absence of proper checks, can lead to wrong behaviour/results down the line, which is why we have labeled it as a blocker.
There seems to be a file-size threshold above which the data loss is observed (possibly implying that it happens once both executors start reading the data file).
Minimal example:
// Read parent
val parentData = session.read.format("avro").load("/data/shared/test/parent")

// Self join parent and cache + materialize
val parent = parentData.join(parentData, Seq("PID")).cache()
parent.count()

// Read child
val child = session.read.format("avro").load("/data/shared/test/child")

// Basic join
val resultBasic = child.join(
  parent,
  parent("PID") === child("PARENT_ID")
)
// Count: 16479 (Wrong)
println(s"Count no repartition: ${resultBasic.count()}")

// Repartition parent join
val resultRepartition = child.join(
  parent.repartition(),
  parent("PID") === child("PARENT_ID")
)
// Count: 50094 (Correct)
println(s"Count with repartition: ${resultRepartition.count()}")
Invalid count-only DAG:
Valid repartition DAG:
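Because the loss is silent, the following is a hedged sketch of a check that could be appended to the minimal example to surface which join keys are dropped. It simply treats the repartitioned result as the reference, which is an assumption based on the counts above.

// Sanity check for the silent data loss: compare the join keys that survive
// the basic join against those that survive the repartitioned join.
val missingKeys = resultRepartition.select("PARENT_ID")
  .exceptAll(resultBasic.select("PARENT_ID"))

// Expected to print a non-zero number whenever the bug is triggered.
println(s"Child rows silently dropped by the basic join: ${missingKeys.count()}")
missingKeys.show(20, truncate = false)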
Spark submit for this job:
spark-submit \
  --class ExampleApp \
  --packages org.apache.spark:spark-avro_2.12:3.5.0 \
  --deploy-mode cluster \
  --master spark://spark-master:6066 \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.cores.max=3 \
  --driver-cores 1 \
  --driver-memory 1g \
  --executor-cores 1 \
  --executor-memory 1g \
  /path/to/test.jar
The cluster should be set up as follows: worker1 (m+e), worker2 (e), so as to split the executors onto two workers.
I have prepared a simple GitHub repository which contains a compilable version of the above example:
https://github.com/ridvanappabugis/spark-3.5-issue