The RemoteBlockPushResolver, which handles pushed shuffle blocks and merges them, was introduced in #30062. We have identified two scenarios in which the merged blocks get corrupted:
- StreamCallback.onFailure() is called more than once. Initially we assumed that the onFailure callback would be called only once per stream. However, we observed that it is called twice when a client connection is reset, because the reset triggers two events, in this order:
  - exceptionCaught: this event is propagated to StreamInterceptor, whose exceptionCaught() invokes callback.onFailure(streamId, cause). This is the first invocation of StreamCallback.onFailure().
  - channelInactive: because the channel closes, the channelInactive event is triggered and is also propagated to StreamInterceptor, whose channelInactive() invokes callback.onFailure(streamId, new ClosedChannelException()). This is the second invocation of StreamCallback.onFailure().
- The isWriting flag is set to true prematurely. This introduces an edge case in which a stream that is trying to merge a duplicate block (produced by a speculative task) can interfere with an active stream if the duplicate stream fails.
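One way to make the failure path tolerate the double call described in the first scenario is to guard it so that only the first invocation takes effect. The Java sketch below wraps a failure handler with an AtomicBoolean; FailureHandler and OnceOnlyFailureHandler are hypothetical names, and the interface is a simplification rather than Spark's StreamCallback.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical failure-callback shape; a simplification, not Spark's StreamCallback. */
interface FailureHandler {
  void onFailure(String streamId, Throwable cause);
}

/**
 * Runs the wrapped failure handler at most once per stream. On a connection reset the
 * failure path can be entered twice (exceptionCaught, then channelInactive), so every
 * call after the first is ignored.
 */
final class OnceOnlyFailureHandler implements FailureHandler {
  private final FailureHandler delegate;
  private final AtomicBoolean failed = new AtomicBoolean(false);

  OnceOnlyFailureHandler(FailureHandler delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onFailure(String streamId, Throwable cause) {
    // compareAndSet succeeds for exactly one caller; later calls return without effect.
    if (failed.compareAndSet(false, true)) {
      delegate.onFailure(streamId, cause);
    }
  }
}
```

An AtomicBoolean is used instead of a plain boolean so the check-and-set stays correct even if the two calls were to arrive on different threads.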
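To illustrate the isWriting scenario, here is a minimal Java sketch in which a stream sets isWriting only after it has actually claimed the partition, and onFailure releases the partition only when the failing stream is the one that owns it. PartitionState, PushStream, tryStartWriting and currentWriter are hypothetical names for this sketch, not the actual RemoteBlockPushResolver internals.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical per-partition merge state; not Spark's actual partition bookkeeping. */
final class PartitionState {
  final Set<Integer> mergedMapIndexes = new HashSet<>();
  PushStream currentWriter = null; // stream currently appending to the merged file, if any
}

/** Hypothetical stream that pushes one map output block into a partition. */
final class PushStream {
  private final PartitionState partition;
  private final int mapIndex;
  private boolean isWriting = false;

  PushStream(PartitionState partition, int mapIndex) {
    this.partition = partition;
    this.mapIndex = mapIndex;
  }

  /** Claims the partition just before the first write; returns false if this stream must not write. */
  boolean tryStartWriting() {
    synchronized (partition) {
      if (partition.mergedMapIndexes.contains(mapIndex)) {
        return false; // block already merged, so this stream carries a duplicate
      }
      if (partition.currentWriter != null && partition.currentWriter != this) {
        return false; // another stream (possibly the same block from a speculative task) is writing
      }
      partition.currentWriter = this;
      isWriting = true; // set only once this stream actually owns the partition
      return true;
    }
  }

  void onComplete() {
    synchronized (partition) {
      if (isWriting) {
        partition.mergedMapIndexes.add(mapIndex);
        partition.currentWriter = null;
        isWriting = false;
      }
    }
  }

  void onFailure() {
    synchronized (partition) {
      // Only the owning stream may release the partition; a failing duplicate that
      // never started writing leaves the active writer untouched.
      if (isWriting && partition.currentWriter == this) {
        partition.currentWriter = null;
      }
      isWriting = false;
    }
  }
}
```

Ownership is tracked by stream reference rather than by map index because a speculative duplicate carries the same map index as the block it duplicates, so comparing indexes alone could not tell the two streams apart. As a side effect, a second onFailure call on the same stream is a no-op, since isWriting is already false.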
This change also includes a few additional improvements to the code:
- Always use positional writes, because this simplifies the code and we did not observe any performance impact in microbenchmarks.
- Additional minor changes.
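A minimal sketch of a positional write in Java, assuming the merged data file is accessed through a java.nio FileChannel. FileChannel.write(ByteBuffer, long) writes at an explicit offset without changing the channel's own position, so the caller tracks the offset itself; PositionalWriter and writeFully are hypothetical names, not Spark code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class PositionalWriter {
  private PositionalWriter() {}

  /**
   * Writes all remaining bytes of buf starting at the given file offset and returns the
   * offset just past the last byte written. The channel's own position is not modified.
   */
  static long writeFully(FileChannel channel, ByteBuffer buf, long position) throws IOException {
    long pos = position;
    while (buf.hasRemaining()) {
      // A single call may write fewer bytes than remain, so loop until the buffer drains.
      pos += channel.write(buf, pos);
    }
    return pos;
  }
}
```

Because every write names its offset, a writer that remembers where the last successfully merged block ended can write the next block from that offset, overwriting any bytes a failed stream left behind, without repositioning the channel first.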
- Relates to SPARK-32916: Add support for external shuffle service in YARN deployment mode to leverage push-based shuffle