Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-4835

Streaming saveAs*HadoopFiles() methods may throw FileAlreadyExistsException during checkpoint recovery

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.2.1, 1.3.0
    • Component/s: DStreams
    • Labels:
      None

      Description

      While running (a slightly modified version of) the "recovery with saveAsHadoopFiles operation" test in the streaming CheckpointSuite, I noticed the following error message in the streaming driver log:

      14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Added jobs for time 1500 ms
      14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO RecurringTimer: Started timer for JobGenerator at time 2000
      14/12/12 17:42:50.688 sparkDriver-akka.actor.default-dispatcher-3 INFO JobScheduler: Starting job streaming job 1500 ms.0 from job set of time 1500 ms
      14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobGenerator: Restarted JobGenerator at 2000 ms
      14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Started JobScheduler
      14/12/12 17:42:50.689 sparkDriver-akka.actor.default-dispatcher-3 INFO JobScheduler: Starting job streaming job 1500 ms.1 from job set of time 1500 ms
      14/12/12 17:42:50.689 sparkDriver-akka.actor.default-dispatcher-3 ERROR JobScheduler: Error running job streaming job 1500 ms.0
      org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/var/folders/0k/2qp2p2vs7bv033vljnb8nk1c0000gn/T/1418434967213-0/-1500.result already exists
      	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:121)
      	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1045)
      	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
      	at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:677)
      	at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:675)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
      	at scala.util.Try$.apply(Try.scala:161)
      	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      14/12/12 17:42:50.691 pool-12-thread-1 INFO SparkContext: Starting job: apply at Transformer.scala:22
      

      Spark Streaming's saveAsHadoopFiles method calls Spark's rdd.saveAsHadoopFile method. The Spark method, in turn, called PairRDDFunctions.saveAsHadoopDataset(), which has error-checking to ensure that the output directory does not already exist:

          if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
            // FileOutputFormat ignores the filesystem parameter
            val ignoredFs = FileSystem.get(hadoopConf)
            hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
          }
      

      If Spark Streaming recovers from a checkpoint and re-runs the last batch in the checkpoint, then saveAsHadoopDataset will have been called twice with the same output path. If the output path exists from the first, pre-recovery run, then the recovery will fail.

      This seems like it could be a pretty serious issue: imagine that a streaming job fails partway through a save() operation, then recovers: in this case, the existing directory will prevent us from ever recovering and finishing the save().

      Fortunately, this should be simple to fix: we should disable the existing directory checks for output operations called by streaming jobs.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                tdas Tathagata Das
                Reporter:
                joshrosen Josh Rosen
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: