Spark / SPARK-38033

Structured Streaming query cannot be started because the commitId and offsetId are inconsistent


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      The streaming query could not be restarted after an unexpected machine shutdown.

      The query failed with the following exception:

      ERROR 22/01/12 02:48:36 MicroBatchExecution: Query streaming_4a026335eafd4bb498ee51752b49f7fb [id = 647ba9e4-16d2-4972-9824-6f9179588806, runId = 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
      java.lang.IllegalStateException: batch 113258 doesn't exist
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
              at scala.Option.getOrElse(Option.scala:121)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
              at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
              at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
              at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
              at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
              at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
              at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
      
      

      I checked the checkpoint directory on HDFS and found that the latest offset log entry is 113259, while the latest commit log entry is 113257. The offset entry for batch 113258 is missing entirely:

      commits
      /tmp/streaming_xxxxxxxx/commits/113253
      /tmp/streaming_xxxxxxxx/commits/113254
      /tmp/streaming_xxxxxxxx/commits/113255
      /tmp/streaming_xxxxxxxx/commits/113256
      /tmp/streaming_xxxxxxxx/commits/113257
      
      offsets
      /tmp/streaming_xxxxxxxx/offsets/113253
      /tmp/streaming_xxxxxxxx/offsets/113254
      /tmp/streaming_xxxxxxxx/offsets/113255
      /tmp/streaming_xxxxxxxx/offsets/113256
      /tmp/streaming_xxxxxxxx/offsets/113257
      /tmp/streaming_xxxxxxxx/offsets/113259
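
      For context, the message appears to come from `MicroBatchExecution.populateStartOffsets` (the frame at MicroBatchExecution.scala:255 in the trace): on restart, the query treats the latest offset-log entry as the batch to run or re-run and looks up the entry just before it to rebuild the committed offsets. Here the latest entry is 113259 and 113258 is absent, which matches the error text. The sketch below is a simplified model of that check based on a reading of the Spark source, not Spark's actual code; `simulate_restart` and `BatchMissingError` are hypothetical names, and batch ids are passed as plain integers instead of being read from HDFS.

```python
class BatchMissingError(Exception):
    """Hypothetical stand-in for the IllegalStateException Spark throws."""


def simulate_restart(offset_ids, commit_ids):
    """Simplified model of how a micro-batch query picks its start batch.

    Returns (batch_to_run, last_committed_batch) or raises BatchMissingError.
    """
    if not offset_ids:
        return 0, None  # fresh query: nothing to recover
    offsets = set(offset_ids)
    latest = max(offsets)
    # On restart the second-latest offset entry is read to rebuild the
    # committed offsets, so a gap just below the latest entry is fatal.
    if latest != 0 and (latest - 1) not in offsets:
        raise BatchMissingError(f"batch {latest - 1} doesn't exist")
    last_committed = max(commit_ids) if commit_ids else None
    if last_committed == latest:
        return latest + 1, last_committed  # latest batch finished: start the next one
    return latest, last_committed  # latest batch never committed: re-run it
```

      With the listing above the model raises `batch 113258 doesn't exist`; with offsets/113259 removed it resumes at batch 113258, because 113257 is then both the latest offset and the latest commit.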

      Finally, I deleted the offset file "/tmp/streaming_xxxxxxxx/offsets/113259", after which the query started normally. I think this is a bug: Spark should either handle this inconsistency itself or log a hint on how to resolve it.
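
      The manual fix above can be phrased as a rule: walk forward from the last committed batch while the offset log stays contiguous, and treat any offset entry beyond the first gap as orphaned. A minimal sketch, assuming the checkpoint's `offsets/` and `commits/` directories have first been copied to a local path (for example with `hdfs dfs -get`); `batch_ids`, `find_orphaned_offsets` and `scan_local_checkpoint` are hypothetical helpers, not part of Spark, and they only report candidates rather than deleting anything.

```python
from pathlib import Path


def batch_ids(names):
    """Keep entries named like a batch id (e.g. '113259'); skip .crc and temp files."""
    return sorted(int(n) for n in names if n.isdigit())


def find_orphaned_offsets(offset_names, commit_names):
    """Return offset batch ids that sit after a gap following the last commit."""
    offsets = set(batch_ids(offset_names))
    commits = batch_ids(commit_names)
    if not commits:
        return []
    expected = commits[-1]
    # Advance past the last commit while the offset log has no holes.
    while expected + 1 in offsets:
        expected += 1
    return sorted(b for b in offsets if b > expected)


def scan_local_checkpoint(checkpoint_dir):
    """Read offsets/ and commits/ from a checkpoint copied to the local filesystem."""
    root = Path(checkpoint_dir)
    offsets = [p.name for p in (root / "offsets").iterdir()]
    commits = [p.name for p in (root / "commits").iterdir()]
    return find_orphaned_offsets(offsets, commits)
```

      Applied to the listing in this report, the rule returns `[113259]`, the same file that was deleted by hand. An uncommitted but contiguous entry such as a hypothetical offsets/113258 would be kept, since the query can simply re-run that batch. Back up the checkpoint before removing anything, because deleting offset files changes which batch the query resumes from.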

          People

            Assignee: LeeeeLiu
            Reporter: LeeeeLiu
            Votes: 0
            Watchers: 3
