  1. Spark
  2. SPARK-30786

Block replication is not retried on other BlockManagers when it fails on 1 of the peers


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.4, 2.4.5, 3.0.0
    • Fix Version/s: 3.1.0
    • Component/s: Block Manager, Spark Core
    • Labels:
      None

      Description

      When an RDD is cached with replication > 1, each RDD block is first cached locally on one BlockManager and then replicated to (replication - 1) other BlockManagers. If replication fails on one of the peers, the BlockManager is supposed to retry the replication on another peer, up to the limit set by the "spark.storage.maxReplicationFailures" config. Currently this retry does not happen: as the logs below show, the replicating executor records the failed replication as a success.
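
      The expected behaviour described above can be sketched as a small, self-contained model. This is not Spark's BlockManager code: Peer, accepts, and replicate are hypothetical names, executor 3 is an invented extra peer, and the stop condition (give up once failures exceed maxFailures) is an assumption about how the limit is applied.

```scala
object ReplicationRetrySketch {
  // Hypothetical stand-in for a peer BlockManager:
  // `accepts` says whether the peer has room to store the block.
  final case class Peer(id: Int, accepts: Boolean)

  /** Try to place `wanted` remote copies, moving on to the next peer after a failure.
    * Stops when enough copies exist, when failures exceed `maxFailures`,
    * or when no candidate peers remain. Returns the ids of peers that stored the block.
    */
  def replicate(peers: List[Peer], wanted: Int, maxFailures: Int): List[Int] = {
    @annotation.tailrec
    def loop(remaining: List[Peer], done: List[Int], failures: Int): List[Int] =
      remaining match {
        case _ if done.size == wanted || failures > maxFailures => done.reverse
        case Nil => done.reverse
        case peer :: rest =>
          if (peer.accepts) loop(rest, peer.id :: done, failures)
          else loop(rest, done, failures + 1) // failure is visible, so another peer is tried
      }
    loop(peers, Nil, 0)
  }
}
```

      With peers 2 (accepts), 5 (rejects) and 3 (accepts), wanted = 2 and maxFailures = 1, the model skips executor 5 and still ends up with two remote copies. The retry only works if the sender learns that executor 5 rejected the block, which is what the logs in this report suggest is not happening.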

      Logs from the executor that is trying to replicate the block (executor 1):

      20/02/10 09:01:47 INFO Executor: Starting executor ID 1 on host wn11-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
      .
      .
      .
      20/02/10 09:06:45 INFO Executor: Running task 244.0 in stage 3.0 (TID 550)
      20/02/10 09:06:45 DEBUG BlockManager: Getting local block rdd_13_244
      20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 was not found
      20/02/10 09:06:45 DEBUG BlockManager: Getting remote block rdd_13_244
      20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 not found
      20/02/10 09:06:46 INFO MemoryStore: Block rdd_13_244 stored as values in memory (estimated size 33.3 MB, free 44.2 MB)
      20/02/10 09:06:46 DEBUG BlockManager: Told master about block rdd_13_244
      20/02/10 09:06:46 DEBUG BlockManager: Put block rdd_13_244 locally took  947 ms
      20/02/10 09:06:46 DEBUG BlockManager: Level for block rdd_13_244 is StorageLevel(memory, deserialized, 3 replicas)
      20/02/10 09:06:46 TRACE BlockManager: Trying to replicate rdd_13_244 of 34908552 bytes to BlockManagerId(2, wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
      20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes to BlockManagerId(2, wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None) in 205.849858 ms
      20/02/10 09:06:47 TRACE BlockManager: Trying to replicate rdd_13_244 of 34908552 bytes to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)
      20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None) in 180.501504 ms
      20/02/10 09:06:47 DEBUG BlockManager: Replicating rdd_13_244 of 34908552 bytes to 2 peer(s) took 387.381168 ms
      20/02/10 09:06:47 DEBUG BlockManager: block rdd_13_244 replicated to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None), BlockManagerId(2, wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
      20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 remotely took  423 ms
      20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 with replication took  1371 ms
      20/02/10 09:06:47 DEBUG BlockManager: Getting local block rdd_13_244
      20/02/10 09:06:47 DEBUG BlockManager: Level for block rdd_13_244 is StorageLevel(memory, deserialized, 3 replicas)
      20/02/10 09:06:47 INFO Executor: Finished task 244.0 in stage 3.0 (TID 550). 2253 bytes result sent to driver
      
      

      Logs from one of the executors the block is being replicated to (executor 5):

      20/02/10 09:01:47 INFO Executor: Starting executor ID 5 on host wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
      .
      .
      .
      20/02/10 09:06:47 INFO MemoryStore: Will not store rdd_13_244
      20/02/10 09:06:47 WARN MemoryStore: Not enough space to cache rdd_13_244 in memory! (computed 4.2 MB so far)
      20/02/10 09:06:47 INFO MemoryStore: Memory use = 4.9 GB (blocks) + 7.3 MB (scratch space shared across 2 tasks(s)) = 4.9 GB. Storage limit = 4.9 GB.
      20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 locally took  12 ms
      20/02/10 09:06:47 WARN BlockManager: Block rdd_13_244 could not be removed as it was not found on disk or in memory
      20/02/10 09:06:47 WARN BlockManager: Putting block rdd_13_244 failed
      20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 without replication took  13 ms
      

      Note that replication of the block failed on executor 5, with the log line "Not enough space to cache rdd_13_244 in memory!". Executor 1, however, logs the block as successfully replicated to executor 5 ("Replicated rdd_13_244 of 34908552 bytes to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)"), so it never retries the replication on another executor.
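
      The cross-check done by hand above can be automated when comparing many blocks. The following is a minimal sketch, assuming the log formats shown in this report; ReplicationLogCheck and falseSuccesses are hypothetical helper names, not part of Spark.

```scala
object ReplicationLogCheck {
  // Sender side: "Replicated <block> of <n> bytes to BlockManagerId(<executorId>, ..."
  private val Replicated =
    """Replicated (\S+) of \d+ bytes to BlockManagerId\((\d+),""".r.unanchored

  // Receiver side: "Putting block <block> failed"
  private val PutFailed = """Putting block (\S+) failed""".r.unanchored

  /** Blocks that the sender logs as replicated to `receiverId`
    * although the receiver's own log records a failed put for them.
    */
  def falseSuccesses(
      senderLog: Seq[String],
      receiverId: String,
      receiverLog: Seq[String]): Set[String] = {
    val claimed = senderLog.collect {
      case Replicated(block, id) if id == receiverId => block
    }.toSet
    val failed = receiverLog.collect { case PutFailed(block) => block }.toSet
    claimed.intersect(failed)
  }
}
```

      Fed the executor 1 log as senderLog and the executor 5 log as receiverLog with receiverId "5", this returns the set containing rdd_13_244, the block discussed above.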

      Sample code:

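      import org.apache.spark.storage.StorageLevel

      sc.setLogLevel("INFO")

      // Build a random printable string of the given length.
      def randomString(length: Int): String = {
        val r = new scala.util.Random
        val sb = new StringBuilder
        for (_ <- 1 to length) { sb.append(r.nextPrintableChar) }
        sb.toString
      }

      // 300 partitions, each holding 1000 random strings of 100000 characters.
      val df = sc.parallelize(1 to 300000, 300).map { x => randomString(100000) }.toDF
      // useDisk = false, useMemory = true, useOffHeap = false, deserialized = true, replication = 3
      df.persist(StorageLevel(false, true, false, true, 3))
      df.count()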
      


              People

              • Assignee:
                Prakhar Jain (prakharjain09)
              • Reporter:
                Prakhar Jain (prakharjain09)
              • Votes:
                0
              • Watchers:
                3
