  1. Kafka
  2. KAFKA-13678

2nd punctuation using STREAM_TIME does not respect scheduled interval



    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None


      When a punctuator is scheduled using stream time, the first punctuation occurs immediately, as documented, but the second one is not triggered at t_schedule + interval; it can fire earlier than that.
      For example, assume that we schedule a punctuation every 10 seconds at timestamp 5 (t5). The system currently behaves like this:

      t5 -> schedule, punctuate, next schedule at t10
      t6 -> no punctuation
      t7 -> no punctuation
      t8 -> no punctuation
      t9 -> no punctuation
      t10 -> punctuate, next schedule at t20

      In this example the second punctuation fires only 5 seconds after the first one, which violates the scheduled 10-second interval.

      From my point of view, a reasonable behaviour could be:

      t5 -> schedule, punctuate, next schedule at t15
      t6 -> no punctuation
      t7 -> no punctuation
      t8 -> no punctuation
      t9 -> no punctuation
      t10 -> no punctuation
      t11 -> no punctuation
      t12 -> no punctuation
      t13 -> no punctuation
      t14 -> no punctuation
      t15 -> punctuate, next schedule at t25
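
The two timelines above can be reproduced with a small simulation. This is only a sketch under stated assumptions, not Kafka code: the class `PunctuationTimeline`, the method `punctuationTimes`, and the `anchor` parameter are hypothetical names, time advances in abstract ticks of 1, and the "advance to the next aligned slot" rule is an assumption chosen to match the timelines described in this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Kafka code. Time units are abstract ticks.
final class PunctuationTimeline {

    /**
     * Replays stream time from firstRecordTime to endTime in steps of 1 and
     * returns the times at which a punctuation fires.
     *
     * anchor is the time the schedule is aligned to: 0 models the behaviour
     * reported in this issue, firstRecordTime models the proposed behaviour.
     */
    static List<Long> punctuationTimes(long anchor, long interval, long firstRecordTime, long endTime) {
        List<Long> fired = new ArrayList<>();
        long next = anchor; // first punctuation is due once stream time reaches the anchor
        for (long now = firstRecordTime; now <= endTime; now++) {
            if (now >= next) {
                fired.add(now);
                // move to the first aligned slot strictly after 'now'
                next = anchor + ((now - anchor) / interval + 1) * interval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("aligned to 0:  " + punctuationTimes(0L, 10L, 5L, 30L));
        System.out.println("aligned to t5: " + punctuationTimes(5L, 10L, 5L, 30L));
    }
}
```

With an interval of 10 and the first record at t5, aligning to 0 yields punctuations at [5, 10, 20, 30], while aligning to t5 yields [5, 15, 25]. Only the second sequence keeps every gap equal to the interval.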

      The origin of this problem can be found in StreamTask.schedule:

      /**
       * Schedules a punctuation for the processor
       *
       * @param interval the interval in milliseconds
       * @param type the punctuation type
       * @throws IllegalStateException if the current node is not null
       */
      public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
         switch (type) {
            case STREAM_TIME:
               // align punctuation to 0L, punctuate as soon as we have data
               return schedule(0L, interval, type, punctuator);
            case WALL_CLOCK_TIME:
               // align punctuation to now, punctuate after interval has elapsed
               return schedule(time.milliseconds() + interval, interval, type, punctuator);
            default:
               throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
         }
      }

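      In the STREAM_TIME case, it calls schedule with startTime=0, so subsequent punctuations are aligned to multiples of the interval counted from 0 rather than from the time of scheduling.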
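
To make the role of the start time concrete, here is a minimal, dependency-free sketch of the start-time choice. It is an illustration only: `StartTimeSketch`, its nested enum, and both method names are hypothetical stand-ins rather than Kafka classes, and `anchoredStartTime` assumes the current stream time is known when the schedule is created, which may not hold in the real code.

```java
// Hypothetical sketch; the enum and method names are stand-ins, not Kafka classes.
final class StartTimeSketch {

    enum PunctuationType { STREAM_TIME, WALL_CLOCK_TIME }

    /** Start time as chosen by the dispatch quoted in this issue. */
    static long currentStartTime(PunctuationType type, long interval, long wallClockNow) {
        switch (type) {
            case STREAM_TIME:
                return 0L; // aligned to 0, regardless of when scheduling happens
            case WALL_CLOCK_TIME:
                return wallClockNow + interval; // aligned to "now"
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
        }
    }

    /**
     * Assumed alternative: anchor STREAM_TIME schedules to the stream time
     * observed at scheduling, so later punctuations fall at streamTimeNow + k * interval.
     */
    static long anchoredStartTime(PunctuationType type, long interval, long wallClockNow, long streamTimeNow) {
        if (type == PunctuationType.STREAM_TIME) {
            return streamTimeNow;
        }
        return currentStartTime(type, interval, wallClockNow);
    }
}
```

For the example in this issue (interval 10, scheduled at t5), the first method returns a start time of 0 and the second returns 5. A start of 5 is what would place the follow-up punctuations at t15 and t25.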




            Assignee: Unassigned
            Reporter: Lorenzo Cagnatel (lorenzocagnatel)