Spark / SPARK-13352

BlockFetch does not scale well on large blocks

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.2, 2.0.0
    • Component/s: Block Manager, Spark Core
    • Labels: None

    Description

      BlockManager.getRemoteBytes() performs poorly on large blocks:

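        import java.nio.ByteBuffer
        import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}

        // Runs inside a Spark test suite that provides `sc` (a SparkContext).
        test("block manager") {
          val N = 500 << 20  // block size in bytes (500M)
          val bm = sc.env.blockManager
          val blockId = TaskResultBlockId(0)
          val buffer = ByteBuffer.allocate(N)
          buffer.limit(N)  // no-op: allocate() already sets limit to N
          bm.putBytes(blockId, buffer, StorageLevel.MEMORY_AND_DISK_SER)
          val result = bm.getRemoteBytes(blockId)  // the slow call being measured
          assert(result.isDefined)
          assert(result.get.limit() === (N))
        }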
      

      Here are the runtimes for different block sizes:

      50M     3 seconds
      100M    7 seconds
      250M    33 seconds
      500M    2 min
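
Measurements like the ones above could be collected with a small timing helper. The sketch below is illustrative only: `BlockTiming` and `timed` are hypothetical names, and the workload is a stand-in (a plain in-JVM buffer copy), not BlockManager.getRemoteBytes(), so that the example runs without a Spark cluster.

```scala
import java.nio.ByteBuffer

object BlockTiming {
  /** Runs `body` once and returns its result with the elapsed wall-clock seconds. */
  def timed[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1e9)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload: copy a buffer of each size. In a real Spark test,
    // the body of `timed` would be the putBytes/getRemoteBytes sequence.
    for (mb <- Seq(1, 2, 4)) {
      val n = mb << 20
      val src = ByteBuffer.allocate(n)
      val (copy, secs) = timed {
        val dst = ByteBuffer.allocate(n)
        dst.put(src.duplicate())
        dst.flip()
        dst
      }
      println(f"$mb%4dM  $secs%.3f seconds (copied ${copy.limit()} bytes)")
    }
  }
}
```

Timing a single run includes JVM warm-up and test fixture overhead, so numbers obtained this way are best compared against each other rather than read as absolute throughput.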
      

        Activity

          davies Davies Liu added a comment -

          rxin, can someone help look into this one?

          This is one of the reasons broadcast hash join is slow with large broadcasts.

          rxin Reynold Xin added a comment - - edited

          I think the proper fix is to break up large blocks into small chunks. There is no reason why we need to transfer a single, large, contiguous block of memory.
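
The chunking idea can be sketched as follows. This is a minimal illustration of splitting one large buffer into bounded views, not the change that was eventually merged; `Chunking`, `chunks`, and the 4M chunk size are hypothetical choices made for the example.

```scala
import java.nio.ByteBuffer

object Chunking {
  /** Splits `buf` into read-only views of at most `chunkSize` bytes each, without copying. */
  def chunks(buf: ByteBuffer, chunkSize: Int): Seq[ByteBuffer] = {
    require(chunkSize > 0, "chunkSize must be positive")
    val view = buf.duplicate() // independent position/limit, shared content
    val out = Seq.newBuilder[ByteBuffer]
    while (view.hasRemaining) {
      val len = math.min(chunkSize, view.remaining())
      val slice = view.slice() // starts at the current position
      slice.limit(len)         // cap the view at one chunk
      out += slice.asReadOnlyBuffer()
      view.position(view.position() + len)
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val block = ByteBuffer.allocate(10 << 20)  // a 10M block
    val parts = chunks(block, 4 << 20)         // 4M chunks (arbitrary size)
    parts.zipWithIndex.foreach { case (p, i) =>
      println(s"chunk $i: ${p.remaining()} bytes")
    }
  }
}
```

Because each chunk is a view over the original buffer, the split itself allocates no additional payload memory; a sender could then write the chunks one at a time instead of handing the transport a single contiguous region.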

          davies Davies Liu added a comment -

          After more investigation, it turned out that the block fetcher in 1.6+ is about two times slower than the one in 1.5: it took 44 seconds to fetch a 289M block, versus 22 seconds in 1.5.

          In 1.5

          16/04/01 11:58:33 DEBUG BlockManager: Getting block taskresult_5 from memory
          16/04/01 11:58:34 DEBUG TransportClient: Sending fetch chunk request 0 to localhost/127.0.0.1:54202
          16/04/01 11:58:35 DEBUG Cleaner0: java.nio.ByteBuffer.cleaner(): available
          16/04/01 11:58:56 DEBUG BlockManagerSlaveEndpoint: removing block taskresult_5
          16/04/01 11:58:56 DEBUG BlockManager: Removing block taskresult_5
          16/04/01 11:58:56 DEBUG MemoryStore: Block taskresult_5 of size 289281861 dropped from memory (free 2222933912)
          16/04/01 11:58:56 INFO BlockManagerInfo: Removed taskresult_5 on localhost:54202 in memory (size: 275.9 MB, free: 2.1 GB)
          16/04/01 11:58:56 DEBUG BlockManagerMaster: Updated info of block taskresult_5
          16/04/01 11:58:56 DEBUG BlockManager: Told master about block taskresult_5
          16/04/01 11:58:56 DEBUG BlockManagerSlaveEndpoint: Done removing block taskresult_5, response is true
          16/04/01 11:58:56 DEBUG BlockManagerSlaveEndpoint: Sent response: true to AkkaRpcEndpointRef(Actor[akka://sparkDriver/temp/$I])
          

          In 1.6 or master

          16/04/01 11:55:47 DEBUG BlockManager: Getting remote block taskresult_5 as bytes
          16/04/01 11:55:47 DEBUG BlockManager: Getting remote block taskresult_5 from BlockManagerId(driver, localhost, 54181)
          16/04/01 11:55:47 DEBUG TransportClientFactory: Creating new connection to localhost/127.0.0.1:54181
          16/04/01 11:55:47 DEBUG ResourceLeakDetector: -Dio.netty.leakDetectionLevel: simple
          16/04/01 11:55:47 DEBUG TransportClientFactory: Connection to localhost/127.0.0.1:54181 successful, running bootstraps...
          16/04/01 11:55:47 DEBUG TransportClientFactory: Successfully created connection to localhost/127.0.0.1:54181 after 31 ms (0 ms spent in bootstraps)
          16/04/01 11:55:47 DEBUG Recycler: -Dio.netty.recycler.maxCapacity.default: 262144
          16/04/01 11:55:47 DEBUG BlockManager: Level for block taskresult_5 is StorageLevel(true, true, false, false, 1)
          16/04/01 11:55:47 DEBUG BlockManager: Getting block taskresult_5 from memory
          16/04/01 11:55:48 DEBUG TransportClient: Sending fetch chunk request 0 to localhost/127.0.0.1:54181
          16/04/01 11:55:58 DEBUG Cleaner0: java.nio.ByteBuffer.cleaner(): available
          16/04/01 11:56:31 DEBUG BlockManagerSlaveEndpoint: removing block taskresult_5
          16/04/01 11:56:31 DEBUG BlockManager: Removing block taskresult_5
          16/04/01 11:56:31 DEBUG MemoryStore: Block taskresult_5 of size 289281861 dropped from memory (free 2851511312)
          16/04/01 11:56:31 INFO BlockManagerInfo: Removed taskresult_5 on localhost:54181 in memory (size: 275.9 MB, free: 2.7 GB)
          16/04/01 11:56:31 DEBUG BlockManagerMaster: Updated info of block taskresult_5
          16/04/01 11:56:31 DEBUG BlockManager: Told master about block taskresult_5
          16/04/01 11:56:31 DEBUG BlockManagerSlaveEndpoint: Done removing block taskresult_5, response is true
          16/04/01 11:56:31 DEBUG BlockManagerSlaveEndpoint: Sent response: true to 192.168.0.143:54179
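
The elapsed times can be read off the two logs above by subtracting timestamps. The sketch below does that arithmetic; `LogElapsed` is a hypothetical helper, and taking the "Sending fetch chunk request" and "removing block" lines as the start and end of the fetch is an assumption, so other endpoints shift the result by a second or so.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object LogElapsed {
  // The logs above prefix each line with a "yy/MM/dd HH:mm:ss" timestamp.
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  /** Parses the 17-character timestamp at the start of a log line. */
  def timestamp(line: String): LocalDateTime =
    LocalDateTime.parse(line.take(17), fmt)

  /** Whole seconds between the timestamps of two log lines. */
  def elapsedSeconds(first: String, last: String): Long =
    Duration.between(timestamp(first), timestamp(last)).getSeconds

  def main(args: Array[String]): Unit = {
    val v15 = elapsedSeconds(
      "16/04/01 11:58:34 DEBUG TransportClient: Sending fetch chunk request 0 to localhost/127.0.0.1:54202",
      "16/04/01 11:58:56 DEBUG BlockManagerSlaveEndpoint: removing block taskresult_5")
    val v16 = elapsedSeconds(
      "16/04/01 11:55:48 DEBUG TransportClient: Sending fetch chunk request 0 to localhost/127.0.0.1:54181",
      "16/04/01 11:56:31 DEBUG BlockManagerSlaveEndpoint: removing block taskresult_5")
    println(s"1.5: $v15 s, 1.6/master: $v16 s") // prints "1.5: 22 s, 1.6/master: 43 s"
  }
}
```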
          
          davies Davies Liu added a comment -

          cc adav

          liyezhang556520 Zhang, Liye added a comment - - edited

          Hi davies, I think this JIRA is related to SPARK-14242 and SPARK-14290. Can you test with the Spark master branch again to see if this issue still exists?

          davies Davies Liu added a comment - - edited

          The results are much better now (the tests carry some fixed overhead):

          50M     2.2 seconds
          100M    2.8 seconds
          250M    3.7 seconds
          500M    7.8 seconds
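
To put the two sets of timings in this issue side by side, the sketch below computes throughput and speedup per block size. `FetchThroughput` is a hypothetical name, the numbers are copied from the tables in this issue, and "2 min" is taken as 120 seconds, which is an approximation.

```scala
object FetchThroughput {
  // (size in M, seconds before, seconds after), copied from this issue.
  // The 500M "before" value is reported as "2 min"; 120.0 is an approximation.
  val runs: Seq[(Int, Double, Double)] =
    Seq((50, 3.0, 2.2), (100, 7.0, 2.8), (250, 33.0, 3.7), (500, 120.0, 7.8))

  /** Throughput in M per second. */
  def throughput(sizeM: Int, secs: Double): Double = sizeM / secs

  /** How many times faster the later run is. */
  def speedup(before: Double, after: Double): Double = before / after

  def main(args: Array[String]): Unit =
    for ((size, before, after) <- runs) {
      val tBefore = throughput(size, before)
      val tAfter = throughput(size, after)
      val gain = speedup(before, after)
      println(f"$size%4dM  before $tBefore%6.1f M/s  after $tAfter%6.1f M/s  speedup $gain%5.1fx")
    }
}
```

Both sets of timings include fixed test overhead, so the small-block throughput figures understate the actual transfer rate.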
          
          apachespark Apache Spark added a comment -

          User 'liyezhang556520' has created a pull request for this issue:
          https://github.com/apache/spark/pull/12296

          liyezhang556520 Zhang, Liye added a comment -

          davies, the last result for 500M should be 7.8 seconds, not 7.8 min, right?

          davies Davies Liu added a comment -

          corrected, thanks


          People

            Assignee: liyezhang556520 Zhang, Liye
            Reporter: davies Davies Liu