Description
There are two incoming streams, A and B.
Stream A uses a composite key [a, b].
Stream B has key [b].
Stream B has 4 partitions and stream A has 1 partition.
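The repartition step below places A records by the b component of A's composite key, so every A record with the same b lands in the same partition no matter what a is. A minimal JDK-only sketch of that property follows. It assumes a hypothetical KeyA record standing in for the real key class and uses Math.floorMod over String.hashCode as the hash; Kafka's default partitioner instead applies murmur2 to the serialized key bytes.

```java
import java.util.List;
import java.util.Objects;

public class CompositeKeyPartitioning {

    // Hypothetical stand-in for the composite key [a, b] of stream A.
    record KeyA(String a, String b) {}

    // Mirrors the idea of the custom partitioner: only b decides the partition.
    // Math.floorMod keeps the result in [0, numPartitions) even for negative hashes.
    static int partitionByB(KeyA key, int numPartitions) {
        return Math.floorMod(key.b().hashCode(), numPartitions);
    }

    // Hypothetical alternative that hashes the whole composite key, so records
    // sharing the same b may be spread over different partitions.
    static int partitionByWholeKey(KeyA key, int numPartitions) {
        return Math.floorMod(Objects.hash(key.a(), key.b()), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        List<KeyA> keys = List.of(
                new KeyA("x", "order-1"),
                new KeyA("y", "order-1"),
                new KeyA("z", "order-1"));
        for (KeyA k : keys) {
            // byB is identical for all three keys; wholeKey may differ.
            System.out.printf("%s -> byB=%d wholeKey=%d%n",
                    k, partitionByB(k, partitions), partitionByWholeKey(k, partitions));
        }
    }
}
```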
We repartition stream A to 4 partitions as well, convert both A and B into KTables, and then do a foreign-key join from A to B.
When we do this, not all messages end up in the output topic.
Repartitioning both streams to a single partition each solves the problem, so it seems to be related to the foreign-key join in combination with multiple partitions.
One suspicion is that there is no way to define which partitioner the join should use.
Any insight or help is greatly appreciated.
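The partitioner suspicion can be illustrated with a toy, in-memory model. It is plain Java, not Kafka Streams code, and both partition functions are assumptions chosen for illustration: table A's records are stored by b (like the custom partitioner in the example), while the join's response for each A record is routed back by a hash of the whole composite key, standing in for a partitioner the join does not let you choose. A response only produces output if it reaches the partition that actually holds the A record. Kafka's real internal foreign-key-join topics are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class FkJoinRoutingModel {

    // Hypothetical composite key [a, b] of table A.
    record KeyA(String a, String b) {}

    // Assumed placement of table A: by the foreign key b.
    static int leftPartitioner(KeyA key, int n) {
        return Math.floorMod(key.b().hashCode(), n);
    }

    // Assumed routing of join responses: by the whole primary key.
    static int responsePartitioner(KeyA key, int n) {
        return Math.floorMod(Objects.hash(key.a(), key.b()), n);
    }

    // Returns the A keys that produce join output. If sameRouting is true,
    // responses are routed with the same function that placed table A.
    static List<KeyA> join(List<KeyA> leftKeys, Map<String, String> right, int n, boolean sameRouting) {
        // One state store per partition for table A.
        List<Map<KeyA, String>> stores = new ArrayList<>();
        for (int p = 0; p < n; p++) stores.add(new HashMap<>());
        for (KeyA k : leftKeys) stores.get(leftPartitioner(k, n)).put(k, "eventA");

        List<KeyA> output = new ArrayList<>();
        for (KeyA k : leftKeys) {
            if (!right.containsKey(k.b())) continue; // no matching B record
            int target = sameRouting ? leftPartitioner(k, n) : responsePartitioner(k, n);
            // The response is joined only if the receiving partition owns the A key.
            if (stores.get(target).containsKey(k)) output.add(k);
        }
        return output;
    }

    public static void main(String[] args) {
        List<KeyA> left = new ArrayList<>();
        for (int i = 0; i < 20; i++) left.add(new KeyA("a" + i, "b" + (i % 5)));
        Map<String, String> right = new HashMap<>();
        for (int i = 0; i < 5; i++) right.put("b" + i, "eventB");

        System.out.println("1 partition, mismatched routing:  " + join(left, right, 1, false).size());
        System.out.println("4 partitions, matching routing:   " + join(left, right, 4, true).size());
        System.out.println("4 partitions, mismatched routing: " + join(left, right, 4, false).size());
    }
}
```

With one partition every record lands in partition 0, so nothing is lost, which matches the observation that single-partition streams avoid the problem. With four partitions, records whose response partition differs from their storage partition drop out; in this model KeyA("a0", "b0") is stored in partition 2 but its response is routed to partition 0. Later Kafka Streams releases (3.1 and newer, via KIP-775) let the caller pass partitioners to the foreign-key join through a TableJoined argument, which roughly corresponds to the sameRouting = true case here.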
Example code reproducing the problem:
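// Serde.of(...), stringMaterialized(...), topicAandBeJoiner() and with(...) are
// assumed to be project-specific helpers (not shown here).
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;

static Topology createTopology() {
    var builder = new StreamsBuilder();
    // Table B is read directly from topic "B" (4 partitions, key [b]).
    KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
    builder
        .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
        // Repartition A to 4 partitions, placing records by the b part of the key.
        .repartition(repartitionTopicA())
        .toTable(Named.as("table.a"), aMaterialized("table.a"))
        // Foreign-key join A -> B: EventA::getKeyB extracts B's key from each A value.
        .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
        .toStream()
        .to("output", with(...));
    return builder.build();
}

private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
    Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
    return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
}

private static Repartitioned<KeyA, EventA> repartitionTopicA() {
    Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
    return repartitioned.withKeySerde(Serde.of(KeyA.class))
        .withValueSerde(Serde.of(EventA.class))
        .withStreamPartitioner(topicAPartitioner())
        .withNumberOfPartitions(4);
}

// Custom partitioner: only keyB determines the partition of an A record.
// Note: Math.abs(Integer.MIN_VALUE) stays negative; Math.floorMod avoids that edge case.
private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
    return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
}

private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
    Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
    return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
}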
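A minor point about topicAPartitioner(), independent of the lost messages: Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE, so Math.abs(hash) % numPartitions can produce a negative partition number when numPartitions is not a power of two. A JDK-only sketch comparing that expression with Math.floorMod (the class and method names are hypothetical):

```java
public class SafePartitionIndex {

    // The expression used in topicAPartitioner().
    static int absModulo(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Hypothetical safer variant: always in [0, numPartitions).
    static int floorModulo(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println(absModulo(hash, 3));   // prints -2 (invalid partition)
        System.out.println(floorModulo(hash, 3)); // prints 1
    }
}
```

Switching to floorMod changes the partition chosen for every negative hash, so it would have to be applied consistently wherever keyB-based placement is assumed.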
Attachments
Issue Links
- contains: KAFKA-13268 Add more integration tests for Table Table FK joins with repartitioning (Resolved)
- relates to: KAFKA-13268 Add more integration tests for Table Table FK joins with repartitioning (Resolved)