Spark / SPARK-42566

RocksDB StateStore lock acquisition should happen after getting input iterator from inputRDD

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.5.0
    • Fix Version/s: 3.5.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Currently, the `compute` method in both `StateStoreRDD` and `ReadStateStoreRDD` first gets the state store instance and only then gets the input iterator from the inputRDD.

      For the RocksDB state store, the running task acquires the lock for this instance and holds it. A retried or speculative task will fail to acquire the lock and, if there are network issues, will eventually abort the job. For example, when we shrink the executors, a live executor will try to fetch data from the killed ones, because it doesn't know that the target location (prefetched from the driver) is dead until it tries to fetch data. The query might hang for a long time: the executor retries `spark.shuffle.io.maxRetries` (3) times and, for each retry, waits for `spark.shuffle.io.connectionTimeout` (default value 120s) before timing out. In total, the task could hang for about 6 minutes (3 × 120s), and retried or speculative tasks won't be able to acquire the lock during this period.
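
      As a rough check of the "about 6 minutes" figure, the sketch below multiplies the two values quoted above. The class and method names are hypothetical, the values are hard-coded rather than read from a Spark configuration, and any pause between retries is ignored, so this is an estimate rather than an exact bound.

```java
import java.time.Duration;

public class ShuffleFetchWait {
    // Values quoted in the description; hard-coded here for illustration only.
    static final int MAX_RETRIES = 3;                                   // spark.shuffle.io.maxRetries
    static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(120); // spark.shuffle.io.connectionTimeout

    // Rough estimate: each retry is assumed to wait for the full connection timeout.
    static Duration worstCaseWait(int retries, Duration timeout) {
        return timeout.multipliedBy(retries);
    }

    public static void main(String[] args) {
        Duration wait = worstCaseWait(MAX_RETRIES, CONNECTION_TIMEOUT);
        System.out.println(wait.toMinutes() + " minutes"); // 3 * 120s = 360s, prints "6 minutes"
    }
}
```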

      Acquiring the lock only after retrieving the input iterator should avoid this situation.
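
      The effect of the reordering can be illustrated with a toy model. This is not the Spark implementation: `InstanceLock`, `lockThenFetch`, and `fetchThenLock` are hypothetical names, the lock is a simple flag standing in for the RocksDB instance lock, and the stalled shuffle fetch is simulated by a callback during which a retried task attempts to take the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LockOrderingSketch {
    // Hypothetical stand-in for the per-instance RocksDB state store lock.
    static final class InstanceLock {
        private final AtomicBoolean held = new AtomicBoolean(false);
        boolean tryAcquire() { return held.compareAndSet(false, true); }
        void release() { held.set(false); }
    }

    // Ordering described as current behavior: take the lock, then fetch the input.
    static boolean lockThenFetch(InstanceLock lock, Runnable fetchInput) {
        if (!lock.tryAcquire()) return false;
        try {
            fetchInput.run(); // the lock is held for the whole (possibly stalled) fetch
            return true;
        } finally {
            lock.release();
        }
    }

    // Proposed ordering: fetch the input first, take the lock only afterwards.
    static boolean fetchThenLock(InstanceLock lock, Runnable fetchInput) {
        fetchInput.run(); // no lock is held while the fetch is in progress
        if (!lock.tryAcquire()) return false;
        try {
            return true;
        } finally {
            lock.release();
        }
    }

    // Simulates a retried task trying to take the lock while the original
    // task is still inside its input fetch. Returns whether the retry got the lock.
    static boolean retrySucceedsDuringFetch(boolean lockFirst) {
        InstanceLock lock = new InstanceLock();
        boolean[] retryGotLock = new boolean[1];
        Runnable stalledFetch = () -> {
            retryGotLock[0] = lock.tryAcquire();
            if (retryGotLock[0]) lock.release();
        };
        if (lockFirst) {
            lockThenFetch(lock, stalledFetch);
        } else {
            fetchThenLock(lock, stalledFetch);
        }
        return retryGotLock[0];
    }

    public static void main(String[] args) {
        System.out.println(retrySucceedsDuringFetch(true));  // false: lock held during the fetch
        System.out.println(retrySucceedsDuringFetch(false)); // true: lock free during the fetch
    }
}
```

      In this model the retried task is blocked only under the lock-first ordering, which matches the motivation given above for moving lock acquisition after the input iterator is obtained.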

          People

            Assignee: Huanli Wang (huanli.wang)
            Reporter: Huanli Wang (huanli.wang)