SPARK-20928

SPIP: Continuous Processing Mode for Structured Streaming

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
    • Target Version/s:

      Description

      Given the current Source API, the minimum possible latency for any record is bounded by the amount of time that it takes to launch a task. This limitation is a result of the fact that getBatch requires us to know both the starting and the ending offset, before any tasks are launched. In the worst case, the end-to-end latency is actually closer to the average batch time + task launching time.

      For applications where latency is more important than exactly-once output, however, it would be useful if processing could happen continuously. This would allow us to achieve fully pipelined reading and writing from sources such as Kafka. This kind of architecture would make it possible to process records with end-to-end latencies on the order of 1 ms, rather than the 10-100 ms that is possible today.

      One possible architecture here would be to change the Source API to look like the following rough sketch:

        trait Epoch {
          def data: DataFrame
      
          /** The exclusive starting position for `data`. */
          def startOffset: Offset
      
          /** The inclusive ending position for `data`.  Incrementally updated during processing, but not complete until execution of the query plan in `data` is finished. */
          def endOffset: Offset
        }
      
        def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], limits: Limits): Epoch
      

      The above would allow us to build an alternative implementation of StreamExecution that processes continuously with much lower latency and only stops processing when it needs to reconfigure the stream (either due to a failure or a user-requested change in parallelism).
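
      Purely as an illustration (not part of the proposal), a driver loop for such an alternative StreamExecution might look roughly like the sketch below. The names ContinuousSource, ContinuousSink, Limits, and shouldReconfigure are hypothetical stand-ins that mirror the rough API above; none of them are actual Spark classes.

        import org.apache.spark.sql.DataFrame

        // Minimal stub types mirroring the rough API above; illustrative only.
        trait Offset
        trait Limits

        trait Epoch {
          def data: DataFrame
          def startOffset: Offset
          def endOffset: Offset
        }

        trait ContinuousSource {
          def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], limits: Limits): Epoch
        }

        trait ContinuousSink {
          def addEpoch(epoch: Epoch): Unit
        }

        // Hypothetical continuous driver loop: keep requesting open-ended epochs
        // and only stop when the stream must be reconfigured (a failure or a
        // user-requested change in parallelism).
        def runContinuously(source: ContinuousSource,
                            sink: ContinuousSink,
                            limits: Limits,
                            shouldReconfigure: () => Boolean): Unit = {
          var lastCommitted: Option[Offset] = None
          while (!shouldReconfigure()) {
            val epoch = source.getBatch(lastCommitted, None, limits)
            sink.addEpoch(epoch)                   // pipelined read and write
            lastCommitted = Some(epoch.endOffset)  // record progress for recovery
          }
        }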

        Activity

        XuanYuan Li Yuanjian added a comment -

        Our team discussed the design sketch in detail; our ideas and questions are noted below.
        1. Will window operations be supported in Continuous Processing Mode?
        Even if we only consider narrow dependencies for now, as the design sketch describes, the exactly-once guarantee may not be achievable with the current implementation of window and watermark.
        2. Should the EpochIDs be aligned in scenarios that are not map-only?

        The design can also work with blocking operators, although it’d require the blocking operators to ensure epoch markers from all the partitions have been received by the operator before moving forward to commit.

        Does `blocking operators` mean operators that need a shuffle? We think that only operators with an ordering relation (like window/mapState/sortByKey) need the EpochIDs to be aligned; others (like groupBy) do not (see the sketch after this comment).
        3. Also, in the many-to-one scenario (like shuffle and window), should we use a new EpochID in the shuffle-read stage and when the window slides out, or reuse the original EpochIDs?
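
        To make question 2 concrete, here is a minimal sketch of what aligning epoch markers in a blocking (shuffle) operator could look like. The EpochAligner class and its methods are hypothetical and not part of any Spark API; the point is only that epoch N is committed after the marker for epoch N has arrived from every input partition.

          // Hypothetical marker-alignment helper for a blocking (shuffle) operator.
          class EpochAligner(numInputPartitions: Int) {
            private val markersSeen =
              scala.collection.mutable.Map.empty[Long, Int].withDefaultValue(0)

            /** Record a marker for `epochId` from one input partition.
              * Returns true once every partition has delivered its marker,
              * i.e. when the epoch may safely be committed downstream. */
            def onMarker(epochId: Long): Boolean = {
              markersSeen(epochId) += 1
              markersSeen(epochId) == numInputPartitions
            }
          }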

        rxin Reynold Xin added a comment -

        Maybe we can add some metadata (like a string-to-string map) that can pass extra information over, in the case of map-only tasks.
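
        As a rough illustration of that idea (all names here are hypothetical, not an actual API), the commit message for a map-only task could carry such a map, for example holding the Kafka offset range the task read:

          // Hypothetical commit message carrying a string-to-string metadata map.
          case class CommitMessage(epochId: Long, metadata: Map[String, String])

          val msg = CommitMessage(
            epochId = 42L,
            metadata = Map(
              "kafka.topic"       -> "events",
              "kafka.partition"   -> "7",
              "kafka.startOffset" -> "123450",
              "kafka.endOffset"   -> "123999"))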

        cody@koeninger.org Cody Koeninger added a comment -

        No, it doesn't exist yet as far as I know.

        Reason I ask is that Michael had said on the dev list in September "I
        think that we are going to have to change the Sink API as part of
        SPARK-20928, which is why I linked these tickets together."

        For aggregates, conceptually I think that the minimum and maximum per
        partition kafka offset for any data involved in the aggregate is
        sufficient to identify it. But it seems like map-only is the bigger
        focus here, which is probably fine.

        rxin Reynold Xin added a comment -

        That doesn't yet exist does it? How would that work for non-map jobs, e.g. an aggregate? That said, if it is for map-only, this can be tweaked to pass the offset ranges in addition to epoch id.

        cody@koeninger.org Cody Koeninger added a comment -

        Can you clarify how this impacts sinks having access to the underlying kafka offsets, e.g. https://issues.apache.org/jira/browse/SPARK-18258

        rxin Reynold Xin added a comment - edited

        OK got it - you are basically saying if we can send the offset associated with each record (or a batch of records) to the sink, then the sink can potentially implement some sort of dedup to guarantee idempotency. For most sinks this probably won't work, but if a particular sink offers a way to do it, then end-to-end exactly once can be accomplished.
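
        As an illustration of that dedup idea (the SinkStore trait and its methods are hypothetical, not a Spark or connector API), a sink could record the offset alongside each write and skip anything at or below the offset it has already committed, making replays after a failure idempotent:

          // Hypothetical offset-aware sink store.
          trait SinkStore {
            def highestCommittedOffset(partition: Int): Long
            def write(partition: Int, offset: Long, value: String): Unit
          }

          // Skip records that were already written in a previous (failed) attempt.
          def writeIfNew(store: SinkStore, partition: Int, offset: Long, value: String): Unit = {
            if (offset > store.highestCommittedOffset(partition)) {
              store.write(partition, offset, value)
            }
          }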

        cody@koeninger.org Cody Koeninger added a comment -

        If a given sink is handling a result, why does handling the
        corresponding offset to the result substantially increase overhead?

        Thinking about it in terms of a downstream database, if I'm doing a
        write per result, then the difference between writing (result) and
        writing (result, offset) seems like it should be overshadowed by the
        overall cost of the write.

        In more practical terms, poll() on the kafka consumer is returning a
        batch of pre-fetched messages anyway, not a single message, so one
        should be able to run their straight line map/filter/whatever on the
        batch and then commit results with the last offset.
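
        A minimal sketch of that pattern with the plain Kafka consumer API is below; writeResultsAndOffsets is a hypothetical stand-in for whatever atomic or transactional write the downstream store provides:

          import java.time.Duration
          import scala.collection.JavaConverters._
          import org.apache.kafka.clients.consumer.KafkaConsumer

          // Process one pre-fetched batch from poll() and commit the results
          // together with the last offset seen per partition in a single write.
          def processOnce(consumer: KafkaConsumer[String, String],
                          writeResultsAndOffsets: (Seq[String], Map[Int, Long]) => Unit): Unit = {
            val records = consumer.poll(Duration.ofMillis(100)).asScala.toSeq
            if (records.nonEmpty) {
              // Straight-line map/filter over the batch.
              val results = records.map(_.value).filter(_.nonEmpty)
              // Highest offset per partition identifies exactly what was processed.
              val lastOffsets = records.groupBy(_.partition)
                .map { case (p, rs) => p -> rs.map(_.offset).max }
              writeResultsAndOffsets(results, lastOffsets)
            }
          }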

        rxin Reynold Xin added a comment -

        Isn't there an issue with the overhead of tracking in the sinks? If we need to guarantee exactly once, then each record needs to be committed.

        cody@koeninger.org Cody Koeninger added a comment -

        I was talking about the specific case of jobs with only narrow stages. If there's no shuffle, then it should be sufficient at any given point to record the per-partition offset alongside the result.

        rxin Reynold Xin added a comment -

        Cody Koeninger can you write down your thoughts on how we should maintain exactly once semantics?

        marmbrus Michael Armbrust added a comment - edited

        Hey everyone, thanks for your interest in this feature! I'm still targeting Spark 2.3, but unfortunately have been busy with other things since the summit. Will post more details on the design as soon as we have them! The Spark summit demo just showed a hacked-together prototype, but we need to do more to figure out how to best integrate it into Spark.

        TomaszGaweda Tomasz Gawęda added a comment -

        Michael Armbrust Is it still planned for 2.3.0?

        ibobak Ihor Bobak added a comment -

        Dear All, in this video https://www.youtube.com/watch?v=qAZ5XUz32yM Michael says "I am switching to new mode of continuous processing". So, as I understand it, this is already done, or almost done.

        Can anyone give a hint of how to switch on this mode? Thanks

        cody@koeninger.org Cody Koeninger added a comment -

        Cool, can you label it SPIP so it shows up linked from http://spark.apache.org/improvement-proposals.html

        My only concern so far was the one I mentioned already, namely that it seems like you shouldn't have to give up exactly-once delivery semantics in all cases.

        marmbrus Michael Armbrust added a comment -

        Hi Cody, I do plan to flesh this out with the other sections of the SIP document and will email the dev list at that point. All that has been done so far is some basic prototyping to estimate how much work an alternative StreamExecution would take to build, and some experiments to validate the latencies that this arch could achieve. Do you have specific concerns with the proposal as it stands?

        cody@koeninger.org Cody Koeninger added a comment -

        This needs an improvement proposal.

        Based on discussions on the mailing list and representations made at Spark Summit, this is already a "work in progress".

        If that's the case, it needs community involvement now, not later, to reduce the danger of Databricks developing something that meets their internal needs but not community needs.

        wlsc Wladimir Schmidt added a comment -

        I am really looking forward to it! This sub-millisecond streaming is really exciting.

        CodingCat Nan Zhu added a comment -

        If I understand correctly, the tasks will be "long-term" tasks, just like the receiver tasks in the receiver-based InputDStream in Spark Streaming?

        cody@koeninger.org Cody Koeninger added a comment -

        For jobs that only have narrow stages, I think it should be possible to maintain delivery semantics as well.

        CodingCat Nan Zhu added a comment -

        Hi, is there any description of what this means?


          People

          • Assignee: Unassigned
          • Reporter: marmbrus Michael Armbrust
          • Votes: 16
          • Watchers: 87