SPARK-18970: FileSource failure during file list refresh doesn't cause an application to fail, but stops further processing


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 2.0.2
    • Fix Version/s: 2.1.0
    • Component/s: SQL, Structured Streaming
    • Labels: None

    Description

      Our Spark Structured Streaming application uses S3 files as a streaming source. After running for several days, processing stopped even though the application continued to run.
      Stack trace:

      java.io.FileNotFoundException: No such file or directory 's3n://XXXXXXXXXXXXXXXXX'
      	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:818)
      	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:511)
      	at org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:465)
      	at org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:462)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
      	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
      	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
      	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
      	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
      	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
      	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
      	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
      	at org.apache.spark.scheduler.Task.run(Task.scala:85)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      I believe two things should (or can) be fixed:
      1. The application should fail in the case of such an error (see the watchdog sketch after this list).
      2. The application should be allowed to ignore such a failure, since there is a chance that the error will not resurface on the next refresh (see the note after the query code below). In my case I believe the error was caused by S3 cleaning the bucket at exactly the moment the refresh was running.
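
      As a stopgap for the first suggestion, the driver can watch the query for stalled progress and fail fast instead of hanging on awaitTermination(). This is a minimal sketch, assuming Spark 2.1+ (where StreamingQuery.lastProgress is available); the watch helper and the maxSilence threshold are hypothetical, not part of Spark:

      import java.time.{Duration, Instant}
      import org.apache.spark.sql.streaming.StreamingQuery

      // Hypothetical watchdog: if the query reports no progress for longer
      // than maxSilence, stop it and fail the driver loudly rather than
      // letting the application run on as a zombie.
      def watch(query: StreamingQuery, maxSilence: Duration): Unit = {
        val started = Instant.now()
        while (query.isActive) {
          val lastSeen = Option(query.lastProgress)      // null until the first trigger completes
            .map(p => Instant.parse(p.timestamp))        // ISO-8601 timestamp of the last progress event
            .getOrElse(started)
          if (Duration.between(lastSeen, Instant.now()).compareTo(maxSilence) > 0) {
            query.stop()
            sys.error(s"Streaming query made no progress for $maxSilence; failing the application")
          }
          Thread.sleep(10000)                            // poll every 10 seconds
        }
      }

      Calling watch(cq, Duration.ofMinutes(30)) in place of a bare cq.awaitTermination() turns the silent stall into a visible failure.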

      My code that creates the streaming query looks like the following:

      import org.apache.spark.sql.{ForeachWriter, Row}
      import org.apache.spark.sql.streaming.ProcessingTime

      val cq = sqlContext.readStream
        .format("json")
        .schema(struct)                                  // struct: the JSON schema (a StructType)
        .load("input")                                   // S3 input path in the real job
        .writeStream
        .option("checkpointLocation", "checkpoints")     // S3 checkpoint path in the real job
        .foreach(new ForeachWriter[Row] {...})           // sink implementation elided in the report
        .trigger(ProcessingTime("10 seconds"))
        .start()

      cq.awaitTermination()
      
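      For the second suggestion, newer Spark releases expose a session-level flag for tolerating files that disappear between listing and reading. Whether it covers the streaming source's refresh path in a given version should be verified; treat the snippet below as an assumption, not a confirmed fix for this ticket:

      // Assumption: spark.sql.files.ignoreMissingFiles is available in the
      // running Spark version and applies to this read path; when set, files
      // deleted after listing are skipped instead of failing the job.
      sqlContext.setConf("spark.sql.files.ignoreMissingFiles", "true")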

      Attachments

        1. sparkerror.log (43 kB, attached by Lev)

          People

            Assignee: Unassigned
            Reporter: Lev (lev.numerify)
            Votes: 0
            Watchers: 8
