There is a size limit on shuffle reads.
If a shuffle partition block is larger than Int.MaxValue bytes (2 GB) and the block is fetched from a remote executor, an exception is thrown.
The task then fails with a FetchFailedException.
The task is retried, and it succeeds only if it is rescheduled to an executor on the same host as the oversized shuffle partition block.
However, if there is more than one oversized (>2 GB) shuffle partition block, the task can never succeed, which may cause the application to fail.
In this PR, I propose a new way to fetch shuffle blocks: when a shuffle partition block is oversized, it is fetched in multiple requests.
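As a rough illustration of fetching an oversized block in several requests, the sketch below splits a block into byte ranges no larger than a threshold. The names SegmentPlanner, Segment, segmentCount and plan are hypothetical and are not taken from this PR; the threshold is passed in explicitly, on the assumption that it plays the role of a 2 GB fetch limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: plans how an oversized block
// could be cut into segments that each fit under a fetch threshold.
final class SegmentPlanner {

    // One fetch request: segment index, byte offset in the block, byte length.
    static final class Segment {
        final int id;
        final long offset;
        final long length;

        Segment(int id, long offset, long length) {
            this.id = id;
            this.offset = offset;
            this.length = length;
        }
    }

    // ceil(size / threshold) in integer arithmetic, avoiding floating point.
    static int segmentCount(long size, long threshold) {
        if (size < 0 || threshold <= 0) {
            throw new IllegalArgumentException("size must be >= 0 and threshold > 0");
        }
        long full = size / threshold;
        long extra = (size % threshold == 0) ? 0 : 1;
        return Math.toIntExact(full + extra);
    }

    // Returns the segments in order; only the last one may be shorter than threshold.
    static List<Segment> plan(long size, long threshold) {
        int count = segmentCount(size, threshold);
        List<Segment> segments = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            long offset = (long) i * threshold;
            long length = Math.min(threshold, size - offset);
            segments.add(new Segment(i, offset, length));
        }
        return segments;
    }
}
```

With a threshold of Integer.MAX_VALUE, a 5 GiB block would be planned as three segments, and a block at or below the threshold as a single segment, which matches the idea of only treating blocks with more than one segment specially.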
A brief introduction:
1. Set a shuffle fetch threshold (SHUFFLE_FETCH_THRESHOLD) of Int.MaxValue (2 GB).
2. Add a parameter, spark.shuffle.fetch.split, to control whether large partitions are fetched in multiple requests.
3. When creating the MapStatus, calculate the number of segments of each shuffle block (Math.ceil(size / SHUFFLE_FETCH_THRESHOLD)), and record the segment count only when it is larger than 1.
4. Define a new BlockId type, ShuffleBlockSegmentId, used to identify the fetch method.
5. When spark.shuffle.fetch.split is enabled, send a ShuffleBlockSegmentId message to the shuffle service instead of a ShuffleBlockId message.
6. For a ShuffleBlockId, use a sequence of ManagedBuffers to represent its block instead of a single ManagedBuffer.
7. In ShuffleBlockFetcherIterator, create a PriorityBlockingQueue per ShuffleBlockId to store the fetched segment ManagedBuffers. When all segments of a ShuffleBlockId have been fetched, take the corresponding sequence of ManagedBuffers (ordered by segmentId) as the success result for that ShuffleBlockId.
8. On the shuffle service side, if the blockId in openBlocks is a ShuffleBlockSegmentId, respond with a ManagedBuffer for that segment of the block; if the blockId is a ShuffleBlockId, respond with a ManagedBuffer for the whole block, as before.
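
The reassembly described in step 7 can be sketched roughly as follows. This is a simplified stand-in and not the code in this PR: SegmentCollector and FetchedSegment are hypothetical names, and a plain byte[] is used in place of a ManagedBuffer so that the example is self-contained. Segments may arrive in any order; the collector reports a result only once all expected segments are present, ordered by segment id.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical collector for the segments of one shuffle block.
final class SegmentCollector {

    // A fetched segment; byte[] stands in for a ManagedBuffer here.
    static final class FetchedSegment {
        final int segmentId;
        final byte[] data;

        FetchedSegment(int segmentId, byte[] data) {
            this.segmentId = segmentId;
            this.data = data;
        }
    }

    private final int expectedSegments;
    private final PriorityBlockingQueue<FetchedSegment> queue;

    SegmentCollector(int expectedSegments) {
        if (expectedSegments < 1) {
            throw new IllegalArgumentException("expectedSegments must be >= 1");
        }
        this.expectedSegments = expectedSegments;
        // Order queued segments by segment id, smallest first.
        this.queue = new PriorityBlockingQueue<>(
            expectedSegments,
            Comparator.comparingInt((FetchedSegment s) -> s.segmentId));
    }

    // Adds one segment. Returns the ordered buffers once every segment has
    // arrived, and an empty Optional until then. The method is synchronized
    // so that the size check and the drain happen as one step. Duplicate
    // segment ids are not detected in this sketch.
    synchronized Optional<List<byte[]>> add(int segmentId, byte[] data) {
        queue.add(new FetchedSegment(segmentId, data));
        if (queue.size() < expectedSegments) {
            return Optional.empty();
        }
        List<byte[]> ordered = new ArrayList<>(expectedSegments);
        FetchedSegment next;
        while ((next = queue.poll()) != null) {
            ordered.add(next.data);
        }
        return Optional.of(ordered);
    }
}
```

A caller would create one collector per oversized block, using the segment count recorded in the map status, and treat the block as successfully fetched only when add returns a non-empty result.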