SPARK-22579

BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Block Manager, Spark Core

    Description

      When an RDD partition is cached on one executor but the task that needs it runs on another executor (process locality ANY), the cached partition is fetched via BlockManager.getRemoteValues, which delegates to BlockManager.getRemoteBytes. Both calls are blocking.
      In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node cluster, cached to disk. Rough math puts the average partition size at 700MB.
      The Spark UI made it obvious that tasks running with process locality 'ANY' were much slower than local tasks (roughly 40 seconds for local tasks versus 8-10 minutes for remote ones). I captured thread dumps of executors running remote tasks and got this stack trace:

      Thread ID Thread Name Thread State Thread Locks
      1521 Executor task launch worker-1000 WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
      java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
      scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
      scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
      scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
      scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
      scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
      scala.concurrent.Await$.result(package.scala:190)
      org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
      org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
      org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
      org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
      org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
      org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
      org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
      org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
      org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
      org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

      Digging into the code showed that the block manager first fetches all the bytes (getRemoteBytes) and only then wraps them in a deserialization stream. This has several drawbacks:
      1. Blocking: the requesting executor is blocked while the remote executor serves the block.
      2. Potentially large memory footprint on the requesting executor: in my use case, 700MB of raw bytes held in a ChunkedByteBuffer.
      3. Inefficient: the requesting side usually doesn't need all values at once, since it consumes them via an iterator.
      4. Potentially large memory footprint on the serving executor: if the block is cached in deserialized form, the serving executor has to serialize it into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU intensive. The memory footprint could be reduced by serializing through a limited buffer that 'spills' to the response stream.
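
To make drawbacks 2 to 4 concrete, here is a minimal, language-neutral sketch in Python. It is not Spark code: `serialize_all`, `serialize_chunked`, `deserialize_stream` and `CHUNK_SIZE` are hypothetical names, and `pickle` merely stands in for Spark's serializer. It contrasts serializing a whole block into one buffer with flushing a bounded buffer as records are written.

```python
import io
import pickle
from typing import Iterable, Iterator

CHUNK_SIZE = 64 * 1024  # hypothetical flush threshold, not a Spark setting


def serialize_all(values: Iterable[object]) -> bytes:
    """Fetch-all style: the whole block ends up in a single buffer."""
    buf = io.BytesIO()
    for v in values:
        pickle.dump(v, buf)
    return buf.getvalue()


def serialize_chunked(values: Iterable[object],
                      chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Streaming style: emit the buffer as soon as it reaches chunk_size."""
    buf = io.BytesIO()
    for v in values:
        pickle.dump(v, buf)
        if buf.tell() >= chunk_size:
            yield buf.getvalue()  # "spill" to the response stream
            buf = io.BytesIO()
    if buf.tell() > 0:
        yield buf.getvalue()      # final partial chunk


def deserialize_stream(chunks: Iterable[bytes]) -> Iterator[object]:
    """Decode records one chunk at a time; every chunk holds whole records."""
    for chunk in chunks:
        reader = io.BytesIO(chunk)
        while reader.tell() < len(chunk):
            yield pickle.load(reader)
```

With the chunked variant, neither side holds more than roughly one chunk plus one record at a time, and the consumer can start iterating before the last record has been serialized.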

      I suggest improving this by implementing either a full streaming mechanism or some kind of pagination mechanism. In addition, the requesting executor should be able to make progress with the data it already has, blocking only when its local buffer is exhausted and the remote side has not yet delivered the next chunk of the stream (or the next page, in the case of pagination).
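
The requested consumer behaviour can be sketched as follows, again in Python rather than Spark's Scala. This only illustrates the pagination idea under stated assumptions: `fetch_page` is a hypothetical RPC that returns page `i` of a block or `None` past the last page, and `prefetching_pages` and `max_buffered` are made-up names.

```python
import queue
import threading
from typing import Callable, Iterator, Optional

_END = object()  # sentinel marking the end of the stream


def prefetching_pages(fetch_page: Callable[[int], Optional[bytes]],
                      max_buffered: int = 2) -> Iterator[bytes]:
    """Yield pages in order while a background thread fetches ahead."""
    buf = queue.Queue(maxsize=max_buffered)

    def producer() -> None:
        try:
            i = 0
            while True:
                page = fetch_page(i)  # hypothetical remote call
                if page is None:
                    break
                buf.put(page)  # blocks while the local buffer is full
                i += 1
            buf.put(_END)
        except BaseException as exc:
            buf.put(exc)  # hand the failure over to the consumer

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buf.get()  # blocks only when no page has arrived yet
        if item is _END:
            return
        if isinstance(item, BaseException):
            raise item
        yield item
```

The bounded queue caps memory on the requesting side at `max_buffered` pages, and the task thread waits only when it has consumed everything delivered so far. A real implementation would also need to cancel the fetch when the consumer stops early; this sketch leaves that out.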

      People

        Assignee: Unassigned
        Reporter: Eyal Farago (eyalfa)
        Votes: 0
        Watchers: 5