Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Bug
Affects Version/s: 2.7.0
Fix Version/s: None
Component/s: None
Description
Libraries from build.sbt:
"org.apache.kafka" % "kafka_2.13" % "2.7.0",
"org.apache.kafka" % "kafka-streams" % "2.7.0",
"org.apache.kafka" % "kafka-clients" % "2.7.0",
"org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0",
Records fed into the input stream "issue_stream" (key -> value):
(1->"A")
(1->"B")
Topology:
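import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

val builder = new StreamsBuilder()

// #1 Source stream
val issueStream: KStream[Int, String] = builder.stream[Int, String]("issue_stream")

// #2 Aggregate per key, starting from "EMPTY"
val aggTable: KTable[Int, String] =
  issueStream
    .groupBy((k, v) => k)
    .aggregate[String]("EMPTY")((k, v, agg) => s"$agg+$v")

// #3 Print the aggregated table
aggTable
  .toStream
  .print(Printed.toSysOut)

// #4 Filter that accepts every record, logging each predicate call
aggTable
  .filter((k, v) => {
    println(s"filter($k, $v) at ${System.nanoTime()}")
    true
  })
  .toStream
  .print(Printed.toSysOut)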
First tuple: (1->"A")
#3 outputs the aggregated tuple ("EMPTY" + "+A"), as expected:
[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A
#4 The filter predicate is called twice.
The first call receives the expected tuple:
filter(1, EMPTY+A) at 211379567071847
The second call receives the initial (empty) aggregate "EMPTY":
filter(1, EMPTY) at 211379567120375
The output nevertheless contains only the correct tuple:
[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A
Second tuple: (1->"B")
#3 outputs the aggregated tuple ("EMPTY" + "+A" + "+B"), as expected:
[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A+B
#4 Again, filter(...) is unexpectedly called a second time, now with the previous aggregate (the value before "B" was added):
First call:
filter(1, EMPTY+A+B) at 211379567498482
Second call:
filter(1, EMPTY+A) at 211379567524475
However, the output contains only one tuple, as expected:
[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A+B
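The call sequence in the logs is consistent with the filter being evaluated on both the new and the old value of each KTable update, while toStream forwards only the new value. The following is a simplified, hypothetical model in plain Scala, not Kafka Streams code; the names Change, Aggregate, filter and run are illustrative only. It assumes that each update travels downstream as a (new value, old value) pair, that the initializer result ("EMPTY") is forwarded as the old value for the first record of a key, and that the predicate is applied to the new value first and then to the old one.

```scala
// Simplified, hypothetical model of KTable change propagation (not Kafka Streams code).
// Assumption: every update is forwarded as a (newValue, oldValue) pair and the
// filter evaluates its predicate on both halves.
object FilterModel {
  // One table update: the new value and the value it replaces.
  final case class Change[V](newValue: Option[V], oldValue: Option[V])

  // Per-key aggregation. Assumption: when a key has no aggregate yet, the
  // initializer result is used and forwarded as the old value.
  final class Aggregate[K](init: String, agg: (K, String, String) => String) {
    private val store = scala.collection.mutable.Map.empty[K, String]

    def process(key: K, value: String): Change[String] = {
      val oldAgg = store.getOrElse(key, init)
      val newAgg = agg(key, value, oldAgg)
      store.update(key, newAgg)
      Change(Some(newAgg), Some(oldAgg))
    }
  }

  // Applies the predicate to the new value first, then to the old value;
  // a value that fails the predicate becomes None.
  def filter[K, V](key: K, change: Change[V], pred: (K, V) => Boolean): Change[V] = {
    def compute(v: Option[V]): Option[V] = v.filter(x => pred(key, x))
    val newValue = compute(change.newValue)
    val oldValue = compute(change.oldValue)
    Change(newValue, oldValue)
  }

  // Replays the records and returns (values seen by the predicate, values emitted after toStream).
  def run(records: Seq[(Int, String)]): (List[String], List[String]) = {
    val seen = scala.collection.mutable.ListBuffer.empty[String]
    val emitted = scala.collection.mutable.ListBuffer.empty[String]
    val aggregate = new Aggregate[Int]("EMPTY", (_, v, acc) => s"$acc+$v")
    for ((k, v) <- records) {
      val change = aggregate.process(k, v)
      // Predicate that accepts everything but records each call, like #4 above.
      val filtered = filter[Int, String](k, change, (_, value) => { seen += value; true })
      // Only the new value reaches the printed stream; filtered-out updates are skipped in this model.
      filtered.newValue.foreach(emitted += _)
    }
    (seen.toList, emitted.toList)
  }
}
```

Replaying (1->"A") and (1->"B") through FilterModel.run gives four predicate calls in the same order as the logs (EMPTY+A, EMPTY, EMPTY+A+B, EMPTY+A) but only two emitted values (EMPTY+A, EMPTY+A+B), matching the single printed tuple per input record.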