  KAFKA-13261 (project: Kafka)

KTable-to-KTable foreign-key join loses events when using several partitions


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.8.0, 2.7.1
    • Fix Version/s: 3.1.0
    • Component/s: streams

    Description

      KIP-775: https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins

       

      There are two incoming streams, A and B.

      Stream A uses a composite key [a, b].

      Stream B has key [b].

      Stream B has 4 partitions and stream A has 1 partition.

      We repartition stream A to 4 partitions as well, convert both A and B into KTables, and perform a foreign-key join from A to B.

      When we do this, not all messages end up in the output topic.

      Repartitioning both to use only 1 partition each solves the problem, so the issue seems to be related to the foreign-key join in combination with multiple partitions.

      One suspicion is that there is no way to specify which partitioner the join should use.
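      The suspected mechanism can be modeled without Kafka at all. The sketch below is a hypothetical, stdlib-only Java simulation, not Kafka Streams code. It assumes, as KIP-775 describes, that the join's internal topics are partitioned with the default partitioner. Table A's records are placed by a partitioner that hashes only the foreign key b, mirroring the custom partitioner in the example code, while join responses are routed by a partitioner that hashes the whole [a, b] key. Kafka's real default partitioner applies murmur2 to the serialized key bytes; `wholeKeyPartition` is only an assumed stand-in for it, and `FkJoinRoutingSketch`, `lostResponses` and `sampleKeys` are illustrative names. A response that lands on a partition whose local store does not hold the matching A record is counted as lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, stdlib-only model of foreign-key join response routing.
// Nothing here is Kafka Streams code; both partitioners are stand-ins.
final class FkJoinRoutingSketch {

    // Composite key of table A: [a, b], where b is the foreign key into table B.
    static final class KeyA {
        final String a;
        final String b;
        KeyA(String a, String b) { this.a = a; this.b = b; }
    }

    // Mirrors the custom partitioner in the report: place records by foreign key b only.
    // floorMod keeps the result non-negative.
    static int customPartition(KeyA key, int numPartitions) {
        return Math.floorMod(key.b.hashCode(), numPartitions);
    }

    // Assumed stand-in for a default partitioner that hashes the *whole* key.
    // Kafka's real default partitioner uses murmur2 on the serialized key bytes.
    static int wholeKeyPartition(KeyA key, int numPartitions) {
        return Math.floorMod(Objects.hash(key.a, key.b), numPartitions);
    }

    // Counts responses routed to a partition that does not hold the left-side record.
    // If responseUsesCustomPartitioner is true, responses follow table A's own partitioner.
    static int lostResponses(List<KeyA> keys, int numPartitions, boolean responseUsesCustomPartitioner) {
        int lost = 0;
        for (KeyA key : keys) {
            int storedOn = customPartition(key, numPartitions);
            int respondedTo = responseUsesCustomPartitioner
                    ? customPartition(key, numPartitions)
                    : wholeKeyPartition(key, numPartitions);
            if (storedOn != respondedTo) {
                lost++; // the response finds no matching A record and is dropped
            }
        }
        return lost;
    }

    // 16 sample keys: a0..a3 crossed with b0..b3.
    static List<KeyA> sampleKeys() {
        List<KeyA> keys = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            for (int j = 0; j < 4; j++) {
                keys.add(new KeyA("a" + i, "b" + j));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<KeyA> keys = sampleKeys();
        System.out.println("4 partitions, mismatched routing: " + lostResponses(keys, 4, false));
        System.out.println("1 partition, mismatched routing: " + lostResponses(keys, 1, false));
        System.out.println("4 partitions, consistent routing: " + lostResponses(keys, 4, true));
    }
}
```

      With these 16 sample keys, 12 responses are lost with 4 partitions, none with 1 partition, and none when responses follow table A's own partitioner, which is consistent with the reported symptoms. KIP-775, released in Kafka 3.1.0, added a way to pass custom partitioners to the foreign-key join (a `TableJoined` argument); the 3.1 Javadoc has the exact overloads.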

      Any insight or help is greatly appreciated.

      Example code reproducing the problem:

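      import org.apache.kafka.common.utils.Bytes;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.Topology;
      import org.apache.kafka.streams.kstream.Consumed;
      import org.apache.kafka.streams.kstream.KTable;
      import org.apache.kafka.streams.kstream.Materialized;
      import org.apache.kafka.streams.kstream.Named;
      import org.apache.kafka.streams.kstream.Repartitioned;
      import org.apache.kafka.streams.processor.StreamPartitioner;
      import org.apache.kafka.streams.state.KeyValueStore;

      // KeyA, EventA, Serde.of(...), stringMaterialized(...) and topicAandBeJoiner() are not
      // Kafka APIs; they are the reporter's own types and helpers and are not shown here.

      static Topology createTopology() {
          var builder = new StreamsBuilder();

          // Table B: keyed by [b], 4 partitions.
          KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));

          builder
              .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
              // Repartition A (1 partition) into 4 partitions, routed by the foreign key b.
              .repartition(repartitionTopicA())
              .toTable(Named.as("table.a"), aMaterialized("table.a"))
              // Foreign-key join from A to B, extracting the foreign key with EventA::getKeyB.
              .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
              .toStream()
              .to("output", with(...));

          return builder.build();
      }

      private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
          Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
          return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
      }

      private static Repartitioned<KeyA, EventA> repartitionTopicA() {
          Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
          return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
              .withStreamPartitioner(topicAPartitioner())
              .withNumberOfPartitions(4);
      }

      // Custom partitioner: places each A record by the hash of its foreign key b.
      private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
          return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
      }

      private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
          Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
          return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
      }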
      
      

      Attachments

        1. KafkaTest.java (4 kB, Tomas Forsman)


            People

              Victoria Xia (vcrfxia)
              Tomas Forsman (xnix)
              Matthias J. Sax
              Votes: 0
              Watchers: 8
