Description
If I run the following Streams app once with two input topics that each have 1 partition, and then again with input topics that each have 2 partitions, I get different results.
final KStream<String, String> leftSide = builder.stream(leftSideTopic);
final KStream<String, String> rightSide = builder.stream(rightSideTopic);

final KStream<String, String> leftAndRight = leftSide.outerJoin(
    rightSide,
    (leftValue, rightValue) ->
        (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofSeconds(20),
        Duration.ofSeconds(10)),
    StreamJoined.with(
        Serdes.String(), /* key */
        Serdes.String(), /* left value */
        Serdes.String()  /* right value */
    ));

leftAndRight.print(Printed.toSysOut());
To reproduce, produce the following batch of records twice, with an interval greater than window + grace period (i.e. > 30 seconds) between the two batches:
(0, 0) (1, 1) (2, 2) (3, 3) (4, 4) (5, 5) (6, 6) (7, 7) (8, 8) (9, 9)
With input topics with 1 partition I get:
[KSTREAM-PROCESSVALUES-0000000008]: 0, 0/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 2, 2/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 5, 5/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 6, 6/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
With input topics with 2 partitions I get:
[KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
I would expect to get the same set of records, maybe in a different order due to the partitioning.
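To make the discrepancy concrete, here is a small sketch that compares the two outputs as sets of keys, ignoring order. It is only a helper for reading the logs, not part of the Streams app; the class and method names (JoinOutputDiff, keyOf, missingKeys, linesFor) are made up for illustration, and the line format is assumed to match the printed output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: finds keys printed in the 1-partition run
// that are absent from the 2-partition run.
class JoinOutputDiff {

    // Extracts the record key from a printed line such as
    // "[KSTREAM-PROCESSVALUES-0000000008]: 0, 0/NOTPRESENT".
    static String keyOf(String printedLine) {
        String afterLabel = printedLine.substring(printedLine.indexOf("]: ") + 3);
        return afterLabel.substring(0, afterLabel.indexOf(','));
    }

    // Returns the keys that appear in `expected` but not in `actual`, sorted.
    static Set<String> missingKeys(List<String> expected, List<String> actual) {
        Set<String> missing = new TreeSet<>();
        for (String line : expected) {
            missing.add(keyOf(line));
        }
        for (String line : actual) {
            missing.remove(keyOf(line));
        }
        return missing;
    }

    // Rebuilds printed lines for the given keys, in the format shown in this report.
    static List<String> linesFor(int... keys) {
        List<String> lines = new ArrayList<>();
        for (int k : keys) {
            lines.add("[KSTREAM-PROCESSVALUES-0000000008]: " + k + ", " + k + "/NOTPRESENT");
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> onePartition = linesFor(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<String> twoPartitions = linesFor(1, 3, 4, 7, 8, 9);
        // Prints [0, 2, 5, 6]
        System.out.println(missingKeys(onePartition, twoPartitions));
    }
}
```

So with 2 partitions, the results for keys 0, 2, 5 and 6 are missing compared to the 1-partition run.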