Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.4.2, 3.3.2, 3.5.0
Fix Version/s: None
Description
We discovered an important correctness issue directly linked to SPARK-47024.
Even though SPARK-47024 was resolved as 'Not a Problem' because it stems directly from float and double precision, it can still have drastic consequences when combined with spark.sql.execution.sortBeforeRepartition set to true (the default).
We consistently reproduced the issue with a GROUP BY computing a SUM over a float or double column, followed by a repartition (a common pattern for producing larger output files, triggered either by SQL hints or by extensions such as Kyuubi).
If the repartition stage fails with a FetchFailedException for only a few tasks, Spark recomputes the upstream partitions whose output could not be fetched, and retries only the failed downstream partitions.
Because the block fetch order is nondeterministic, recomputing an upstream partition can produce a slightly different value for a float/double SUM. In all of our attempts we observed a 1-bit difference in the byte array backing the UnsafeRow. The sort performed before the repartition uses UnsafeRow.hashCode as the row's sort prefix, and that hash is completely different even for such a 1-bit change. As a result, the recomputed upstream partition is sorted in a completely different order, so its rows are also shuffled to completely different downstream partitions.
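The order dependence is easy to reproduce outside Spark. The following minimal Python sketch (an illustration, not Spark code) accumulates the same three doubles in two different orders, packs each result into a hypothetical 16-byte stand-in for an UnsafeRow (an 8-byte key followed by the double's bit pattern), and hashes those bytes. CRC32 is used only because it ships with the standard library; as far as we know, UnsafeRow.hashCode is Murmur3-based. In this example the two sums differ by one ULP, which changes the lowest mantissa byte rather than exactly one bit.

```python
import struct
import zlib


def sum_in_order(values):
    # Plain left-to-right accumulation, like a running SUM buffer.
    # Python's built-in sum() uses compensated summation for floats since
    # 3.12, so we accumulate explicitly to keep the order dependence.
    total = 0.0
    for v in values:
        total += v
    return total


def row_bytes(key, value):
    # Hypothetical stand-in for the bytes backing an UnsafeRow:
    # an 8-byte little-endian key followed by the IEEE 754 double.
    return struct.pack("<qd", key, value)


def row_hash(key, value):
    # Toy hash over the row bytes (assumption: CRC32 instead of Murmur3).
    return zlib.crc32(row_bytes(key, value))


# The same three inputs, as if fetched in two different block orders.
first = sum_in_order([0.1, 0.2, 0.3])
retry = sum_in_order([0.3, 0.2, 0.1])

print(first, retry)  # 0.6000000000000001 0.6
print(row_bytes(1, first).hex())
print(row_bytes(1, retry).hex())
print(row_hash(1, first), row_hash(1, retry))
```

Sorting by the value itself would barely move such a row, but sorting by a hash of its bytes can place it anywhere.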
Because the sort becomes nondeterministic and only the failed downstream tasks are retried, the resulting repartition contains both duplicate and missing rows. The fix introduced by SPARK-23207 is therefore broken.
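The effect of a partial retry can be sketched the same way. This Python simulation assumes a single upstream partition, two downstream partitions and a round-robin assignment with a fixed start offset of 0 (Spark derives the offset per map partition; 0 is an assumption here). The two row orders are made up: they stand in for the hash-sorted output of the first attempt and of the recomputed attempt, and rows are labeled only by their group key. The function names are hypothetical.

```python
from collections import Counter


def round_robin(sorted_rows, num_partitions, start=0):
    # The k-th row of one upstream partition goes to downstream partition
    # (start + k) % num_partitions, so the assignment depends only on the
    # row order. This is why a deterministic sort is needed beforehand.
    out = [[] for _ in range(num_partitions)]
    for k, row in enumerate(sorted_rows):
        out[(start + k) % num_partitions].append(row)
    return out


def repartition_with_partial_retry(first_order, retry_order, num_partitions, failed):
    # Downstream tasks that succeeded keep what they fetched from the first
    # upstream attempt; only tasks in `failed` re-fetch, and they read the
    # recomputed upstream output.
    first = round_robin(first_order, num_partitions)
    retried = round_robin(retry_order, num_partitions)
    return [retried[p] if p in failed else first[p] for p in range(num_partitions)]


# Hypothetical hash-sorted orders of the same four rows, before and after
# one row's double SUM changed in its last bits on recomputation.
first_order = ["A", "B", "C", "D"]
retry_order = ["B", "A", "D", "C"]

result = repartition_with_partial_retry(first_order, retry_order, 2, failed={1})
counts = Counter(row for part in result for row in part)
print(result)  # [['A', 'C'], ['A', 'C']]
print(counts)
```

Downstream partition 0 keeps A and C from the first attempt, and the retried partition 1 also receives A and C, so B and D are lost. Feeding the same order to the retry leaves every row exactly once, which is the guarantee the sort before repartition is meant to provide.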
So far, our only workaround is to disable spark.sql.execution.sortBeforeRepartition so that the entire job fails instead of producing incorrect data. With Spark's default configuration, the result is a silent correctness issue.