SPARK-20928

Continuous Processing Mode for Structured Streaming

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
      None
    • Target Version/s:

      Description

      Given the current Source API, the minimum possible latency for any record is bounded by the time it takes to launch a task. This is because getBatch requires both the starting and the ending offset to be known before any tasks are launched. In the worst case, the end-to-end latency is actually closer to the average batch time plus the task-launching time.

      For applications where latency is more important than exactly-once output, however, it would be useful if processing could happen continuously. This would allow us to achieve fully pipelined reading and writing from sources such as Kafka. This kind of architecture would make it possible to process records with end-to-end latencies on the order of 1 ms, rather than the 10-100 ms that is possible today.

      One possible architecture here would be to change the Source API to look like the following rough sketch:

        trait Epoch {
          def data: DataFrame

          /** The exclusive starting position for `data`. */
          def startOffset: Offset

          /**
           * The inclusive ending position for `data`. Incrementally updated
           * during processing, but not complete until execution of the query
           * plan in `data` is finished.
           */
          def endOffset: Offset
        }

        // On the Source:
        def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], limits: Limits): Epoch


      The above would allow us to build an alternative implementation of StreamExecution that processes continuously with much lower latency and only stops processing when it needs to reconfigure the stream (either due to a failure or a user-requested change in parallelism).
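      As an illustration only, the driver loop described above could be sketched in self-contained form. Everything below is a stand-in: Offset, Limits, Epoch, and getBatch are simplified placeholders for the proposed API (data is a Seq rather than a DataFrame), and the loop structure is an assumption about how a continuous StreamExecution could use it, not the proposed implementation.

      ```scala
      object ContinuousSketch {
        type Offset = Long
        case class Limits(maxRecordsPerEpoch: Long)

        // Stand-in for the proposed Epoch; `data` is simplified to a Seq.
        // startOffset is exclusive, endOffset is inclusive, as in the sketch.
        case class Epoch(data: Seq[String], startOffset: Offset, endOffset: Offset)

        // Stand-in for the proposed Source.getBatch: given the last committed
        // offset, produce the next epoch of records up to the limit.
        def getBatch(start: Option[Offset], end: Option[Offset], limits: Limits): Epoch = {
          val from = start.getOrElse(0L)
          val to   = end.getOrElse(from + limits.maxRecordsPerEpoch)
          Epoch((from until to).map(i => s"record-$i"), from, to)
        }

        // One reconfiguration-free run: keep pulling epochs and committing each
        // epoch's end offset, instead of planning a fresh batch (and launching
        // fresh tasks) for every interval.
        def runContinuously(epochs: Int, limits: Limits): Offset = {
          var committed: Option[Offset] = None
          for (_ <- 0 until epochs) {
            val epoch = getBatch(committed, None, limits)
            // process epoch.data here (long-running tasks in the real design)
            committed = Some(epoch.endOffset)
          }
          committed.getOrElse(0L)
        }
      }
      ```

      For example, `ContinuousSketch.runContinuously(3, ContinuousSketch.Limits(10))` advances the committed offset through three epochs of ten records each.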

        Activity

        CodingCat Nan Zhu added a comment -

        Hi, is there any description of what this means?

        cody@koeninger.org Cody Koeninger added a comment -

        For jobs that only have narrow stages, I think it should be possible to maintain delivery semantics as well.

        CodingCat Nan Zhu added a comment -

        If I understand correctly, the tasks will be "long-term" tasks, just like the receiver tasks in the receiver-based InputDStream in Spark Streaming?

        wlsc Wladimir Schmidt added a comment -

        I am really looking forward to it! This sub-millisecond streaming is really exciting.

        cody@koeninger.org Cody Koeninger added a comment -

        This needs an improvement proposal.

        Based on discussions on the mailing list and representations made at Spark Summit, this is already a "work in progress".

        If that's the case, it needs community involvement now, not later, to reduce the danger of Databricks developing something that meets their internal needs but not community needs.

        marmbrus Michael Armbrust added a comment -

        Hi Cody, I do plan to flesh this out with the other sections of the SPIP document and will email the dev list at that point. All that has been done so far is some basic prototyping to estimate how much work an alternative StreamExecution would take to build, and some experiments to validate the latencies that this architecture could achieve. Do you have specific concerns with the proposal as it stands?

        cody@koeninger.org Cody Koeninger added a comment -

        Cool, can you label it SPIP so it shows up linked from http://spark.apache.org/improvement-proposals.html

        My only concern so far was the one I mentioned already, namely that it seems like you shouldn't have to give up exactly-once delivery semantics in all cases.

        ibobak Ihor Bobak added a comment -

        Dear all, in this video https://www.youtube.com/watch?v=qAZ5XUz32yM Michael says "I am switching to the new mode of continuous processing". So, as I understand it, this is already done, or almost done.

        Can anyone give a hint of how to switch on this mode? Thanks

        TomaszGaweda Tomasz Gawęda added a comment -

        Michael Armbrust Is it still planned for 2.3.0?

        marmbrus Michael Armbrust added a comment - edited

        Hey everyone, thanks for your interest in this feature! I'm still targeting Spark 2.3, but unfortunately have been busy with other things since the summit. Will post more details on the design as soon as we have them! The Spark summit demo just showed a hacked-together prototype, but we need to do more to figure out how to best integrate it into Spark.


          People

          • Assignee:
            Unassigned
          • Reporter:
            marmbrus Michael Armbrust
          • Votes:
            13
          • Watchers:
            73
