Spark / SPARK-26359

Spark checkpoint restore fails after query restart

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Information Provided
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Labels: None
    • Environment:

      Description

      We had an incident where one of our Structured Streaming queries could not be restarted after the usual S3 checkpointing failure. To clarify up front: we are aware of the issues with S3 and are working towards moving to HDFS, but this will take time. S3 causes queries to fail quite often during peak hours, and we have separate logic that attempts to restart the failed queries when possible.
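For readers unfamiliar with this kind of restart logic, here is a minimal sketch of a bounded restart loop. It is not the reporter's actual implementation: `run_with_restarts`, `run_query`, `max_restarts` and `backoff_seconds` are hypothetical names chosen for illustration, and the sketch assumes the query is wrapped in a callable that blocks while the query runs and raises when it fails.

```python
import time
from typing import Callable


def run_with_restarts(
    run_query: Callable[[], None],
    max_restarts: int = 3,
    backoff_seconds: float = 0.0,
) -> int:
    """Run `run_query` until it returns normally or restarts are exhausted.

    Returns the number of restarts performed and re-raises the last error
    once `max_restarts` restarts have been used up.
    """
    restarts = 0
    while True:
        try:
            run_query()  # hypothetical: blocks until the query stops or raises
            return restarts
        except Exception:
            if restarts >= max_restarts:
                raise  # give up instead of restarting forever
            restarts += 1
            time.sleep(backoff_seconds * restarts)  # simple linear backoff
```

Capping the number of restarts is a design choice for this sketch: when the checkpoint itself is unreadable, as in the incident described here, every restart fails the same way, so surfacing the error is more useful than retrying indefinitely. In PySpark, `run_query` could start the stream and call `awaitTermination()`, which raises if the query terminates with an error.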

      In this particular case we could not restart one of the failed queries. It seems that between detecting the failure and starting the query again something went wrong in Spark, and the state in the checkpoint folder was left corrupted.

      The issue starts with the usual FileNotFoundException that happens with S3:

      2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
      java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
              at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
              at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
              at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
              at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
              at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
              at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
              at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
              at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
              at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
              at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
              at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
              at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
              at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
              at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
              at scala.Option.getOrElse(Option.scala:121)
              at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
              at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
              at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
              at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
              at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
              at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
              at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
              at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
      2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback terminated with exception, attempting restart
      

      The last line above announces that a restart will be attempted for the query named feedback. We start the query and hit this error almost immediately:

      2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback currently not running, starting query in own scheduling pool
      2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): java.lang.IllegalStateException: Error reading delta file s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of HDFSStateStoreProvider[id = (op=2,part=11),dir = s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does not exist
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
              at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
              at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
              at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
              at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
              at org.apache.spark.scheduler.Task.run(Task.scala:121)
              at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
              at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
              at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
              at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
              at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
              at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
              at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
              at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
              at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
              at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
              at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
              at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
              ... 28 more
      

      And this goes on indefinitely until we bump the checkpoint folder name.

      2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): java.lang.IllegalStateException: Error committing version 49464 into HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
              at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
              at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
      
          .....
      

      Looking into S3, it indeed appears that this delta file was never created. Instead we have a

       
      s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
      

      file, which I assume keeps that name until the whole operation has finished. So this file never got renamed to 36870.delta, and the application keeps trying to read the missing 36870.delta.
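The check described above (a temp file present, the final delta missing) can be sketched as a small script over a directory listing. This is an illustration only: the function name `unrenamed_delta_versions` is hypothetical, and the temp-file pattern is inferred solely from the single file name shown in this report, so it is an assumption rather than a documented Spark naming contract.

```python
import re
from typing import Iterable, List

# Temp-file shape inferred from the listing in this report (an assumption):
#   .<version>.delta.<uuid>.TID<task id>.tmp
TMP_DELTA = re.compile(r"^\.(\d+)\.delta\.[0-9a-f-]+\.TID\d+\.tmp$")
FINAL_DELTA = re.compile(r"^(\d+)\.delta$")


def unrenamed_delta_versions(file_names: Iterable[str]) -> List[int]:
    """Return versions that have a temp delta file but no final `<version>.delta`."""
    temp_versions = set()
    final_versions = set()
    for name in file_names:
        base = name.rsplit("/", 1)[-1]  # accept full object keys or bare names
        tmp = TMP_DELTA.match(base)
        final = FINAL_DELTA.match(base)
        if tmp:
            temp_versions.add(int(tmp.group(1)))
        elif final:
            final_versions.add(int(final.group(1)))
    # A version is suspicious when only the temp file exists.
    return sorted(temp_versions - final_versions)
```

Feeding it the file names from an `ls` of a state partition folder such as `state/2/11` would list the versions whose rename never completed, which can help tell a rename that never happened apart from a listing that is merely stale.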

      I will attach all the relevant redacted logs to this report, together with the ls output of the S3 folders and the metadata file. If any more information is needed, I would be happy to provide it. I would also appreciate some input on how best to resolve this issue. So far it has happened on 2 separate days, and the solution each time has been to bump the checkpoint.

        Attachments

        1. driver-redacted
          21 kB
          Kaspar Tint
        2. metadata
          0.0 kB
          Kaspar Tint
        3. redacted-offsets
          9 kB
          Kaspar Tint
        4. state-redacted
          49 kB
          Kaspar Tint
        5. worker-redacted
          7 kB
          Kaspar Tint

            People

            • Assignee: Unassigned
            • Reporter: Kaspar Tint
            • Votes: 0
            • Watchers: 4
