SPARK-23004: Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Structured Streaming
    • Labels: None
    • Environment: Running on YARN or locally both raises the exception.

    Description

      A structured streaming query with a streaming aggregation can throw the following error in rare cases. 

      java.lang.IllegalStateException: Cannot remove after already committed or aborted
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
          at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
          at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
          at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
          at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
          at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
          at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
          at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
          at org.apache.spark.scheduler.Task.run(Task.scala:108)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)

      18-01-05 13:29:57 WARN TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, executor driver): java.lang.IllegalStateException: Cannot remove after already committed or aborted
          at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)

       

      This happens when the following conditions coincide (a minimal query sketch follows the list).

      1. The streaming aggregation uses an aggregation function implemented as a TypedImperativeAggregate (for example, collect_set, collect_list, percentile, etc.).
      2. The query runs in update mode.
      3. After the shuffle, a partition has exactly 128 records.
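
      A minimal sketch of a query shape that meets conditions 1 and 2 (the schema, input path, and checkpoint location below are illustrative only; condition 3 additionally requires a shuffle partition to end up with exactly 128 records):

          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.functions.collect_set
          import org.apache.spark.sql.types.{StringType, StructField, StructType}

          val spark = SparkSession.builder().appName("SPARK-23004-repro-sketch").getOrCreate()
          import spark.implicits._

          val schema = StructType(Seq(
            StructField("userId", StringType),
            StructField("page", StringType)))

          // Streaming source; collect_set is a TypedImperativeAggregate (condition 1).
          val events = spark.readStream
            .schema(schema)
            .json("/tmp/events")                                     // hypothetical input path

          val aggregated = events
            .groupBy($"userId")
            .agg(collect_set($"page").as("pages"))

          // Condition 2: the query runs in update output mode.
          val query = aggregated.writeStream
            .outputMode("update")
            .format("console")
            .option("checkpointLocation", "/tmp/spark-23004-ckpt")   // hypothetical checkpoint path
            .start()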

      This happens because of the following. 

      1. The StateStoreSaveExec used in streaming aggregations has the following logic when used in update mode.
        1. There is an iterator that reads data from its parent iterator and updates the StateStore.
        2. When the parent iterator is fully consumed (i.e. baseIterator.hasNext returns false), all state changes are committed by calling StateStore.commit.
        3. The implementation of StateStore.commit() in HDFSBackedStateStore does not allow commit to be called twice. However, the logic is such that if hasNext is called again after baseIterator.hasNext has returned false, every such call will invoke StateStore.commit again.
        4. For most aggregation functions this is okay because hasNext is only called once. But that is not the case with TypedImperativeAggregates.
      2. TypedImperativeAggregates are executed using ObjectHashAggregateExec, which aggregates first with an in-memory hash map and then, past a threshold, falls back to sort-based aggregation.
        1. It first uses an unsorted hash map. If the size of the hash map grows beyond a certain threshold (default 128), it switches to sort-based aggregation.
        2. The switching logic in ObjectAggregationIterator (used by ObjectHashAggregateExec) is such that when the number of records exactly matches the threshold (i.e. 128), it ends up calling iterator.hasNext twice (see the simplified sketch after this list).
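
      The interaction can be illustrated with the following self-contained sketch. These are simplified stand-ins, not the actual Spark classes: the wrapping iterator evicts old state and commits on every hasNext call that finds its parent exhausted, and the consuming loop calls hasNext a second time after exhaustion only when the input ends exactly at the fallback threshold.

          object DoubleCommitSketch {

            // Stand-in for HDFSBackedStateStore: remove() is illegal after commit().
            class FakeStateStore {
              private var committed = false
              def remove(): Unit =
                if (committed) throw new IllegalStateException("Cannot remove after already committed or aborted")
              def commit(): Unit = committed = true
            }

            // Analogue of the StateStoreSaveExec update-mode iterator: every hasNext call
            // made after the parent is exhausted evicts old state and commits again.
            class SaveIterator(parent: Iterator[Int], store: FakeStateStore) extends Iterator[Int] {
              override def hasNext: Boolean =
                if (parent.hasNext) true
                else {
                  store.remove()   // throws on the second post-exhaustion call
                  store.commit()
                  false
                }
              override def next(): Int = parent.next()
            }

            // Analogue of ObjectAggregationIterator.processInputs: hash-based aggregation
            // until the fallback threshold, then a sort-based fallback drains the rest.
            def processInputs(iter: Iterator[Int], threshold: Int): Unit = {
              var seen = 0
              var sortBased = false
              while (iter.hasNext && !sortBased) {   // 1st post-exhaustion hasNext when the
                iter.next()                          // input ends exactly at the threshold
                seen += 1
                if (seen >= threshold) sortBased = true
              }
              if (sortBased) {
                while (iter.hasNext) {               // 2nd post-exhaustion hasNext -> exception
                  iter.next()
                }
              }
            }

            def main(args: Array[String]): Unit = {
              val store = new FakeStateStore
              // Exactly 128 records with threshold 128: the loop's final hasNext commits,
              // then the fallback's hasNext calls remove() after the commit and throws.
              processInputs(new SaveIterator(Iterator.range(0, 128), store), threshold = 128)
            }
          }

      Running this sketch with 127 or 129 records completes without error; only an input of exactly threshold records triggers the second post-exhaustion hasNext call.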

      When these two behaviors combine, the query fails with the above error. This latent bug has existed since Spark 2.1, when ObjectHashAggregateExec was introduced.

       

      People

        Assignee: tdas Tathagata Das
        Reporter: secfree deprecated-se
        Votes: 0
        Watchers: 6
