Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-27876

Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • 2.3.2
    • None
    • Shuffle, Spark Core

    Description

      There is a limit for shuffle read.

      If a shuffle partition block's size is large than Integer.MaxValue(2GB) and this block is fetched from remote, an Exception will be thrown.

      
          2019-05-24 06:46:30,333 [9935] - WARN [shuffle-client-6-2:TransportChannelHandler@78] - Exception in connection from hadoop3747.jd.163.org/10.196.76.172:7337
          java.lang.IllegalArgumentException: Too large frame: 2991947178
          at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
          at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
          at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
          at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
      

      Then this task would throw a fetchFailedException.

      This task will retry and it would execute successfully only when this task was reScheduled to a executor whose host is same to this oversize shuffle partition block.

      However, if there are more than one oversize(>2GB) shuffle partitions block, this task would never execute successfully and it may cause the failure of application.

      In this PR, I propose a new method to fetch shuffle block, it would fetch multi times when the relative shuffle partition block is oversize.

      The simple brief introduction:

      1. Set a shuffle fetch threshold(SHUFFLE_FETCH_THRESHOLD) to Int.MaxValue(2GB)
      2. Set a parameter spark.shuffle.fetch.split to control whether enable fetch large partition multi times
      3. When creating mapStatus, caucluate the segemens of shuffle block (Math.ceil(size /SHUFFLE_FETCH_THRESHOLD )), and only record the segment number which is large than 1.
      4. Define a new BlockId type, ShuffleBlockSegmentId, used to identifiy the fetch method.
      5. When spark.shuffle.fetch.split is enabled, send ShuffleBlockSegmentId message to shuffleService instead of ShuffleBlockId message.
      6. For a ShuffleBlockId, use a sequence of ManagedBuffers to present its block instead of a ManagedBuffer.
      7. In ShuffleBlockFetcherIterator, create a PriorityBlockQueue for a ShuffleBlockId to store the fetched SegmentManagedBuffer, when all segments of a ShuffleBlockId are fetched, take relative sequence of managedBuffers(which are ordered by segmentId) as a successResult for a ShuffleBlockID.
      8. In the shuffle serivice side, if the blockId of openBlocks is a ShuffleBlockSegmentId, response a segment managedBuffer of block , if the blockId is a ShuffleBlockId response a whole managedBuffer of block as before.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned
            hzfeiwang feiwang
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Issue deployment