We recently tracked missing data to a collision in the fake Hadoop task attempt ID created when using Hadoop OutputCommitters. This is similar to
A stage had one task fail to get one shard from a shuffle, causing a FetchFailedException and Spark resubmitted the stage. Because only one task was affected, the original stage attempt continued running tasks that had been resubmitted. Another task ran two attempts concurrently on the same executor, but had the same attempt number because they were from different stage attempts. Because the attempt number was the same, the task used the same temp locations. That caused one attempt to fail because a file path already existed, and that attempt then removed the shared temp location and deleted the other task's data. When the second attempt succeeded, it committed partial data.
The problem was that both attempts had the same partition and attempt numbers, despite being run in different stages, and that was used to create a Hadoop task attempt ID on which the temp location was based. The fix is to use Spark's global task attempt ID, which is a counter, instead of attempt number because attempt number is reused in stage attempts.