Spark / SPARK-38320

(flat)MapGroupsWithState can timeout groups which just received inputs in the same microbatch


Details

    Description

      We have identified an issue where the RocksDB state store iterator does not pick up store updates made after the iterator is created. As a result, the timeoutProcessorIter in

      https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala

      does not pick up state changes made during newDataProcessorIter input processing. The user-observed behavior is that a group may receive input records and also be called with a timeout in the same micro-batch. This contradicts the public documentation for GroupState:

      https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/GroupState.html

      • The timeout is reset every time the function is called on a group, that is, when the group has new data, or the group has timed out. So the user has to set the timeout duration every time the function is called, otherwise, there will not be any timeout set.
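
The interaction described above can be illustrated with a small toy model. This is only a sketch in plain Python, not Spark code: `GroupEntry`, `run_microbatch`, and the `snapshot_iterator` flag are hypothetical names introduced for illustration, and the "snapshot" is simply a copy of a dictionary taken before new data is processed. It assumes the timeout scan reads from a view created before the new-data phase, which is how the issue characterizes the RocksDB iterator.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class GroupEntry:
    """Hypothetical per-group state: only the timeout timestamp matters here."""
    timeout_ts: Optional[int]  # None means no timeout is set


def run_microbatch(
    store: Dict[str, GroupEntry],
    inputs: List[str],
    batch_ts: int,
    new_timeout_ts: int,
    snapshot_iterator: bool,
) -> List[str]:
    """Toy model of one micro-batch; returns the keys reported as timed out."""
    if snapshot_iterator:
        # Assumption: the timeout scan sees the store as of iterator creation,
        # so later writes are invisible to it.
        timeout_view = {k: GroupEntry(v.timeout_ts) for k, v in store.items()}
    else:
        # A live view reflects writes made while new data is processed.
        timeout_view = store

    # Phase 1: groups with new data are processed and reset their timeout.
    for key in inputs:
        store[key] = GroupEntry(new_timeout_ts)

    # Phase 2: any group whose timeout has passed is reported as timed out.
    return sorted(
        key
        for key, entry in timeout_view.items()
        if entry.timeout_ts is not None and entry.timeout_ts < batch_ts
    )


def initial_store() -> Dict[str, GroupEntry]:
    # Both groups have a timeout at t=100, which has expired by t=150.
    return {"a": GroupEntry(100), "b": GroupEntry(100)}


# Group "a" receives input in this batch and moves its timeout to t=500.
print(run_microbatch(initial_store(), ["a"], 150, 500, snapshot_iterator=True))   # ['a', 'b']
print(run_microbatch(initial_store(), ["a"], 150, 500, snapshot_iterator=False))  # ['b']
```

With the stale view, group "a" is both processed with new data and reported as timed out in the same batch, which is the behavior reported here. With the live view, only group "b" times out, matching the documented contract.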


          People

            Alex Balikov (alex-balikov)
            Jungtaek Lim
            Votes: 0
            Watchers: 4
