Hi Matt, thanks a lot for writing all that up. My initial comment would be that this sounds like a systems-level approach towards providing general versioning of data that may change over time. That's really cool.
I think the problem of retractions within Beam is actually much more constrained, though, so I'm not sure we'd need that full level of generality. And as a portable layer, we will also generally need to avoid taking system-level dependencies like ZooKeeper, etc. Runners can choose to take such dependencies, but the framework itself needs to be very careful about such things, typically formulating them as contracts in the model rather than as direct dependencies on actual systems.
The set of problems we need to tackle with retractions in Beam boils down, I think, to the following:
1. Support remembering the last published panes for a window, so we can retract them in the future. This can be done with Beam's persistent state layer. It will need to support storing multiple previous panes in the case of merging windows like sessions. This is probably pretty straightforward.
2. Support annotating retraction elements as retractions. This will be some form of metadata on the record. Also relatively straightforward.
3. Support retraction propagation. This area gets progressively more interesting as you go down the list of subtasks, I think:
a. Inputs that are retractions always generate outputs that are retractions. Records that are retractions should therefore always be processed separately from normal data, so the DoFn itself need not be aware of whether the data are retractions or not.
b. For specific use cases, we may want to provide a way for a DoFn to find out if it is processing retractions. But having a concrete use case for this would be nice before committing to do so.
c. CombineFn will need a retractInput method to complement addInput, which will essentially uncombine the given values from the accumulator.
d. Sinks will need to be made retraction aware. Dan Halperin may have thoughts here.
e. Ideally, we would come up with a scheme to make retractions compatible with non-deterministic DoFns. This is expensive in the general case (remember all inputs and their corresponding outputs, so that you can reproduce those outputs as retractions when the corresponding input is later retracted). It would be cool if we could come up with something smarter, but I'm not sure what that would be. It may be that we simply need to provide a way to annotate a DoFn as non-deterministic, to ensure that the expensive-but-correct mode supporting non-determinism is used.
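To make items 1 through 3c concrete, here is a minimal, Beam-independent Python sketch under stated assumptions: `Element`, `apply_dofn`, `MeanFn.retract_input`, and `RetractingAggregator` are hypothetical names (the combiner methods loosely mirror the Python SDK's `CombineFn.add_input` / `merge_accumulators` / `extract_output`), and a plain in-memory dict stands in for Beam's persistent per-window state. It is an illustration of the idea, not a proposed implementation.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, Hashable, Iterable, List, Optional, Tuple

Acc = Tuple[float, int]  # (running sum, count)


@dataclass(frozen=True)
class Element:
    """A value plus the retraction annotation from item 2 (hypothetical)."""
    value: Any
    is_retraction: bool = False


def apply_dofn(fn: Callable[[Any], Iterable[Any]], element: Element) -> List[Element]:
    """Item 3a: the DoFn only sees the value; every output inherits the flag."""
    return [Element(out, element.is_retraction) for out in fn(element.value)]


class MeanFn:
    """Mean combiner; retract_input is the hypothetical item 3c addition."""

    def create_accumulator(self) -> Acc:
        return (0.0, 0)

    def add_input(self, acc: Acc, x: float) -> Acc:
        return (acc[0] + x, acc[1] + 1)

    def retract_input(self, acc: Acc, x: float) -> Acc:
        # "Uncombine" x: the exact inverse of add_input for this accumulator.
        return (acc[0] - x, acc[1] - 1)

    def merge_accumulators(self, accs: Iterable[Acc]) -> Acc:
        accs = list(accs)
        return (sum(a[0] for a in accs), sum(a[1] for a in accs))

    def extract_output(self, acc: Acc) -> Optional[float]:
        return acc[0] / acc[1] if acc[1] else None


class RetractingAggregator:
    """Per-window combining that retracts previously published panes (item 1)."""

    def __init__(self, fn: MeanFn):
        self.fn = fn
        self.accumulators: Dict[Hashable, Acc] = {}
        # Stand-in for persistent state: panes published so far, per window.
        self.published: Dict[Hashable, List[Any]] = defaultdict(list)

    def process(self, window: Hashable, element: Element) -> None:
        acc = self.accumulators.get(window, self.fn.create_accumulator())
        if element.is_retraction:
            acc = self.fn.retract_input(acc, element.value)
        else:
            acc = self.fn.add_input(acc, element.value)
        self.accumulators[window] = acc

    def merge(self, windows: List[Hashable], merged: Hashable) -> None:
        # Merging windows (e.g. sessions): the panes published for every source
        # window must be kept, since all of them need retracting on the next firing.
        accs = [self.accumulators.pop(w) for w in windows if w in self.accumulators]
        previous: List[Any] = []
        for w in windows:
            previous.extend(self.published.pop(w, []))
        self.accumulators[merged] = self.fn.merge_accumulators(accs)
        self.published[merged] = previous

    def fire(self, window: Hashable) -> List[Element]:
        # Retract everything previously published for this window, then emit the new pane.
        out = [Element(p, is_retraction=True) for p in self.published[window]]
        acc = self.accumulators.get(window, self.fn.create_accumulator())
        pane = self.fn.extract_output(acc)
        out.append(Element(pane))
        self.published[window] = [pane]
        return out
```

For example, adding 2.0 and 4.0 to one window and firing publishes a pane of 3.0; retracting 4.0 and firing again emits a retraction of 3.0 followed by a new pane of 2.0. After a session-style merge of two windows that each published a pane, the next firing retracts both of those panes before publishing the merged result.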
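The expensive-but-correct mode from item 3e can be sketched as a wrapper that records each input's outputs and replays them as retractions, rather than rerunning the DoFn (which, being non-deterministic, might produce something different). This is a hypothetical Python illustration: `RecordingDoFnRunner` and `Element` are made-up names, inputs are assumed to be hashable, and an in-memory dict stands in for whatever durable state a runner would actually use.

```python
import random
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, Hashable, Iterable, List


@dataclass(frozen=True)
class Element:
    value: Any
    is_retraction: bool = False


class RecordingDoFnRunner:
    """Hypothetical wrapper for a DoFn annotated as non-deterministic.

    Normal inputs run the DoFn and record its outputs, keyed by input value.
    Retracted inputs do not rerun the DoFn; the recorded outputs are replayed
    as retractions instead.
    """

    def __init__(self, fn: Callable[[Any], Iterable[Any]]):
        self.fn = fn
        # input value -> one list of outputs per time that value was processed.
        # This grows with the input, which is what makes the mode expensive.
        self.history: Dict[Hashable, List[List[Any]]] = defaultdict(list)

    def process(self, element: Element) -> List[Element]:
        if not element.is_retraction:
            outputs = list(self.fn(element.value))
            self.history[element.value].append(outputs)
            return [Element(o) for o in outputs]
        runs = self.history.get(element.value)
        if not runs:
            raise KeyError(f"retraction of unseen input {element.value!r}")
        outputs = runs.pop()  # equal inputs are interchangeable; retract any one run
        if not runs:
            del self.history[element.value]
        return [Element(o, is_retraction=True) for o in outputs]


# Usage: a deliberately non-deterministic DoFn (seeded only so runs are repeatable).
rng = random.Random(0)
runner = RecordingDoFnRunner(lambda v: [v + rng.random()])
published = runner.process(Element(1.0))
retracted = runner.process(Element(1.0, is_retraction=True))
```

The retraction carries exactly the value that was published, even though calling the DoFn again would have produced a different one. Once every recorded run for an input has been retracted, its history entry is dropped.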
Additional things we could consider adding:
4. Support for publishing retractions directly from sources. This would allow for the input data themselves to be annotated as retractions for use cases where it is known ahead of time that you're retracting a previously provided value.
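For item 4, a source that already knows it is withdrawing a value it previously provided can attach the retraction annotation itself. A minimal Python sketch, assuming a hypothetical changelog input of `("+", value)` / `("-", value)` pairs; `read_changelog`, `retraction_aware_sum`, and `Element` are illustrative names, not Beam APIs.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Tuple


@dataclass(frozen=True)
class Element:
    value: Any
    is_retraction: bool = False


def read_changelog(entries: Iterable[Tuple[str, Any]]) -> Iterator[Element]:
    """Hypothetical source over a changelog of ("+", value) / ("-", value) entries.

    "-" marks a value the upstream system knows it previously provided and now
    withdraws, so the source emits it already annotated as a retraction.
    """
    for op, value in entries:
        if op not in ("+", "-"):
            raise ValueError(f"unknown changelog op {op!r}")
        yield Element(value, is_retraction=(op == "-"))


def retraction_aware_sum(elements: Iterable[Element]) -> float:
    # Downstream, a retraction subtracts what the original record added.
    return sum(-e.value if e.is_retraction else e.value for e in elements)
```

Feeding `[("+", 5), ("+", 7), ("-", 5), ("+", 6)]` through both functions yields 13: the retracted 5 cancels the earlier 5 without the downstream code needing to know where the retraction originated.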
Given that, I'd be curious to hear your thoughts on how Bloklinx relates to this. There doesn't seem to be enough information in the existing docs for me to judge that well myself, beyond seeing that it appears to solve a similar but more general problem in a self-contained system.
One thing you mention above that isn't covered here is retractions in light of structural changes. From the perspective of providing a highly general solution, I see why that makes sense. But I'd be curious to hear your thoughts on real-world use cases where that's applicable. That ties into the much larger question of supporting pipeline updates more cleanly within the Beam model, which is itself an interesting area to explore in the future. But I've never considered the idea of actually retracting data from portions of the pipeline that have been removed, and I can't immediately come up with use cases where that would be desirable. Any light you could shed here would be appreciated.