Status: In Progress
We've seen some shuffle data corruption during shuffle read phase.
As described in
SPARK-26089, spark only checks small shuffle blocks before PR #23453, which is proposed by ankuriitg.
There are two changes/improvements that are made in PR #23453.
1. Large blocks are checked upto maxBytesInFlight/3 size in a similar way as smaller blocks, so if a
large block is corrupt in the starting, that block will be re-fetched and if that also fails,
FetchFailureException will be thrown.
2. If large blocks are corrupt after size maxBytesInFlight/3, then any IOException thrown while
reading the stream will be converted to FetchFailureException. This is slightly more aggressive
than was originally intended but since the consumer of the stream may have already read some records and processed them, we can't just re-fetch the block, we need to fail the whole task. Additionally, we also thought about maybe adding a new type of TaskEndReason, which would re-try the task couple of times before failing the previous stage, but given the complexity involved in that solution we decided to not proceed in that direction.
However, I think there still exists some problems with the current shuffle transmitted data verification mechanism:
- For a large block, it is checked upto maxBytesInFlight/3 size when fetching shuffle data. So if a large block is corrupt after size maxBytesInFlight/3, it can not be detected in data fetch phase. This has been described in the previous section.
- Only the compressed or wrapped blocks are checked, I think we should also check thease blocks which are not wrapped.
We complete the verification mechanism for shuffle transmitted data:
Firstly, we choose crc32 for the checksum verification of shuffle data.
Crc is also used for checksum verification in hadoop, it is simple and fast.
In shuffle write phase, after completing the partitionedFile, we compute
the crc32 value for each partition and then write these digests with the indexs into shuffle index file.
For the sortShuffleWriter and unsafe shuffle writer, there is only one partitionedFile for a shuffleMapTask, so the compution of digests(compute the digests for each partition depend on the indexs of this partitionedFile) is cheap.
For the bypassShuffleWriter, the reduce partitions is little than byPassMergeThreshold, the cost of digests compution is acceptable.
In shuffle read phase, the digest value will be passed with the block data.
And we will recompute the digest of the data obtained to compare with the origin digest value.
When recomputing the digest of data obtained, it only need an additional buffer(2048Bytes) for computing crc32 value.
After recomputing, we will reset the obtained data inputStream, if it is markSupported we only need reset it, otherwise it is a fileSegmentManagerBuffer, we need recreate it.
So, this verification mechanism proposed for shuffle transmitted data is cheap and complete.
- is related to
SPARK-4105 FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
SPARK-26089 Handle large corrupt shuffle blocks
- links to