Spark / SPARK-19645

Structured Streaming job restart bug

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

      We are trying to use Structured Streaming in production, but there is currently a bug in how a streaming job restarts.
      The concrete error message is:

      Caused by: java.lang.IllegalStateException: Error committing version 2 into HDFSStateStore[id = (op=0, part=136), dir = /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136]
      at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162)
      at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173)
      at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123)
      at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.IOException: Failed to rename /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta
      at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259)
      at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156)
      ... 14 more

      The bug is easy to reproduce: set val metadataRoot = "hdfs://localhost:8020/tmp/checkpoint" in StreamTest, then run the test "sort after aggregate in complete mode" in StreamingAggregationSuite. The main cause is that when a streaming job restarts, Spark recomputes the latest batch from the WAL offsets and regenerates the same HDFS delta file (the latest delta file written before the restart, named "<currentBatchId>.delta"). This is consistent with the failed rename to 2.delta in the trace above: the target file appears to exist already. In my opinion, this is a bug. If you agree, I can fix it.
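
      The failure mode described above can be illustrated with a small, self-contained sketch. It is not Spark's code: it uses java.nio on the local filesystem as a stand-in for HDFS, and the names DeltaCommitSketch, commitNaive, and commitTolerant are hypothetical. A move without an overwrite option fails when "<version>.delta" already exists, much like the rename in the trace; one possible remedy (an assumption, not necessarily how the issue was resolved upstream) is to treat an existing delta file as already committed.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical stand-in for a state store commit; not Spark's implementation.
object DeltaCommitSketch {

  // Write the updates to a temporary file inside the state directory.
  private def writeTemp(dir: Path, contents: String): Path = {
    val temp = Files.createTempFile(dir, "temp-", ".tmp")
    Files.write(temp, contents.getBytes(StandardCharsets.UTF_8))
    temp
  }

  // Naive commit: always move the temp file to "<version>.delta".
  // Files.move without REPLACE_EXISTING throws FileAlreadyExistsException
  // if a previous run already committed this version.
  def commitNaive(dir: Path, version: Long, contents: String): Path = {
    val temp = writeTemp(dir, contents)
    Files.move(temp, dir.resolve(s"$version.delta"))
  }

  // Tolerant commit: if "<version>.delta" already exists, assume the batch
  // was committed before the restart, discard the temp file, and succeed.
  def commitTolerant(dir: Path, version: Long, contents: String): Path = {
    val target = dir.resolve(s"$version.delta")
    val temp = writeTemp(dir, contents)
    if (Files.exists(target)) {
      Files.delete(temp)
      target
    } else {
      Files.move(temp, target)
    }
  }
}
```

      Calling commitNaive twice with the same directory and version reproduces the failure on the second call, while commitTolerant returns the existing delta file. The tolerant variant assumes that recomputing a batch yields the same state, so keeping the earlier file is safe; the exists-then-move check is also not atomic, which a real implementation would need to consider.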

              People

              • Assignee:
                Unassigned
                Reporter:
                guifengleaf@gmail.com Feng Gui
              • Votes:
                3
                Watchers:
                8
