Description
Currently, Streams offers two emission models:
- emit-on-window-close (using Suppression)
- emit-on-update (i.e., emit a new result whenever a new record is processed, regardless of whether the result has changed)
There is also an option to drop some intermediate results, either using caching or suppression.
However, there is no support for emit-on-change, in which results would be forwarded only if the result has changed. This has been reported to be extremely valuable as a performance optimization for some high-traffic applications: it reduces the computational burden both internally, for downstream Streams operations, and externally, for systems that consume the results and currently have to deal with a lot of "no-op" changes.
It would be pretty straightforward to implement this by loading the prior result before a stateful operation and comparing it with the new result before persisting or forwarding. In many cases, we load the prior result anyway, so the performance impact may not be significant either.
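As a rough illustration of the load-and-compare idea, here is a minimal sketch in plain Java. It does not use any Kafka Streams API: `EmitOnChangeSketch`, its in-memory `store`, and the `forwarded` list are hypothetical stand-ins for a stateful operator, its state store, and the downstream. Equality is assumed to be `Objects.equals` on the result; a real implementation might compare serialized bytes instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a stateful operator; not a Kafka Streams class.
class EmitOnChangeSketch {
    // Prior results by key (stand-in for a state store).
    private final Map<String, Long> store = new HashMap<>();
    // Everything that would have been forwarded downstream.
    final List<String> forwarded = new ArrayList<>();

    // Returns true if the result was persisted and forwarded,
    // false if it was dropped as a no-op.
    boolean process(String key, Long newResult) {
        boolean hasPrior = store.containsKey(key);
        Long prior = store.get(key); // load the prior result
        if (hasPrior && Objects.equals(prior, newResult)) {
            return false; // unchanged: neither persist nor forward
        }
        store.put(key, newResult);
        forwarded.add(key + "=" + newResult);
        return true;
    }

    public static void main(String[] args) {
        EmitOnChangeSketch op = new EmitOnChangeSketch();
        op.process("a", 1L);
        op.process("a", 1L); // no-op, dropped
        op.process("a", 2L);
        System.out.println(op.forwarded); // prints [a=1, a=2]
    }
}
```

The only extra work over emit-on-update in this sketch is the equality check, which is cheap when the prior result has already been loaded.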
One design challenge is what to do with timestamps. If we get one record at time 1 that produces a result, and then another at time 2 that produces a no-op, what should be the timestamp of the result, 1 or 2? emit-on-change would require us to say 1.
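The timestamp question can be made concrete with a small sketch, again in plain Java with hypothetical names (`TimestampedEmitOnChange`, `Stamped`) rather than real Streams classes. It assumes the store keeps the value together with the timestamp of the record that last changed it, so a no-op at time 2 leaves the stored timestamp at 1.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; not a Kafka Streams class.
class TimestampedEmitOnChange {
    // Hypothetical holder for a result and the timestamp of its last change.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Stamped> store = new HashMap<>();

    // Returns the record to forward, or null when the update is a no-op.
    Stamped process(String key, String newValue, long recordTimestamp) {
        Stamped prior = store.get(key);
        if (prior != null && Objects.equals(prior.value, newValue)) {
            // No-op: leave the store untouched, so the earlier timestamp is retained.
            return null;
        }
        Stamped updated = new Stamped(newValue, recordTimestamp);
        store.put(key, updated);
        return updated;
    }

    // Current stored result for a key, or null if none.
    Stamped current(String key) {
        return store.get(key);
    }
}
```

With this shape, processing `("k", "x", 1)` and then `("k", "x", 2)` forwards one result, and the stored result still carries timestamp 1.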
Clearly, we'd need to do some serious benchmarks to evaluate any potential implementation of emit-on-change.
Another design challenge is to decide if we should just automatically provide emit-on-change for stateful operators, or if it should be configurable. Configuration increases complexity, so unless the performance impact is high, we may just want to change the emission model without a configuration.
Issue Links
- causes: KAFKA-12508 Emit-on-change tables may lose updates on error or restart in at_least_once (Resolved)
1. Drop idempotent KTable source updates | Resolved | Richard Yu
2. Drop idempotent updates for aggregations | Open | Unassigned
3. Drop idempotent updates for repartition operations | Open | Unassigned