Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- 2.2.0
- None
- None
Description
Running the example
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java gives incorrect output. Specifically, in update mode it outputs every input word twice.
This happens because in FlatMapGroupsWithStateExec, when the timeout timestamp is written to the state row, it gets corrupted. This leads to the following:
1. The state is updated, hence the word is output once.
2. Later, when the timed-out states are being processed, the same word is found again because of the corrupted timeout timestamp. Therefore the word is output a second time.
Ideally, a state row whose timeout timestamp was updated to T should never be caught in the search for timed-out keys (i.e. timeout timestamp < T). However, because of the corruption, the search reads back a different timeout timestamp for that row.
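To make the two steps concrete, here is a toy model in plain Java. It is not Spark's code: the class `TimeoutDoubleEmit`, the in-memory map standing in for the state store, and the "corruption" (the stored timestamp is replaced by 0) are all assumptions made for illustration. It only shows why a timestamp that reads back smaller than the T that was written makes the same key appear in both passes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TimeoutDoubleEmit {
    // Toy state store: key -> timeout timestamp. Not Spark's state row layout.
    private final Map<String, Long> timeouts = new LinkedHashMap<>();
    private final boolean corruptOnWrite;

    public TimeoutDoubleEmit(boolean corruptOnWrite) {
        this.corruptOnWrite = corruptOnWrite;
    }

    // Hypothetical stand-in for the corruption: the value that ends up
    // in the store differs from the value that was written.
    private long stored(long timestamp) {
        return corruptOnWrite ? 0L : timestamp;
    }

    public List<String> processBatch(List<String> words, long t) {
        List<String> output = new ArrayList<>();
        // Step 1: update the state of every key that has new data,
        // setting its timeout timestamp to T, and emit the key.
        for (String word : words) {
            timeouts.put(word, stored(t));
            output.add(word);
        }
        // Step 2: emit every key whose timeout timestamp is < T.
        // A key just written with T must not match here.
        for (Map.Entry<String, Long> entry : timeouts.entrySet()) {
            if (entry.getValue() < t) {
                output.add(entry.getKey());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apache", "spark");
        // Intact timestamps: each word is emitted once.
        System.out.println(new TimeoutDoubleEmit(false).processBatch(words, 1000L));
        // Corrupted timestamps: each word is emitted twice.
        System.out.println(new TimeoutDoubleEmit(true).processBatch(words, 1000L));
    }
}
```

With intact timestamps the second pass finds nothing; with the simulated corruption every key written in step 1 is picked up again in step 2, which matches the doubled output described above.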
Finally, this must be a Java bean encoder issue, because the exact same query in the Scala example works fine: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
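The linked issue, SPARK-18598, concerns Java beans with "extra accessors". As a rough illustration of what that means, the sketch below uses the JDK's `java.beans.Introspector` to list properties that have a getter but no matching setter. The `SessionBean` class and the `readOnlyProperties` helper are hypothetical and are not taken from the Spark example; whether the example's bean actually has this shape is an assumption.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExtraAccessorCheck {
    // Hypothetical bean: two read/write properties plus one derived getter.
    public static class SessionBean {
        private long startMs;
        private long endMs;

        public long getStartMs() { return startMs; }
        public void setStartMs(long startMs) { this.startMs = startMs; }
        public long getEndMs() { return endMs; }
        public void setEndMs(long endMs) { this.endMs = endMs; }

        // "Extra" accessor: looks like a property but has no setter.
        public long getDurationMs() { return endMs - startMs; }
    }

    // Returns the sorted names of properties that can be read but not written.
    public static List<String> readOnlyProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the built-in "class" property is excluded.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() == null) {
                    names.add(pd.getName());
                }
            }
            Collections.sort(names);
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnlyProperties(SessionBean.class));
    }
}
```

A check like this can help spot beans whose getters and setters do not line up before passing them to a bean encoder.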
Issue Links
- duplicates SPARK-18598 Encoding a Java Bean with extra accessors, produces inconsistent Dataset, resulting in AssertionError (Closed)