Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8769

Consider computing stream time independently per key



    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams


      Currently, Streams uses a concept of "stream time", which is computed as the highest timestamp observed by stateful operators, per partition. This concept of time backs grace period, retention time, and suppression.

      For use cases in which data is produced to topics in roughly chronological order (as in db change capture), this reckoning is fine.

      Some use cases have a different pattern, though. For example, in IOT applications, it's common for sensors to save up quite a bit of data and then dump it all at once into the topic. See https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware for a concrete example of the use case.

      I have heard of cases where each sensor dumps 24 hours' worth of data at a time into the topic. This results in a pattern in which, when reading a single partition, the operators observe a lot of consecutive records for one key that increase in timestamp for 24 hours, then a bunch of consecutive records for another key that are also increasing in timestamp over the same 24 hour period. With our current stream-time definition, this means that the partition's stream time increases while reading the first key's data, but then stays paused while reading the second key's data, since the second batch of records all have timestamps in the "past".


      A@t0 (stream time: 0)
      A@t1 (stream time: 1)
      A@t2 (stream time: 2)
      A@t3 (stream time: 3)
      B@t0 (stream time: 3)
      B@t1 (stream time: 3)
      B@t2 (stream time: 3)
      B@t3 (stream time: 3)

      This pattern results in an unfortunate compromise in which folks are required to set the grace period to the max expected time skew, for example 24 hours, or Streams will just drop the second key's data (since it is late). But, this means that if they want to use Suppression for "final results", they have to wait 24 hours for the result.

      This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is simply convenient, but typically not logically meaningful. That is, the partitions are just physically independent sequences of events, so it's convenient to track stream time at this granularity. It would be just as correct, and more useful for IOT-like use cases, to track time independently for each key.

      However, before considering this change, we need to solve the testing/low-traffic problem. This is the opposite issue, where a partition doesn't get enough traffic to advance stream time and results remain "stuck" in the suppression buffers. We can provide some mechanism to force the advancement of time across all partitions, for use in testing when you want to flush out all results, or in production when some topic is low volume. We shouldn't consider tracking time more granularly until this problem is solved, since it would just make the low-traffic problem worse.




            • Assignee:
              vvcephei John Roesler
            • Votes:
              1 Vote for this issue
              11 Start watching this issue


              • Created: