  Spark / SPARK-24699

Watermark / Append mode should work with Trigger.Once

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.1
    • Fix Version/s: 2.4.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      I have a use case where I would like to trigger a structured streaming job from an external scheduler (once every 15 minutes or so) and have it write window aggregates to Kafka.

      My code works when running with `Trigger.ProcessingTime`, but when I switch to `Trigger.Once` the watermark is not persisted to (or is not restored from) the checkpoint state.

      This causes the stream to never generate output because the watermark is perpetually stuck at `1970-01-01T00:00:00Z`.
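Why a stuck watermark means no output can be seen in a small, Spark-free model of append mode: a windowed aggregate is only final, and therefore only emitted, once the watermark has passed the window's end. This is a minimal sketch, not Spark's implementation; the object and helper names are hypothetical, and the 15-minute tumbling window size is an assumption (the issue only mentions a 15-minute schedule).

```scala
import java.time.Instant

object AppendModeSketch {
  // Assumed window size for illustration.
  val windowMs: Long = 15 * 60 * 1000L

  // Exclusive end of the tumbling window containing a (non-negative) event time.
  def windowEndMs(eventTimeMs: Long): Long =
    (eventTimeMs / windowMs + 1) * windowMs

  // Append mode emits a window only after the watermark reaches its end,
  // since only then can no further on-time rows change the aggregate.
  def emittable(windowEnds: Set[Long], watermarkMs: Long): Set[Long] =
    windowEnds.filter(_ <= watermarkMs)

  // Human-readable watermark; 0 renders as the epoch seen in the bug report.
  def describe(watermarkMs: Long): String =
    Instant.ofEpochMilli(watermarkMs).toString
}
```

With the watermark at 0 (`1970-01-01T00:00:00Z`), `emittable` returns an empty set for any real event time, which matches the "never generates output" symptom.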

      I have created a failing test case in `EventTimeWatermarkSuite`; I will open a [WIP] pull request on GitHub and link it here.


      It also seems that even if the watermark were generated, the current streaming behavior would require triggering the job twice before any output appears.


      The micro-batch executor computes the watermark only from the previous batch's input and emits new aggregates based on that timestamp.

      This state is not available to a newly started `MicroBatchExecution` stream.
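The two effects described above (the watermark lagging one batch behind its input, and a `Trigger.Once` run executing a single batch) can be reproduced in a toy model. This is a hypothetical sketch, not Spark's `MicroBatchExecution`: it assumes a 10-minute watermark delay and 15-minute windows, ignores late-row filtering, and models the bug as window state being restored from the checkpoint while the watermark resets to 0.

```scala
object TriggerOnceSketch {
  val windowMs: Long = 15 * 60 * 1000L // assumed window size
  val delayMs: Long = 10 * 60 * 1000L  // assumed watermark delay

  // Open windows (by exclusive end) and the watermark this batch will use.
  final case class State(openWindowEnds: Set[Long], watermarkMs: Long)

  def windowEndMs(t: Long): Long = (t / windowMs + 1) * windowMs

  // One micro-batch: new rows open/update windows, windows already passed by the
  // watermark computed from the *previous* batch are emitted, and the watermark
  // for the *next* batch is derived from this batch's max event time.
  def runBatch(state: State, eventTimesMs: Seq[Long]): (Set[Long], State) = {
    val open = state.openWindowEnds ++ eventTimesMs.map(windowEndMs)
    val (emitted, stillOpen) = open.partition(_ <= state.watermarkMs)
    val next =
      if (eventTimesMs.isEmpty) state.watermarkMs
      else math.max(state.watermarkMs, eventTimesMs.max - delayMs)
    (emitted, State(stillOpen, next))
  }

  // Each externally scheduled Trigger.Once run executes exactly one batch.
  // Window state is always restored; the watermark only if restoreWatermark is set.
  def scheduledRuns(inputs: Seq[Seq[Long]], restoreWatermark: Boolean): Seq[Set[Long]] =
    inputs.foldLeft((State(Set.empty, 0L), Vector.empty[Set[Long]])) {
      case ((saved, outputs), batch) =>
        val start = if (restoreWatermark) saved else saved.copy(watermarkMs = 0L)
        val (emitted, end) = runBatch(start, batch)
        (end, outputs :+ emitted)
    }._2
}
```

For two runs with events at 12:07 and 12:29, then 12:40, the model emits nothing at all without watermark restoration; with restoration the first run still emits nothing, and the 12:00-12:15 window appears only on the second run.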

      Would it be an appropriate strategy to write a new checkpoint file containing the most up-to-date watermark, or the watermark together with the query stats?
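One way the proposed checkpoint file could look is sketched below. The `watermarks/<batchId>` layout and the object name are hypothetical, chosen only for illustration; this is not Spark's checkpoint format. After each batch the watermark for the next batch is written under the batch id, and a newly started run reads the entry with the highest id, falling back to 0 (the epoch) for a fresh checkpoint.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}
import scala.jdk.CollectionConverters._

object WatermarkCheckpointSketch {
  // Hypothetical layout: <checkpoint>/watermarks/<batchId> holds the next watermark (ms).
  private def dir(checkpoint: Path): Path = checkpoint.resolve("watermarks")

  def save(checkpoint: Path, batchId: Long, nextWatermarkMs: Long): Unit = {
    Files.createDirectories(dir(checkpoint))
    val tmp = dir(checkpoint).resolve(s".$batchId.tmp")
    Files.write(tmp, nextWatermarkMs.toString.getBytes(StandardCharsets.UTF_8))
    // Write to a temp file and rename, so a crash never leaves a half-written entry.
    Files.move(tmp, dir(checkpoint).resolve(batchId.toString), StandardCopyOption.ATOMIC_MOVE)
  }

  // Latest persisted watermark, or 0 if nothing has been checkpointed yet.
  def restore(checkpoint: Path): Long = {
    val d = dir(checkpoint)
    if (!Files.isDirectory(d)) 0L
    else {
      val entries = Files.list(d)
      try {
        entries.iterator().asScala
          .map(_.getFileName.toString)
          .filter(_.forall(_.isDigit)) // skip temp files
          .map(_.toLong)
          .maxOption
          .fold(0L) { id =>
            new String(Files.readAllBytes(d.resolve(id.toString)), StandardCharsets.UTF_8).trim.toLong
          }
      } finally entries.close()
    }
  }
}
```

A run launched by the external scheduler would call `restore` before its single batch and `save` after it, so the following run starts from the advanced watermark instead of the epoch.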

      Attachments

        1. watermark-once.scala (2 kB, Chris Horn)
        2. watermark-stream.scala (2 kB, Chris Horn)

        Activity

          Apache Spark added a comment:

          User 'c-horn' has created a pull request for this issue:
          https://github.com/apache/spark/pull/21676

          Chris Horn added a comment:

          I have attached two Scala REPL scripts that reproduce this behavior. The "once" variant fails to produce output or updated watermarks; the "stream" variant behaves mostly as expected.

          Apache Spark added a comment:

          User 'tdas' has created a pull request for this issue:
          https://github.com/apache/spark/pull/21746

          Tathagata Das added a comment:

          Issue resolved by pull request 21746
          https://github.com/apache/spark/pull/21746


          People

            Assignee: Tathagata Das
            Reporter: Chris Horn
            Votes: 1
            Watchers: 3
