Spark / SPARK-22752

FileNotFoundException while reading from Kafka


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      We are running a stateful Structured Streaming job which reads from Kafka and writes to HDFS, and we are hitting this exception:

      17/12/08 05:20:12 ERROR FileFormatWriter: Aborting job null.
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, hcube1-1n03.eng.hortonworks.com, executor 1): java.lang.IllegalStateException: Error reading delta file /checkpointDir/state/0/0/1.delta of HDFSStateStoreProvider[id = (op=0, part=0), dir = /checkpointDir/state/0/0]: /checkpointDir/state/0/0/1.delta does not exist
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
      	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
      	at scala.Option.getOrElse(Option.scala:121)
      

      Indeed, the file does not exist in HDFS: the state/0/0 directory contains no files at all, while there are some files in the commits and offsets folders. I am not sure what causes this behavior. It seems to happen the second time the job is started, after the first run has failed, so task failures may trigger it. It might also be related to watermarks, since there were problems with the incoming data that caused the watermark to filter out all incoming records.
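
      For context, here is a minimal sketch of the kind of stateful Structured Streaming job described above (Kafka source, event-time watermark, HDFS sink and checkpoint). It is not the reporter's actual code: the topic, broker address, paths, window size and watermark delay are all illustrative assumptions.

      // Hypothetical sketch of a stateful Structured Streaming job similar to the one
      // described in this report: Kafka source, event-time watermark, windowed
      // aggregation (whose state goes to the HDFS-backed state store), output and
      // checkpoint on HDFS. All names, paths and durations are illustrative only.
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions._

      object KafkaToHdfsStateful {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName("KafkaToHdfsStateful")
            .getOrCreate()
          import spark.implicits._

          // Read records from Kafka (hypothetical brokers and topic).
          val events = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092")
            .option("subscribe", "events")
            .load()
            .selectExpr("CAST(value AS STRING) AS value", "timestamp")

          // Stateful windowed aggregation. The per-window state is kept by the
          // HDFS-backed state store under <checkpointLocation>/state/<op>/<partition>,
          // and rows older than the watermark are dropped.
          val counts = events
            .withWatermark("timestamp", "10 minutes")
            .groupBy(window($"timestamp", "5 minutes"))
            .count()

          // Write to HDFS, checkpointing to a directory like the /checkpointDir
          // mentioned in the stack trace.
          val query = counts.writeStream
            .outputMode("append")
            .format("parquet")
            .option("path", "hdfs:///output/counts")
            .option("checkpointLocation", "hdfs:///checkpointDir")
            .start()

          query.awaitTermination()
        }
      }

      With such a query, the delta file from the stack trace, /checkpointDir/state/0/0/1.delta, is the version-1 state file for operator 0, partition 0, matching the provider id in the error (HDFSStateStoreProvider[id = (op=0, part=0), dir = /checkpointDir/state/0/0]).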


People

    Assignee: Unassigned
    Reporter: Marco Gaido (mgaido)
    Votes: 0
    Watchers: 2
