Spark / SPARK-20374

Encoder generated using Java beans causes corruption in MapGroupsWithState


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Running the example
      https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java gives incorrect output: in update mode, it outputs every input word twice instead of once.

      This happens because, in FlatMapGroupsWithStateExec, the timeout timestamp gets corrupted when it is written to the state row. This leads to the following:
      1. The state is updated, so the word is output once.
      2. Later, when the timed-out states are processed, the same word is found again because of the corrupted timeout timestamp. Therefore, the word is output a second time.

      Ideally, a state row whose timeout timestamp was updated to T should never be caught by the search for timed-out keys (i.e., keys with timeout timestamp < T). But because of the corruption, the search reads a different timeout timestamp than the one that was written.
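      To make the invariant above concrete, here is a minimal, self-contained sketch. It is NOT Spark code and does not reproduce the encoder bug: the class TimeoutScanSketch and its methods updateTimeout and timedOutKeys are hypothetical, and the "corrupted" value 0 is an arbitrary stand-in for whatever wrong timestamp is read back. It only shows that a row written with timeout T is not returned by a "timeout < T" scan, while a row whose stored timestamp differs from what was written can be returned, so the same word would be emitted again.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the timed-out key search.
 * Not Spark's implementation; it only illustrates the "timeout < T" invariant.
 */
public class TimeoutScanSketch {

    // Key -> stored timeout timestamp (ms) of that key's state row.
    private final Map<String, Long> timeoutByKey = new LinkedHashMap<>();

    // Write path: record the timeout timestamp for a key's state row.
    void updateTimeout(String key, long timeoutMs) {
        timeoutByKey.put(key, timeoutMs);
    }

    // Read path: return every key whose stored timeout is strictly below nowMs.
    List<String> timedOutKeys(long nowMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : timeoutByKey.entrySet()) {
            if (e.getValue() < nowMs) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long t = 10_000L;

        // Expected behaviour: a row written with timeout T is not timed out at T.
        TimeoutScanSketch ok = new TimeoutScanSketch();
        ok.updateTimeout("apache", t);
        System.out.println("correct:   " + ok.timedOutKeys(t));        // prints "correct:   []"

        // Simulated corruption (hypothetical): the stored value is not T, so
        // the same key is found by the scan and would be output a second time.
        TimeoutScanSketch corrupted = new TimeoutScanSketch();
        corrupted.updateTimeout("apache", 0L);
        System.out.println("corrupted: " + corrupted.timedOutKeys(t)); // prints "corrupted: [apache]"
    }
}
```

      The strict "<" comparison mirrors the condition stated in the description; with it, only a stored value that differs from T can make the key reappear in the same batch.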

      Finally, this must be a Java bean encoder issue, because the exact same query in the Scala example works correctly: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala


            People

              Assignee: Unassigned
              Reporter: Tathagata Das (tdas)
              Votes: 0
              Watchers: 1
