Description
Consider a topology in which a source KTable is followed by a transformValues operation that changes the key schema, and then by a foreign key (FK) join. In such a topology the FK join might miss records, because they can be sent to the wrong partitions.
Because transformValues does not change the key itself, no repartitioning happens after this operation. However, the KTable instance that calls doJoinOnForeignKey uses the new serde coming from transformValues rather than the original one. As a result, all nodes in the FK join topology except SubscriptionResolverJoinProcessorSupplier use the "new" serde. SubscriptionResolverJoinProcessorSupplier uses the old one because it relies on valueGetterSupplier, which in turn retrieves the records from the topic.
A different serializer might serialize the same key to a different byte sequence, and since the partition is derived from the serialized key, the record is then sent to the wrong partition. To run into this issue, several conditions must hold:
- the topic must have more than one partition,
- the KTable's serializer must be modified via a non-key-changing operation,
- the new serializer must serialize keys differently from the original one.
In practice, this can happen when the key type is a Struct, because a Struct key is serialized to a JSON string that maps each column name to its value (columnName -> value). If the transformValues operation renames columns to avoid name clashes with the joined table, the serialized key bytes change and the join can miss records.
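The effect of a renamed key column on partitioning can be illustrated with a small standalone sketch. It does not use the Kafka Streams API. The class KeySerdePartitionSketch, the methods serializeKey and partitionFor, and the column names ID and ORDER_ID are all hypothetical, and Arrays.hashCode is only a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeySerdePartitionSketch {

    // Hypothetical stand-in for a Struct key serializer: renders the key as a
    // JSON string of the form {"columnName":value} and returns its UTF-8 bytes.
    static byte[] serializeKey(String columnName, long value) {
        String json = "{\"" + columnName + "\":" + value + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified partitioner: hash of the serialized key modulo the partition
    // count. Assumption: Arrays.hashCode replaces Kafka's murmur2 hash here.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;

        // Same logical key (42), serialized before and after the column rename.
        byte[] original = serializeKey("ID", 42L);
        byte[] renamed = serializeKey("ORDER_ID", 42L);

        System.out.println("same bytes: " + Arrays.equals(original, renamed));
        System.out.println("partition with original serde: "
                + partitionFor(original, numPartitions));
        System.out.println("partition with renamed serde:  "
                + partitionFor(renamed, numPartitions));
    }
}
```

The two byte arrays differ although the logical key is the same, so nothing guarantees that both serdes map the key to the same partition. With a single partition every key maps to partition 0, which is why the issue requires more than one partition.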