Kafka / KAFKA-13261

KTable to KTable foreign key join loses events when using several partitions



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.8.0, 2.7.1
    • Fix Version/s: 3.1.0
    • Component/s: streams


      KIP-775: https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins


      There are two incoming streams, A and B.

      Stream A uses a composite key [a, b].

      Stream B has key [b].

      Stream B has 4 partitions and stream A has 1 partition.

      What we are trying to do is repartition stream A to 4 partitions as well, turn both A and B into KTables, and do a foreign-key join from A to B.

      When we do this, not all messages end up in the output topic.

      Repartitioning both to use only 1 partition each solves the problem, so it seems to be related to the foreign-key join in combination with multiple partitions.
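A toy model of one mechanism that fits these symptoms. It assumes that the foreign-key join routes its internal response records back to table A using Kafka's default partitioning of the serialized A key, while A's records were placed by the reporter's by-`b` partitioner. The class name, the `a|b` string serialization of `KeyA`, and the sample keys are hypothetical; `murmur2` is a re-implementation, for illustration only, of the hash the default partitioner applies to key bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FkJoinLossModel {

    // Illustrative re-implementation of the murmur2 hash used by Kafka's default partitioner.
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) | ((data[i4 + 1] & 0xff) << 8)
                    | ((data[i4 + 2] & 0xff) << 16) | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
            default: break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Default-style partition: positive murmur2 hash of the serialized key, mod partitions.
    static int defaultPartition(String serializedKey, int numPartitions) {
        byte[] bytes = serializedKey.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    // The reporter's scheme: place A's record by the b part of its key [a, b].
    static int customPartition(String b, int numPartitions) {
        return Math.floorMod(b.hashCode(), numPartitions);
    }

    // A key [a, b] loses its join result when the response is routed (default scheme)
    // to a partition other than the one whose store holds the A record (custom scheme).
    static List<String> lostKeys(List<String[]> keysA, int numPartitions) {
        List<String> lost = new ArrayList<>();
        for (String[] key : keysA) {
            String serialized = key[0] + "|" + key[1]; // hypothetical KeyA serialization
            int storedOn = customPartition(key[1], numPartitions);
            int responseTo = defaultPartition(serialized, numPartitions);
            if (storedOn != responseTo) {
                lost.add(serialized);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        List<String[]> keysA = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            keysA.add(new String[] {"a" + i, "b" + (i % 5)});
        }
        System.out.println("lost with 4 partitions: " + lostKeys(keysA, 4).size() + " of 20");
        System.out.println("lost with 1 partition:  " + lostKeys(keysA, 1).size() + " of 20");
    }
}
```

With one partition every hash modulo 1 is 0, so both schemes always agree, which is consistent with the observation that the single-partition setup works.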

      One suspicion is that there is no way to define which partitioner the join should use.
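That suspicion lines up with what KIP-775 targets: as we understand it, from 3.1.0 the foreign-key `join` overloads accept a `TableJoined`, whose `partitioner` and `otherPartitioner` tell the join how the left and right tables are partitioned. The plain-Java sketch below only models the left-table (response) side of that idea. `Partitioner`, `KeyA`, `FULL_KEY`, `BY_B`, and `lostResults` are hypothetical names, and `FULL_KEY` is a stand-in for "some partitioning other than the one A was written with", not Kafka's actual default partitioner.

```java
import java.util.List;

public class PluggableFkJoinModel {

    // Hypothetical stand-in for the composite key [a, b] of stream A.
    record KeyA(String a, String b) {}

    // Hypothetical partitioner shape: (key, numPartitions) -> partition.
    @FunctionalInterface
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Stand-in for a scheme the join might assume: hash of the whole key.
    static final Partitioner<KeyA> FULL_KEY =
            (k, n) -> Math.floorMod((k.a() + "|" + k.b()).hashCode(), n);

    // How the reporter actually placed A's records: by the b part only.
    static final Partitioner<KeyA> BY_B =
            (k, n) -> Math.floorMod(k.b().hashCode(), n);

    // Counts A records whose join response is routed, by the partitioner the join is
    // told about, to a partition other than the one actually holding the record.
    static long lostResults(List<KeyA> keys, Partitioner<KeyA> actual,
                            Partitioner<KeyA> toldToJoin, int numPartitions) {
        return keys.stream()
                .filter(k -> actual.partition(k, numPartitions) != toldToJoin.partition(k, numPartitions))
                .count();
    }

    public static void main(String[] args) {
        List<KeyA> keys = List.of(new KeyA("a1", "b1"), new KeyA("a2", "b1"));
        // The join assumes a different scheme: one of the two results is lost.
        System.out.println(lostResults(keys, BY_B, FULL_KEY, 4)); // prints 1
        // The join is given the partitioner A was written with: nothing is lost.
        System.out.println(lostResults(keys, BY_B, BY_B, 4));     // prints 0
    }
}
```

The design point is that the partitioner becomes a parameter of the join rather than a fixed assumption; the exact `TableJoined` overloads should be checked against the 3.1.0 Javadoc.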

      Any insight or help is greatly appreciated.

      Example code reproducing the problem:

      // Serde.of(...), stringMaterialized(...) and topicAandBeJoiner() are the reporter's
      // own helpers (not shown); with(...) is Produced.with(...) with its serdes elided.
      static Topology createTopology() {
          var builder = new StreamsBuilder();
          KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
          builder
              .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
              .repartition(repartitionTopicA()) // assumed placement: A -> 4 partitions, routed by b
              .toTable(Named.as("table.a"), aMaterialized("table.a"))
              // foreign-key join: EventA.getKeyB() selects the matching row of table B
              .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
              .toStream() // KTable has no to(); convert to a stream first
              .to("output", with(...));
          return builder.build();
      }

      private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
          Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
          return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
      }

      private static Repartitioned<KeyA, EventA> repartitionTopicA() {
          Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
          return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
              .withNumberOfPartitions(4)                   // assumed: match B's 4 partitions
              .withStreamPartitioner(topicAPartitioner()); // assumed: wire in the custom partitioner
      }

      private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
          // partition by the b part of the composite key; floorMod stays non-negative
          return (topic, key, value, numPartitions) -> Math.floorMod(key.getKeyB().hashCode(), numPartitions);
      }

      private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
          Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
          return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
      }
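A side note on the `Math.abs(hashCode()) % numPartitions` idiom that custom partitioners such as `topicAPartitioner` often use: `Math.abs(Integer.MIN_VALUE)` is still negative, so for a hash of exactly `Integer.MIN_VALUE` the expression returns a negative partition whenever `numPartitions` is not a power of two. With the 4 partitions in this report it happens to yield 0, so this is unrelated to the lost join results. The class and method names below are hypothetical.

```java
public class PartitionMathDemo {

    // The Math.abs idiom: negative for Integer.MIN_VALUE unless numPartitions is a power of two.
    static int absMod(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Always in [0, numPartitions) for numPartitions > 0.
    static int floorMod(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int worst = Integer.MIN_VALUE;
        System.out.println(Math.abs(worst));    // prints -2147483648
        System.out.println(absMod(worst, 4));   // prints 0
        System.out.println(absMod(worst, 3));   // prints -2
        System.out.println(floorMod(worst, 3)); // prints 1
    }
}
```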


        Attachment: KafkaTest.java (4 kB, Tomas Forsman)




              People: Victoria Xia (vcrfxia), Tomas Forsman (xnix), Matthias J. Sax
              Votes: 0
              Watchers: 8