KAFKA-8582

Consider adding an ExpiredWindowRecordHandler to Suppress


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      I got some feedback on Suppress:

      Specifying how to handle events outside the grace period does seem like a business concern, and simply discarding them therefore seems risky (imagine, for example, any situation where money is involved).

      This sort of situation is addressed by the late-triggering approach associated with watermarks (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102); given this, I wondered whether you were considering adding anything similar.

      It seems like, if a record arrives past the grace period for its window, the state of the windowed aggregation has already been discarded, so any aggregation result we computed for it would be incorrect. And since the window is already expired, we can't store the new (incorrect, but more importantly expired) aggregation result either, so any subsequent super-late records would face the same blank slate. Concretely: if you have three timely records for a window, then three more that arrive after the grace period, and you were doing a count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 1, 1, 1].

      We could add a flag to the post-expiration results to indicate that they're broken, but that seems like the wrong approach. The post-expiration aggregation results are meaningless; still, I could see wanting to send the past-expiration input records to a dead-letter queue or something instead of dropping them.
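      To make the setting concrete, here is a minimal sketch of the kind of suppressed windowed count under discussion; the topic name, window size, and grace period are illustrative, not from this ticket:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressedCountSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // A record is accepted into its window until window end + grace.
               .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
               .count()
               // Emit exactly one final count per window, once the window closes.
               // A record arriving after window end + grace is dropped today;
               // that drop is the hook point this ticket is about.
               .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
               .toStream()
               .foreach((windowedKey, count) -> System.out.println(windowedKey + " -> " + count));

        builder.build(); // pass the topology to a KafkaStreams instance to run it
    }
}
```

      Today the past-grace record is discarded with no application-level hook; the proposal below is about giving that drop a pluggable handler.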

      Along this line of thinking, I wonder if we should add an optional past-expiration record handler interface to the suppression operator. Then you could define your own logic, whether that's routing to a dead-letter queue, sending the record to an alerting pipeline, or even just crashing the application before it can do something wrong. This would follow the same pattern we already use for deserialization errors, where custom handling is supplied via an org.apache.kafka.streams.errors.DeserializationExceptionHandler.
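      Nothing like this exists in the API today, so purely as an illustration, here is one shape such a hook could take, modeled on DeserializationExceptionHandler; the interface name comes from this ticket's title, but the method signature and response enum are made up for the sketch:

```java
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical sketch only: the name is from this ticket's title, the shape is
// modeled on DeserializationExceptionHandler, and none of this is an actual
// Kafka Streams API.
public interface ExpiredWindowRecordHandler {

    // Would be invoked by the suppression operator when a record's window has
    // already closed (stream time has passed window end + grace), instead of
    // the current behavior of silently discarding the record.
    ExpiredWindowHandlerResponse handle(final ProcessorContext context,
                                        final Object key,
                                        final Object value,
                                        final long recordTimestamp,
                                        final long windowEnd);

    enum ExpiredWindowHandlerResponse {
        CONTINUE, // drop the record and keep processing (today's behavior)
        FAIL      // stop the application before it can do something wrong
    }
}
```

      An implementation could forward the record to a dead-letter topic with a plain producer, raise an alert, or return FAIL to crash fast; configuring it could mirror how a DeserializationExceptionHandler is supplied through StreamsConfig.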


People

    Assignee: Igor Piddubnyi (ipiddubnyi)
    Reporter: John Roesler (vvcephei)
    Votes: 0
    Watchers: 4

Dates

    Created:
    Updated: