Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-19276

FetchFailures can be hidden by user (or sql) exception handling

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 2.1.0
    • 2.2.0
    • Scheduler, Spark Core, SQL
    • None

    Description

      The scheduler handles node failures by looking for a special FetchFailedException thrown by the shuffle block fetcher. This is handled in Executor and then passed as a special msg back to the driver: https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403

      However, user code exists in between the shuffle block fetcher and that catch block – it could intercept the exception, wrap it with something else, and throw a different exception. If that happens, spark treats it as an ordinary task failure, and retries the task, rather than regenerating the missing shuffle data. The task eventually is retried 4 times, its doomed to fail each time, and the job is failed.

      You might think that no user code should do that – but even sparksql does it:
      https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214

      Here's an example stack trace. This is from Spark 1.6, so the sql code is not the same, but the problem is still there:

      17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1983.0 (TID 304851, xxx): org.apache.spark.SparkException: Task failed while writing rows.
              at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
              at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
              at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
              at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
              at org.apache.spark.scheduler.Task.run(Task.scala:89)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to xxx/yyy:zzz
              at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
      ...
      17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0 failed 4 times; aborting job
      

      I think the right fix here is to also set a fetch failure status in the TaskContextImpl, so the executor can check that instead of just one exception.

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            imranr Imran Rashid
            irashid Imran Rashid
            Votes:
            0 Vote for this issue
            Watchers:
            8 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment