Kafka / KAFKA-12564

KTable#filter-method called twice after aggregation


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Bug
    • Affects Version/s: 2.7.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Libraries from build.sbt:

      "org.apache.kafka" % "kafka_2.13" % "2.7.0",

      "org.apache.kafka" % "kafka-streams" % "2.7.0",

      "org.apache.kafka" % "kafka-clients" % "2.7.0",

      "org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0",

       

      Feed the Stream "issue_stream" with:

      (1->"A")
      (1->"B")

       

      Topology:

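      // Imports assumed for this snippet (not shown in the original report)
      import org.apache.kafka.streams.kstream.Printed
      import org.apache.kafka.streams.scala.ImplicitConversions._
      import org.apache.kafka.streams.scala.Serdes._
      import org.apache.kafka.streams.scala.StreamsBuilder
      import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

      val builder = new StreamsBuilder // assumed; the report uses `builder` without defining it

      // #1
      val issueStream: KStream[Int, String] = builder.stream[Int, String]("issue_stream")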

       

      // #2
      val aggTable: KTable[Int, String] =
        issueStream
          .groupBy((k, v) => k)
          .aggregate[String]("EMPTY")((k, v, agg) => s"$agg+$v")

       

      // #3
      aggTable
        .toStream
        .print(Printed.toSysOut)

       

      // #4
      aggTable
        .filter((k, v) => {
          println(s"filter($k, $v) at ${System.nanoTime()}")
          true
        })
        .toStream
        .print(Printed.toSysOut)

       

      First Tuple: (1->"A")

      #3 The output is as expected, the aggregated tuple ("EMPTY"+"+A"):

      [KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A

       

      #4 The filter method is called twice.
      The first call receives the expected tuple:

      filter(1, EMPTY+A) at 211379567071847

      The second call receives the initial aggregate, before any value was added:

      filter(1, EMPTY) at 211379567120375

      The output contains the correct tuple:

      [KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A

       

      Second Tuple: (1->"B")

      #3 The output is as expected, the aggregated tuple ("EMPTY"+"+A"+"+B"):

      [KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A+B

      #4 Again there is a second, unexpected call to filter(...), this time with the previous aggregate (the value before "B" was added).
      First call:

      filter(1, EMPTY+A+B) at 211379567498482

      Second call:

      filter(1, EMPTY+A) at 211379567524475

      But the output contains only one tuple, as expected:

      [KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A+B
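
The call sequence above can be reproduced with a small plain-Scala model that does not use Kafka at all. It is only a sketch under one assumption that the report itself does not state: each table update carries both the new aggregate and the aggregate it replaces, and the filter predicate is evaluated on both, while only the new value is emitted. The names `Change`, `aggregate`, `filter`, and `run` are hypothetical helpers for this illustration, not the Kafka Streams API.

```scala
object FilterTwiceModel {
  // One table update: the new aggregate plus the aggregate it replaces (assumed shape).
  final case class Change[V](newValue: V, oldValue: V)

  // Stand-in for step #2: start from "EMPTY" and append "+value".
  def aggregate(previous: Option[String], value: String): Change[String] = {
    val old = previous.getOrElse("EMPTY")
    Change(s"$old+$value", old)
  }

  // Stand-in for step #4: the predicate runs on the new value, then on the old value.
  // Only a new value that passes the predicate is emitted downstream.
  def filter[V](key: Int, change: Change[V])(predicate: (Int, V) => Boolean): Option[V] = {
    val keepNew = predicate(key, change.newValue)
    predicate(key, change.oldValue) // evaluated, but not emitted in this model
    if (keepNew) Some(change.newValue) else None
  }

  // Feeds the inputs through the model; returns (predicate calls, emitted records).
  def run(inputs: Seq[(Int, String)]): (Vector[String], Vector[String]) = {
    var state = Map.empty[Int, String]
    val calls = Vector.newBuilder[String]
    val output = Vector.newBuilder[String]
    for ((k, v) <- inputs) {
      val change = aggregate(state.get(k), v)
      state += k -> change.newValue
      filter(k, change) { (key, value) => calls += s"filter($key, $value)"; true }
        .foreach(out => output += s"$k, $out")
    }
    (calls.result(), output.result())
  }

  def main(args: Array[String]): Unit = {
    val (calls, output) = run(Seq(1 -> "A", 1 -> "B"))
    calls.foreach(println)
    output.foreach(println)
  }
}
```

Under this model the predicate is invoked twice per input record, first with the new aggregate and then with the previous one, while one record per input is emitted. That matches the order of the log lines in the description, but it does not prove that this is what Kafka Streams does internally.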

    People

      Assignee: Unassigned
      Reporter: Jess J. (jessorlisa)
      Votes: 1
      Watchers: 5
