Spark / SPARK-49829

Issues with optimization on adding input to state store in stream-stream join


Details

    Description

      NOTE: This ticket describes the actual issue found after root cause analysis (RCA) of the tests the reporter provided. I still want azera to be listed as the reporter, to give proper credit. The tests point directly at the edge cases, and I could easily spot the root cause from the simple reproducers. Thanks azera!

      Report link (user@): https://lists.apache.org/thread/kmq9vlgdbsoytwlcch147n3mlxy56llf

      https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677

              val isLeftSemiWithMatch =
                joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
              // Add to state store only if both removal predicates do not match,
              // and the row is not matched for left side of left semi join.
              val shouldAddToState =
                !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
                !isLeftSemiWithMatch 

      The condition above determines whether the stream-stream join adds the input row to the state store. The criterion `both removal predicates do not match` skips the add for any input that would be evicted in this same batch. Before Spark introduced multiple stateful operators, the watermark for late records and the watermark for eviction were the same, so an input that survived the late-record filter could never match a removal predicate. (I am not sure which edge case this condition was originally dealing with.)

      After multiple stateful operators (https://issues.apache.org/jira/browse/SPARK-40925), the watermark for late records and the watermark for eviction are no longer the same. As a result, an input can be judged not late and still be eligible for eviction in the same batch. The condition above should have been updated to reflect this change, but it was missed, which causes the correctness issues described in the report.
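      The gap between the two watermarks can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical and simplified for illustration: `Watermarks`, `isLate`, `matchesRemovalPredicate`, and the timestamps are not Spark classes or APIs, and the `<=` comparisons are an assumption about how a row is compared against each watermark. The left semi join clause of the real condition is left out.

```scala
// Hypothetical, simplified model of the condition; not Spark's actual code.
object WatermarkSketch {
  // Two watermarks, in event-time milliseconds (assumed representation).
  final case class Watermarks(lateEventsMs: Long, evictionMs: Long)

  // Assumption: a row is dropped as late when its event time is at or
  // below the watermark for late records.
  def isLate(eventTimeMs: Long, wm: Watermarks): Boolean =
    eventTimeMs <= wm.lateEventsMs

  // Assumption: a row matches the state removal predicate when its event
  // time is at or below the watermark for eviction.
  def matchesRemovalPredicate(eventTimeMs: Long, wm: Watermarks): Boolean =
    eventTimeMs <= wm.evictionMs

  // Mirrors only the watermark part of shouldAddToState in the ticket.
  def shouldAddToState(eventTimeMs: Long, wm: Watermarks): Boolean =
    !matchesRemovalPredicate(eventTimeMs, wm)

  def main(args: Array[String]): Unit = {
    val eventTimeMs = 1500L

    // Before SPARK-40925: both watermarks are the same value.
    val same = Watermarks(lateEventsMs = 1000L, evictionMs = 1000L)
    // After SPARK-40925: the eviction watermark can be ahead.
    val split = Watermarks(lateEventsMs = 1000L, evictionMs = 2000L)

    for ((label, wm) <- Seq("same" -> same, "split" -> split)) {
      println(s"$label: late=${isLate(eventTimeMs, wm)}, " +
        s"addToState=${shouldAddToState(eventTimeMs, wm)}")
    }
  }
}
```

      Under these assumptions, the row at 1500 is not late in either setup, but with split watermarks it matches the removal predicate and is therefore never added to the state store. That is the case the original condition did not account for.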

      SPARK-45637 is one such case, which I'll mark as a duplicate.


            People

              kabhwan Jungtaek Lim
              azera Andrzej Zera