Spark / SPARK-49829

Issues with optimization on adding input to state store in stream-stream join


Details

    Description

      NOTE: This ticket describes the actual issue found through root cause analysis (RCA) of the tests the reporter provided. I still want to set the reporter to Andrzej Zera to give proper credit. The tests point directly at the edge cases, and I could easily spot the root cause from the simple reproducers. Thanks, Andrzej Zera!

      Report link (user@): https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf

      https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677

              val isLeftSemiWithMatch =
                joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
              // Add to state store only if both removal predicates do not match,
              // and the row is not matched for left side of left semi join.
              val shouldAddToState =
                !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
                !isLeftSemiWithMatch 

      The above condition determines whether the stream-stream join adds the input to the state store. The requirement that both removal predicates do not match is there to skip input that would be evicted in this same batch. Before Spark introduced multiple stateful operators, the watermark for late records and the watermark for eviction were the same, so input that survived late-record filtering could never match a removal predicate. (I'm not sure which edge case this condition was meant to handle.)

      After multiple stateful operators were introduced (https://issues.apache.org/jira/browse/SPARK-40925), the watermark for late records and the watermark for eviction are no longer the same. As a result, input can be judged not late and still be eligible for eviction in the same batch. The above condition had to reflect this change, but that was missed, which causes the correctness issues in the report.
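
To make the gap concrete, here is a minimal, self-contained sketch of the two checks. It is not Spark code: `Row`, `isLate`, `wouldBeEvicted`, and `acceptedButNotStored` are hypothetical names, a single removal predicate stands in for both the key and value predicates, and the `<=` comparisons against the watermark are a simplifying assumption. It only illustrates that once the two watermarks differ, a row can pass late-record filtering and still be skipped by the "add to state" check.

```scala
// Simplified model of the two watermark checks; none of these names are Spark APIs.
object WatermarkSketch {
  final case class Row(key: String, eventTimeMs: Long)

  // Assumption: a row is late when its event time is at or below the late-record watermark.
  def isLate(row: Row, lateWatermarkMs: Long): Boolean =
    row.eventTimeMs <= lateWatermarkMs

  // Assumption: one predicate stands in for both state removal predicates.
  def wouldBeEvicted(row: Row, evictionWatermarkMs: Long): Boolean =
    row.eventTimeMs <= evictionWatermarkMs

  // Rows that survive late-record filtering but would not be added to state.
  def acceptedButNotStored(rows: Seq[Row], lateWatermarkMs: Long, evictionWatermarkMs: Long): Seq[Row] =
    rows
      .filterNot(isLate(_, lateWatermarkMs))
      .filter(wouldBeEvicted(_, evictionWatermarkMs))

  def main(args: Array[String]): Unit = {
    val input = Seq(Row("a", 5L), Row("b", 15L), Row("c", 25L))

    // Same watermark for both checks: no accepted row is skipped.
    println(acceptedButNotStored(input, 10L, 10L)) // List()

    // Eviction watermark ahead of the late-record watermark: row "b" is
    // accepted as not late, yet never added to state.
    println(acceptedButNotStored(input, 10L, 20L)) // List(Row(b,15))
  }
}
```

In this model the skipped set is empty whenever the two watermarks are equal, which matches the description's point that the condition was harmless before the watermarks diverged.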

      SPARK-45637 is one such case, which I'll mark as a duplicate.


          People

            Assignee: Jungtaek Lim (kabhwan)
            Reporter: Andrzej Zera (azera)