Details
Type: Bug
Status: Patch Available
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.6.1, 3.7.0
Description
We have two topics: left-topic[String, LeftRecord] and right-topic[String, String],
where LeftRecord is defined as:
case class LeftRecord(foreignKey: String, name: String)
We perform a simple INNER foreign-key join, using the foreignKey field of the left-topic value as the foreign key. The resulting join value is the value from right-topic.
Scenario: Primary key pk1 gets mapped to a new FK after having a null FK
rightTopic.pipeInput("fk", "1")
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1"))
Expected result
KeyValue(pk1, 1)
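To make the expected result concrete, here is a minimal sketch of the inner foreign-key join semantics over plain Scala maps. It is only a model of the table contents after the three inputs above, not Kafka Streams code; `ExpectedJoinModel` and `innerFkJoin` are hypothetical names introduced for illustration.

```scala
// Illustrative model only: plain Scala collections, not the Kafka Streams API.
object ExpectedJoinModel {
  final case class LeftRecord(foreignKey: String, name: String)

  /** Final inner FK-join table for the given left and right table contents. */
  def innerFkJoin(left: Map[String, LeftRecord],
                  right: Map[String, String]): Map[String, String] =
    left.flatMap { case (pk, rec) =>
      // A null foreign key or a missing right-side row yields no join result.
      Option(rec.foreignKey).flatMap(right.get).map(joined => pk -> joined)
    }

  def main(args: Array[String]): Unit = {
    val right = Map("fk" -> "1")
    // Apply the two left-topic updates in order; the later one overwrites the earlier.
    val updates = Seq(
      "pk1" -> LeftRecord(null, "pk1"),
      "pk1" -> LeftRecord("fk", "pk1")
    )
    val left = updates.foldLeft(Map.empty[String, LeftRecord])(_ + _)
    println(innerFkJoin(left, right)) // prints Map(pk1 -> 1)
  }
}
```

Under this model, the earlier null-FK value of pk1 has no influence once the second update arrives, which is why KeyValue(pk1, 1) is expected.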
Actual result
# No output!
# Logs:
20:14:29,723 WARN org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[0]
20:14:29,728 WARN org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[1]
After looking into the code, I believe this line is behind the issue: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147
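The following self-contained sketch models the suspected behavior. The linked line is not reproduced here, so the branch structure is an assumption, inferred from the second warning, which prints the previous value LeftRecord(null,pk1) for offset 1. `Change`, `Outcome`, `suspected` and `alternative` are hypothetical names, and deletes and unsubscribes from an old foreign key are deliberately left out.

```scala
// Simplified model of the subscription-send decision; NOT the actual Kafka Streams code.
object SubscriptionSendModel {
  final case class LeftRecord(foreignKey: String, name: String)

  // Hypothetical stand-in for the old/new value pair carried by a table update.
  final case class Change(oldValue: Option[LeftRecord], newValue: Option[LeftRecord])

  sealed trait Outcome
  case object Skipped extends Outcome
  final case class Subscribe(foreignKey: String) extends Outcome

  private def newFk(change: Change): Option[String] =
    change.newValue.flatMap(r => Option(r.foreignKey))

  // Assumed buggy behavior: a null FK on the OLD value skips the whole update,
  // even when the new value carries a usable FK.
  def suspected(change: Change): Outcome =
    if (change.oldValue.exists(_.foreignKey == null)) Skipped
    else newFk(change).fold[Outcome](Skipped)(fk => Subscribe(fk))

  // One possible alternative: only the NEW value's FK decides whether to subscribe.
  def alternative(change: Change): Outcome =
    newFk(change).fold[Outcome](Skipped)(fk => Subscribe(fk))

  def main(args: Array[String]): Unit = {
    // Second left-topic update from the scenario: the FK goes from null to "fk".
    val second = Change(Some(LeftRecord(null, "pk1")), Some(LeftRecord("fk", "pk1")))
    println(suspected(second))   // prints Skipped
    println(alternative(second)) // prints Subscribe(fk)
  }
}
```

If the real processor behaves like `suspected`, no subscription for "fk" is ever sent for pk1, which would explain the missing output.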
Issue Links
- is blocked by: KAFKA-16343 Improve tests of streams foreignkeyjoin package (Resolved)
- links to