SPARK-4085: Job will fail if a shuffle file that's read locally gets deleted


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0
    • Component/s: Spark Core
    • Labels: None

      Description

      This commit (https://github.com/apache/spark/commit/665e71d14debb8a7fc1547c614867a8c3b1f806a) changed the behavior of fetching local shuffle blocks: if a shuffle block is not found locally, the block is no longer marked as failed, and no fetch failed exception is thrown. The reason is that the "catch" block here (https://github.com/apache/spark/commit/665e71d14debb8a7fc1547c614867a8c3b1f806a#diff-e6e1631fa01e17bf851f49d30d028823R202) can never be invoked: the exception from getLocalFromDisk() isn't thrown until next() gets called on the iterator it returns.
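
      To make the lazy-evaluation issue concrete, here is a standalone sketch (not the actual BlockFetcherIterator code; getLocalFromDisk and the block id below are just stand-ins) showing that an exception deferred into an iterator's next() escapes a try/catch placed around the call that builds the iterator:

        object DeferredIteratorFailure {
          // Stand-in for the real local read: the failure is raised lazily,
          // on first element access, not when the iterator is constructed.
          def getLocalFromDisk(blockId: String): Iterator[Int] = new Iterator[Int] {
            def hasNext: Boolean = true
            def next(): Int = throw new RuntimeException(s"local block $blockId not found")
          }

          def main(args: Array[String]): Unit = {
            val iter =
              try {
                getLocalFromDisk("shuffle_0_0_0")   // returns immediately; nothing is thrown here
              } catch {
                case e: Exception =>
                  println("catch block invoked")    // never reached
                  Iterator.empty
              }
            iter.next()                             // the exception only surfaces here, outside the try/catch
          }
        }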

      Reynold Xin Matei Alexandru Zaharia it looks like you changed the test for this to catch the new exception that gets thrown (https://github.com/apache/spark/commit/665e71d14debb8a7fc1547c614867a8c3b1f806a#diff-9c2e1918319de967045d04caf813a7d1R93). Was that intentional? Because the new exception is a SparkException rather than a FetchFailedException, jobs with missing local shuffle data will now fail outright instead of having the map stage retried.
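
      For completeness, a toy sketch of why the exception type matters (the classes below are stand-ins, not Spark's real exceptions or scheduler): a failure reported as a fetch failure tells the scheduler the missing map output can be regenerated and the stage retried, while any other exception simply propagates and fails the job.

        object WhyTheExceptionTypeMatters {
          // Stand-ins for FetchFailedException and SparkException.
          final class FetchFailed(msg: String) extends Exception(msg)
          final class GenericFailure(msg: String) extends Exception(msg)

          // Toy "scheduler": a fetch failure triggers regenerating the map
          // output and one retry; anything else aborts the job.
          def runReduceTask(task: () => Unit, rerunMapStage: () => Unit): Unit =
            try task() catch {
              case _: FetchFailed =>
                rerunMapStage()
                task()
            }

          def main(args: Array[String]): Unit = {
            var mapOutputPresent = false
            val task = () => if (!mapOutputPresent) throw new FetchFailed("shuffle_0_0_0 missing")
            runReduceTask(task, () => { mapOutputPresent = true })            // recovers after retry
            runReduceTask(() => throw new GenericFailure("boom"), () => ())   // propagates and fails the job
          }
        }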

      This problem is reproducible with this test case:

        // Runs inside a Spark test suite where `sc` is the suite's SparkContext var
        // (e.g. via LocalSparkContext); ShuffleBlockId comes from org.apache.spark.storage.
        test("hash shuffle manager recovers when local shuffle files get deleted") {
          val conf = new SparkConf(false)
          conf.set("spark.shuffle.manager", "hash")
          sc = new SparkContext("local", "test", conf)
          val rdd = sc.parallelize(1 to 10, 2).map((_, 1)).reduceByKey(_ + _)
          rdd.count()

          // Delete one of the local shuffle blocks; the second count() has to re-read it.
          sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0)).delete()
          rdd.count()
        }


      which will fail on the second rdd.count().

      This is a regression from 1.1.
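
      For reference, the failure currently surfaces to the caller as a SparkException (job aborted), so until this is fixed the second count in the test above can only pass by catching it, e.g. with ScalaTest's intercept:

        // `rdd` refers to the RDD built in the test above; SparkException is
        // org.apache.spark.SparkException. Once the regression is fixed, this
        // wrapper goes away and the plain rdd.count() should just succeed.
        intercept[SparkException] {
          rdd.count()
        }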


            People

            • Assignee: Reynold Xin (rxin)
            • Reporter: Kay Ousterhout (kayousterhout)
            • Votes: 0
            • Watchers: 3
