Spark / SPARK-26411

Streaming: _spark_metadata and checkpoints out of sync cause checkpoint packing failure


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Structured Streaming

    Description

      A Spark Structured Streaming file-source-to-file-sink query appears to pick up information from the `_spark_metadata` directory in the output path when compacting checkpoint data.

      The worst part is that, with the output and the checkpoint out of sync, no data is written at all.

      This is not documented anywhere. Removing the checkpoint data while leaving `_spark_metadata` in the output directory WILL CAUSE data loss.
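
      For context, a minimal sketch of the kind of file-source-to-file-sink query described above. The application name, paths, source format, and trigger interval are illustrative assumptions (the `struct<value: string>` schema and the roughly 10-minute cadence are only inferred from the log excerpt below), not taken verbatim from the affected job:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SourceStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("clickstream-file-to-file")    // hypothetical application name
      .getOrCreate()

    // File source: each micro-batch picks up newly arrived files.
    // A text source matches the "Output Data Schema: struct<value: string>" in the log.
    val lines = spark.readStream
      .format("text")
      .load("hdfs:///user/hadoop/clickstream/input")    // assumed input path

    // File sink: progress and state are tracked under checkpointLocation, while the
    // sink records committed batches in <output path>/_spark_metadata. This report is
    // about those two locations getting out of sync.
    val query = lines.writeStream
      .format("parquet")
      .option("path", "hdfs:///user/hadoop/clickstream/output")             // assumed output path
      .option("checkpointLocation", "hdfs:///user/hadoop/clickstream/checkpoint")
      .trigger(Trigger.ProcessingTime("10 minutes"))    // cadence inferred from log timestamps
      .start()

    query.awaitTermination()
  }
}
```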

       

      FileSourceScanExec.createNonBucketedReadRDD kicks off compaction and fails the whole job, because it expects the state store delta files to be present.
      But the delta files are never written, because FileStreamSink.addBatch does not execute the DataFrame that it receives (hence the "Skipping already committed batch" message in the log below).
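
      The "Skipping already committed batch 75" line in the log excerpt below is the sink consulting its own commit log under `_spark_metadata`: if a batch id is already recorded there, the incoming DataFrame is never executed. A simplified, standalone model of that skip-if-committed behaviour (the class and parameter names are illustrative, not Spark's internal API):

```scala
import org.apache.spark.sql.DataFrame

// Illustrative model of a file sink that tracks committed batch ids in a metadata
// log kept under <output path>/_spark_metadata. This is NOT Spark's FileStreamSink,
// only a sketch of the behaviour described in this report.
class MetadataLogFileSink(latestCommittedBatchId: () => Option[Long],
                          writeBatch: (Long, DataFrame) => Unit) {

  def addBatch(batchId: Long, data: DataFrame): Unit = {
    // If the metadata log already lists this batch id, the batch is treated as
    // committed and the DataFrame is never executed, so neither output files nor
    // state store delta files are produced for it. With a fresh checkpoint but a
    // stale _spark_metadata, replayed batches fall into this branch and their data
    // is silently dropped.
    if (batchId <= latestCommittedBatchId().getOrElse(-1L)) {
      println(s"Skipping already committed batch $batchId")
    } else {
      writeBatch(batchId, data)
    }
  }
}
```

      Because the source offsets and the state store live under the checkpoint location while the sink's commit log lives under the output path, deleting only one of the two leaves them disagreeing about which batches have already run, which is the out-of-sync situation this report describes.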

      ...
      INFO  [2018-12-17 03:20:02,784] org.apache.spark.sql.execution.streaming.FileStreamSink: Skipping already committed batch 75 
      ...
      INFO [2018-12-17 03:30:01,691] org.apache.spark.sql.execution.streaming.FileStreamSource: Log offset set to 76 with 29 new files
      INFO [2018-12-17 03:30:01,700] org.apache.spark.sql.execution.streaming.MicroBatchExecution: Committed offsets for batch 76. Metadata OffsetSeqMetadata(0,1545017401691,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
      INFO [2018-12-17 03:30:01,704] org.apache.spark.sql.execution.streaming.FileStreamSource: Processing 29 files from 76:76
      INFO [2018-12-17 03:30:01,983] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pruning directories with:
      INFO [2018-12-17 03:30:01,983] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters:
      INFO [2018-12-17 03:30:01,984] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct<value: string>
      INFO [2018-12-17 03:30:01,984] org.apache.spark.sql.execution.FileSourceScanExec: Pushed Filters:
      INFO [2018-12-17 03:30:02,581] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 16.205011 ms
      INFO [2018-12-17 03:30:02,593] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 9.368244 ms
      INFO [2018-12-17 03:30:02,629] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 31.126375 ms
      INFO [2018-12-17 03:30:02,640] org.apache.spark.SparkContext: Created broadcast 86 from start at SourceStream.scala:55
      INFO [2018-12-17 03:30:02,643] org.apache.spark.sql.execution.FileSourceScanExec: Planning scan with bin packing, max size: 14172786 bytes, open cost is considered as scanning 4194304 bytes.
      INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4321
      INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4326
      INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4324
      INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4320
      INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4325
      INFO [2018-12-17 03:30:02,737] org.apache.spark.SparkContext: Created broadcast 87 from start at SourceStream.scala:55
      INFO [2018-12-17 03:30:02,756] org.apache.spark.SparkContext: Starting job: start at SourceStream.scala:55
      INFO [2018-12-17 03:30:02,761] org.apache.spark.SparkContext: Created broadcast 88 from broadcast at DAGScheduler.scala:1079
      INFO [2018-12-17 03:30:03,860] org.apache.spark.ExecutorAllocationManager: Requesting 3 new executors because tasks are backlogged (new desired total will be 3)
      INFO [2018-12-17 03:30:04,863] org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 4)
      INFO [2018-12-17 03:30:06,545] org.apache.spark.SparkContext: Created broadcast 89 from broadcast at DAGScheduler.scala:1079
      WARN [2018-12-17 03:30:07,214] org.apache.spark.scheduler.TaskSetManager: Lost task 19.0 in stage 87.0 (TID 6145, ip-10-172-18-94.ec2.internal, executor 1): java.lang.IllegalStateException: Error reading delta file hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta of HDFSStateStoreProvider[id = (op=0,part=19),dir = hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19]: hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta does not exist
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
          at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
      

       

      Attachments

        Activity


          People

            Assignee: Unassigned
            Reporter: jalexoid (Alexander Panzhin)
            Votes: 1
            Watchers: 4

            Dates

              Created:
              Updated:
              Resolved:
