Details
-
Bug
-
Status: Patch Available
-
Major
-
Resolution: Unresolved
-
2.8.2, 3.7.0
-
None
-
None
Description
We have two topics : left-topic[String, LeftRecord] and right-topic[String, String]
where LeftRecord :
case class LeftRecord(foreignKey: String, name: String)
we do a simple INNER foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic. (same topology example as in KAFKA-16407)
Scenario: Unset foreign key of a primary key
rightTopic.pipeInput("fk1", "1") leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
Actual result
KeyValue(pk1, 3)
Expected result
KeyValue(pk1, 3) KeyValue(pk1, null) // This unsets the join between pk1 and fk1
However, in other cases, where the join result should be unset (e.g. the primary key is deleted, or the foreign key changes to a non existing FK), that record is correctly emitted.
Also, the importance of unsetting the join result is mentioned in the code: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36
//[...] Additionally, propagate null if no FK is found there, // since we must "unset" any output set by the previous FK-join. This is true for both INNER and LEFT join.
Attachments
Issue Links
- links to