Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8289

KTable<Windowed<String>, Long> can't be suppressed

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.1.0, 2.2.0, 2.1.1
    • Fix Version/s: 2.3.0, 2.1.2, 2.2.1
    • Component/s: streams
    • Labels:
      None
    • Environment:
      Broker on a Linux, stream app on my win10 laptop.
      I add one row log.message.timestamp.type=LogAppendTime to my broker's server.properties. stream app all default config.

      Description

      I write a simple stream app followed official developer guide Stream DSL. but I got more than one Window Final Results from a session time window.

      time ticker A -> (4,A) / 25s,

      time ticker B -> (4, B) / 25s  all send to the same topic 

      below is my stream app code 

      kstreams[0]
      .peek((k, v) -> log.info("--> ping, k={},v={}", k, v))
      .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
      .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20)))
      .count()
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), k.key(), v));
      

      here is my log print

      2019-04-24 20:00:26.142  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:00:47.070  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556106587744, endMs=1556107129191},k=A,v=20
      2019-04-24 20:00:51.071  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:01:16.065  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:01:41.066  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:02:06.069  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:02:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:02:56.208  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:03:21.070  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:03:46.078  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:04:04.684  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:04:11.069  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:04:19.371  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107426409},k=B,v=9
      2019-04-24 20:04:19.372  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107445012},k=A,v=1
      2019-04-24 20:04:29.604  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:04:36.067  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:04:49.715  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107451397},k=B,v=10
      2019-04-24 20:04:49.716  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107469935},k=A,v=2
      2019-04-24 20:04:54.593  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:05:01.070  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:05:19.599  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:05:20.045  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107476398},k=B,v=11
      2019-04-24 20:05:20.047  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107501398},k=B,v=12
      2019-04-24 20:05:26.075  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:05:44.598  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:05:50.399  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107519930},k=A,v=4
      2019-04-24 20:05:50.400  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107526405},k=B,v=13
      2019-04-24 20:05:51.067  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:06:09.595  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:06:16.089  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:06:20.765  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107544929},k=A,v=5
      2019-04-24 20:06:20.767  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107569926},k=A,v=6
      2019-04-24 20:06:34.595  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:06:41.063  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:06:51.081  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107576415},k=B,v=15
      2019-04-24 20:06:51.082  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107594925},k=A,v=7
      2019-04-24 20:06:59.607  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:07:06.072  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:07:21.440  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107601391},k=B,v=16
      2019-04-24 20:07:21.441  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107619935},k=A,v=8
      2019-04-24 20:07:24.596  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:07:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:07:49.608  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:07:51.775  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107626420},k=B,v=17
      2019-04-24 20:07:51.777  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107651396},k=B,v=18
      2019-04-24 20:07:56.064  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:08:14.591  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:08:21.066  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:08:22.125  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107669943},k=A,v=10
      2019-04-24 20:08:22.126  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107694921},k=A,v=11
      2019-04-24 20:08:39.619  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:08:46.067  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:08:52.457  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107701397},k=B,v=20
      2019-04-24 20:08:52.458  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107719949},k=A,v=12
      2019-04-24 20:09:04.599  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=A
      2019-04-24 20:09:11.066  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : --> ping, k=4,v=B
      2019-04-24 20:09:22.794  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107226473, endMs=1556107726398},k=B,v=21   
      2019-04-24 20:09:22.796  INFO --- [-StreamThread-1] c.g.k.AppStreams                        : window=Window{startMs=1556107445012, endMs=1556107744928},k=A,v=13

       Can‘t a [SessionWindowedKStream] be suppressed after count operation? It seems the latest type record produce a previous type record 'Window Final Results'. I just want get exactly one Window Final Results

      First i just start one time ticker, log print seems ok, when I start the second, then window info print log appeared.

      My source stream record rate is a same v record / 25s, gap of inactivity is 100s, 25<100. I shouldn't got so many window info print log. Session window close and then reopened? my grace is 20 millisecond, it should not be ah. 

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                vvcephei John Roesler
                Reporter:
                xiaoxiaoliner Xiaolin Jia
              • Votes:
                0 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: