Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: beam-model
    • Labels: None

Description

      We still haven't added retractions to Beam, even though they're a core part of the model. We should document all the necessary aspects (uncombine, reverting DoFn output with DoOvers, sink integration, source-level retractions, etc), and then implement them.

Activity

        voidpointer101 Matt Pouttu-Clarke added a comment -

        Apologies in advance for the SQL-like code examples, but they are the most understandable to the general public: https://github.com/LamdaFu/bloklinx/wiki/Semantics-and-Usage

        voidpointer101 Matt Pouttu-Clarke added a comment -

        With regard to versioning structural changes: removing a field is one example, as is changing the type of a field. In this case one must replay all relevant history with the change applied AND, more importantly, quickly identify the root cause of failures related to the structural change.

        With regard to retaining "deleted" data and relationships, the best real examples I have are versioned hierarchical structures like zip codes and sales territories. You cannot reject mail because a zip code has changed or moved, and salespeople will have a conniption if their numbers change and affect their commissions. Thus in the real world these historical structures remain frozen in time, potentially forever, even when they are "deleted".

        voidpointer101 Matt Pouttu-Clarke added a comment -

        Yes, agreed, it is not yet clear from the docs how this relates directly to Beam. However, this is mainly a terminology issue from my perspective. The bespoke systems I have built over the last few years to unify batch and stream processing all rely on data versioning to ensure point-in-session consistency (watermarks) across streams and all data derived from streams, such as aggregates, transforms, splits, and replicas.

        There is no hard dependency on a configuration service, but it is critical to keep the current watermark and all historical watermarks in a system of record. This could be as simple as a shared file system or as complex as etcd.

        That aside, the versioning model I put forward for flatbuffers is an example using more recent technologies. I have done the same with relational tables and Avro in the past.

        I'll work on the examples of how the versioning model feeds aggregate refresh, and hopefully it will become clearer.

        takidau Tyler Akidau added a comment -

        Hi Matt, thanks a lot for writing all that up. My initial comment would be that this sounds like a systems-level approach towards providing general versioning of data that may change over time. That's really cool.

        I think the problem of retractions within Beam is actually much more constrained, though, so I'm not sure we'd need that full level of generality. And as a portable layer, we also generally need to avoid taking system-level dependencies like ZooKeeper, etc. Runners can choose to take such dependencies, but the framework itself needs to be very careful about such things, typically formulating them via contracts in the model rather than direct dependencies on actual systems.

        The set of problems we need to tackle with retractions in Beam boils down, I think, to essentially these:

        1. Support remembering the last published panes for a window, so we can retract them in the future. Can be done with Beam's persistent state layer. Will need to support storing multiple previous panes in the case of merging windows like sessions. This is probably pretty straightforward.
        2. Support annotating retraction elements as retractions. This will be some form of metadata on the record. Also relatively straightforward.
        3. Support retraction propagation. This area is progressively more interesting as you go down the list of sub-tasks, I think:
        a. Inputs which are retractions always generate outputs which are retractions. So records which are retractions should always be processed independently from normal data, so the DoFn itself need not be aware of whether the data are retractions or not.
        b. For specific use cases, we may want to provide a way for a DoFn to find out if it is processing retractions. But having a concrete use case for this would be nice before committing to do so.
        c. CombineFn will need a retractInput method to complement addInput, which will essentially uncombine the given values from the accumulator.
        d. Sinks will need to be made retraction aware. Dan Halperin may have thoughts here.
        e. Ideally, we would come up with a scheme to make retractions compatible with non-deterministic DoFns. This is expensive in the general case (remember all inputs and their corresponding outputs, so that you can reproduce that output as a retraction when you see a corresponding input). It would be cool if we could come up with something smarter, but I'm not sure what that would be. It may be that we simply need to provide a way to annotate a DoFn as non-deterministic to ensure that the expensive-but-correct mode supporting non-determinism is used.

        Additional things we could consider adding:

        4. Support for publishing retractions directly from sources. This would allow for the input data themselves to be annotated as retractions for use cases where it is known ahead of time that you're retracting a previously provided value.
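        As a minimal sketch of what (3c) might look like: the combiner below pairs a hypothetical retractInput with addInput to "uncombine" a value from the accumulator. This is an illustration only, with assumed names; Beam's actual CombineFn API does not (yet) include retractInput.

        ```java
        // Illustration only: a sum-style combiner with a hypothetical
        // retractInput that removes a previously added value's contribution.
        // All names here are assumptions, not Beam's real CombineFn API.
        class SumFn {
            long createAccumulator() { return 0L; }
            long addInput(long accum, long input) { return accum + input; }
            // Proposed complement to addInput: uncombine a value.
            long retractInput(long accum, long input) { return accum - input; }
            long extractOutput(long accum) { return accum; }
        }
        ```

        For a sum, retraction is simple subtraction; a non-invertible combine (e.g. max) would need its accumulator to retain enough information to rebuild its value after a retraction.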

        Given that, I'd be curious to hear your thoughts on how Bloklinx relates to this. There doesn't seem to be sufficient information in the existing docs for me to do that well, beyond seeing that it appears to solve a similar, but more general problem in a self-contained system.

        One thing you mention above that isn't covered here is retractions in light of structural changes. From the perspective of providing a highly general solution, I see why that makes sense. But I'd be curious to hear your thoughts on real world use cases where that's applicable. That ties into the much larger question of supporting pipeline updates more cleanly within the Beam model itself, which itself is an interesting area to explore in the future. But I've never considered the idea of actually retracting data from portions of the pipeline that have been removed, and I can't immediately come up with use cases where that would be desirable. Any light you could shed here would be appreciated.

        voidpointer101 Matt Pouttu-Clarke added a comment -

        Added information on the local repo (similar to a git local repo): https://github.com/LamdaFu/bloklinx/wiki/Local-Repo

        Working on the Bloklinx swarm design next (i.e. what happens during and after a push).

        voidpointer101 Matt Pouttu-Clarke added a comment -

        https://github.com/LamdaFu/bloklinx/wiki
        ^^ provides the basic description
        https://github.com/LamdaFu/bloklinx/wiki/Bloklinx-Schema-(flatbuffers)
        ^^ provides definition of the basic mapping to flatbuffer serialization

        I am working on examples of basic versioning as well as branching / merge / change data processing

        The question I get asked most is why an "UPDATE" is composed of a redaction followed by an assertion rather than just one record. The answer is that this provides several huge benefits, including very efficient refresh of downstream aggregations, splits, and merges, easier data diffs, and much easier reconciliation. This will become clear in the examples to come.
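        To illustrate the aggregate-refresh benefit (with hypothetical names, not actual Bloklinx APIs): because an UPDATE arrives as a redaction of the old record followed by an assertion of the new one, a downstream aggregate can apply the delta instead of replaying history.

        ```java
        // Illustration only: a downstream running sum consuming a change
        // stream where an UPDATE is a redaction (remove the old value)
        // plus an assertion (add the new value). Names are hypothetical.
        class RunningSum {
            long total = 0;
            void redact(long value) { total -= value; }      // undo a prior assertion
            void assertValue(long value) { total += value; } // apply a new value
        }
        ```

        Updating a record's value from 10 to 7 is then two events, redact(10) followed by assertValue(7), and the aggregate adjusts by the difference without touching any other records.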

        voidpointer101 Matt Pouttu-Clarke added a comment -

        Yes, just wanted to be sure there was interest before documenting it. FYI: it requires some form of distributed configuration service, such as etcd or ZooKeeper, to keep track of in-process change sessions. Once the change sessions are done or "committed" (or time out), they are cleared from the config service but can be obtained from logs for later replays. Also, in terms of granularity of change sessions, a large number of change sessions making very small changes can cause problems for the design and should be throttled on the client side. I'll post a link here to the doco.

        lcwik Luke Cwik added a comment -

        Matt, do you have something you can share that goes into more detail?

        voidpointer101 Matt Pouttu-Clarke added a comment -

        The design actually solves multiple problems: i.e. BEAM-25, BEAM-91, and BEAM-101 share a common solution.

        voidpointer101 Matt Pouttu-Clarke added a comment -

        I've solved this problem successfully several times since around 2007. It requires implementing data versioning and treating data much like you would treat code in GitHub. You could also call it streaming with parallel universe support, as some consumers may not want or need your redactions, while others may have critical need of them (much like in the source code world, some users do not want immediate "upgrades"). Also, please note that it is just as important to support redacting structural changes as it is to support redacting data changes. I have mature and battle-tested designs in this area if there's interest.

        dhalperi@google.com Daniel Halperin added a comment -

        This is the BEAM version of https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/114 (which is now closed).

        takidau Tyler Akidau added a comment -

        Ooops. I may need a DoOverFn.

        frances Frances Perry added a comment -

        Did you mean "backsies"?


People

    • Assignee: Unassigned
    • Reporter: takidau Tyler Akidau
    • Votes: 9
    • Watchers: 19

Dates

    • Created:
    • Updated:

Time Tracking

    • Estimated: 672h (Original Estimate)
    • Remaining: 672h (Remaining Estimate)
    • Logged: Not Specified