Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Information Provided
-
2.4.0
-
None
-
None
-
Spark 2.4.0 deployed in standalone-client mode
Checkpointing is done to S3
The Spark application in question is responsible for running 4 different queries
Queries are written using Structured Streaming
We are using the following algorithm in hopes of better performance:
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # When the default is 1
Description
We had an incident where one of our Structured Streaming queries could not be restarted after a usual S3 checkpointing failure. To clarify before anything else: we are aware of the issues with S3 and are working towards moving to HDFS, but this will take time. S3 causes queries to fail quite often during peak hours, and we have separate logic to handle this that attempts to restart the failed queries if possible.
In this particular case we could not restart one of the failed queries. It seems that between detecting the failure in the query and starting it up again, something went really wrong with Spark and the state in the checkpoint folder got corrupted for some reason.
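For illustration only, restart logic of the kind described above could be sketched as follows. This is not the reporter's actual code: `restart_with_backoff`, `start_query`, `max_attempts` and `base_delay_s` are hypothetical names, and the sketch assumes the query can be started through a plain callable that raises on failure. Bounding the number of attempts makes a permanently broken checkpoint surface as an error instead of an endless restart loop.

```python
import time
from typing import Callable, Optional


def restart_with_backoff(start_query: Callable[[], None],
                         max_attempts: int = 3,
                         base_delay_s: float = 1.0) -> int:
    """Call start_query until it succeeds; return the attempt number that worked.

    Raises RuntimeError (chained to the last failure) once max_attempts
    attempts have all failed.
    """
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            start_query()  # hypothetical hook that (re)starts the streaming query
            return attempt
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                # Exponential backoff between attempts: base, 2*base, 4*base, ...
                time.sleep(base_delay_s * 2 ** (attempt - 1))
    raise RuntimeError(
        f"query did not restart after {max_attempts} attempts"
    ) from last_error
```

A caller would treat the final `RuntimeError` as a signal that manual intervention is needed, for example inspecting the checkpoint folder.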
The issue starts with the usual FileNotFoundException that happens with S3:
2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback terminated with exception, attempting restart
In the last line we announce that a restart will be attempted for the query named feedback. We start the query up and encounter this almost immediately:
2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback currently not running, starting query in own scheduling pool
2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): java.lang.IllegalStateException: Error reading delta file s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of HDFSStateStoreProvider[id = (op=2,part=11),dir = s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
... 28 more
And this will go on forever until we bump the checkpoint folder name.
2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): java.lang.IllegalStateException: Error committing version 49464 into HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
.....
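The "Error reading delta file" failure is consistent with how the stack trace reads: loadMap iterates over a range of versions and calls updateFromDeltaFile for each one. A simplified model of that behaviour, assuming a state version is rebuilt from the latest snapshot at or below it plus every later delta file, shows why a single missing delta blocks every restart. The function names and the version numbers in the usage note are hypothetical, and this is a sketch of the idea, not Spark's implementation.

```python
from typing import Iterable, List, Set


def files_needed(version: int, snapshot_versions: Iterable[int]) -> List[str]:
    """List the files needed to rebuild `version` in this simplified model.

    Assumption: start from the latest snapshot <= version (if any), then
    apply each delta file after it, in order, up to `version`.
    """
    base = max((s for s in snapshot_versions if s <= version), default=0)
    needed = [f"{base}.snapshot"] if base else []
    needed += [f"{v}.delta" for v in range(base + 1, version + 1)]
    return needed


def missing_files(version: int,
                  snapshot_versions: Iterable[int],
                  present: Set[str]) -> List[str]:
    """Return the needed files that are absent from the partition directory."""
    return [f for f in files_needed(version, snapshot_versions) if f not in present]
```

For example, with a hypothetical snapshot at version 36865 and every delta present except 36870.delta, `missing_files(36871, [36865], present)` returns `["36870.delta"]`. In this model any version from 36870 onward stays unloadable until that file exists or the checkpoint location changes.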
Looking into S3, it indeed looks like this delta file was never created. Instead we have a
s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
file that I assume is named like that as long as the whole operation is not finished yet. So this file never got renamed to 36870.delta and the application will keep trying to reference it.
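A check like this can be automated against the ls output of a state folder. The sketch below is a hypothetical helper, not part of Spark: it assumes temp files follow the pattern of the one name quoted above (`.<version>.delta.<uuid>.TID<n>.tmp`) and reports versions that have a temp file but no final `<version>.delta`.

```python
import re
from typing import Iterable, List

# Pattern inferred from the single temp file name quoted in this report;
# other Spark versions or file systems may name temp files differently.
TEMP_DELTA = re.compile(r"^\.(\d+)\.delta\.[0-9a-f-]+\.TID\d+\.tmp$")
FINAL_DELTA = re.compile(r"^(\d+)\.delta$")


def unrenamed_deltas(file_names: Iterable[str]) -> List[int]:
    """Return versions that have a temp delta file but no final delta file."""
    temps, finals = set(), set()
    for name in file_names:
        base = name.rsplit("/", 1)[-1]  # accept full keys or bare file names
        temp_match = TEMP_DELTA.match(base)
        if temp_match:
            temps.add(int(temp_match.group(1)))
            continue
        final_match = FINAL_DELTA.match(base)
        if final_match:
            finals.add(int(final_match.group(1)))
    return sorted(temps - finals)
```

Fed the listing of `state/2/11`, this would flag version 36870 if the temp file is present and `36870.delta` is not. Whether it is safe to rename such a file by hand is a separate question that this sketch does not answer.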
I will attach all the relevant redacted logs to this report, together with the ls output of the S3 folders and the metadata file. If any more information is needed, I would be happy to provide it. I would also appreciate some input on how best to resolve this issue. So far it has happened on 2 separate days, and the solution has been to bump the checkpoint.