Kafka / KAFKA-8413

Add possibility to do repartitioning on KStream


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Consider the following code:

      final KStream<String, String> streamByProfileId = streamsBuilder
         .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
         .selectKey((key, value) -> value);
      
      streamByProfileId
         .groupByKey()
         .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-1")
         );
      
      streamByProfileId
         .groupByKey()
         .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-2")
         );
      

       

      This code generates the following topology:

      Topologies:
         Sub-topology: 0
          Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
            --> KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
            --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
            <-- KSTREAM-SOURCE-0000000000
          Processor: KSTREAM-FILTER-0000000004 (stores: [])
            --> KSTREAM-SINK-0000000003
            <-- KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-FILTER-0000000008 (stores: [])
            --> KSTREAM-SINK-0000000007
            <-- KSTREAM-KEY-SELECT-0000000001
          Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
            <-- KSTREAM-FILTER-0000000004
          Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
            <-- KSTREAM-FILTER-0000000008

         Sub-topology: 1
          Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
            --> KSTREAM-AGGREGATE-0000000002
          Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
            --> none
            <-- KSTREAM-SOURCE-0000000005

         Sub-topology: 2
          Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
            --> KSTREAM-AGGREGATE-0000000006
          Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
            --> none
            <-- KSTREAM-SOURCE-0000000009
       
      

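      Kafka Streams creates a separate repartition topic for each `groupByKey` operation, so this example produces two of them (`store-1-repartition` and `store-2-repartition`) and three sub-topologies. Both aggregations group by the same key, so a single repartition topic feeding one downstream sub-topology would be enough.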
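
      Kafka Streams 2.1 and later also ship an optional topology optimization which, when enabled, can merge the repartition topics created after a single key-changing operation such as `selectKey` into one. A minimal sketch of the configuration, assuming a Kafka Streams version that accepts the `topology.optimization` setting with the value `all`; the class, method, application id and broker address are hypothetical, and only `java.util.Properties` is used so the snippet compiles without Kafka on the classpath:

```java
import java.util.Properties;

// Hypothetical helper: collects the settings a Kafka Streams application could
// pass to StreamsBuilder#build(Properties) and to the KafkaStreams constructor.
// Plain string keys are used so the sketch compiles without Kafka on the classpath.
final class OptimizedStreamsConfig {

    static Properties optimizedProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // "all" turns on every available topology optimization, which is
        // assumed here to include merging redundant repartition topics.
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = optimizedProps("profile-aggregator", "localhost:9092");
        // With Kafka Streams on the classpath, the same props must reach the builder:
        //   Topology topology = streamsBuilder.build(props);
        //   System.out.println(topology.describe());
        System.out.println("topology.optimization=" + props.getProperty("topology.optimization"));
    }
}
```

      The properties have to be passed to `StreamsBuilder#build(Properties)`; a topology built with the no-argument `build()` is not optimized.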

       

      In the DSL, a Kafka Streams user can specify the repartition topic manually with the `KStream#through` method:

      final KStream<Object, Object> streamByProfileId = streamsBuilder
         .stream("input-topic")
         .selectKey((key, value) -> value)
         .through("repartition-topic");
      
      streamByProfileId
         .groupByKey()
         .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-1")
         );
      
      streamByProfileId
         .groupByKey()
         .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-2")
         );
      

       

       

      This code generates the following topology, in which a single repartition topic feeds both aggregations:

      Topologies:
         Sub-topology: 0
          Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
            --> KSTREAM-KEY-SELECT-0000000001
          Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
            --> KSTREAM-SINK-0000000002
            <-- KSTREAM-SOURCE-0000000000
          Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
            <-- KSTREAM-KEY-SELECT-0000000001

         Sub-topology: 1
          Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
            --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
          Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
            --> none
            <-- KSTREAM-SOURCE-0000000003
          Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
            --> none
            <-- KSTREAM-SOURCE-0000000003
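
      Why one repartition topic suffices can be shown with a small in-memory model of this second topology. It is plain Java, not the Kafka Streams API, and all class and method names are hypothetical: records are routed once by their new key into a fixed number of partitions, and two aggregations then read the same partitions, each keeping one store per partition as a stream task would. Because all records with a given key land in the same partition, each aggregation still sees every value for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Conceptual model only (not the Kafka Streams API): lists stand in for the
// partitions of one repartition topic, and one HashMap per partition stands in
// for the local state store of the task that owns that partition.
final class SharedRepartitionModel {

    // Writes each (newKey, value) record once, to the partition chosen by its key.
    static List<List<Map.Entry<String, Double>>> repartition(
            List<Map.Entry<String, Double>> records, int numPartitions) {
        List<List<Map.Entry<String, Double>>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<String, Double> record : records) {
            // Simple key hash for the model; Kafka's default partitioner uses murmur2 instead.
            int p = (record.getKey().hashCode() & 0x7fffffff) % numPartitions;
            partitions.get(p).add(record);
        }
        return partitions;
    }

    // Runs one aggregation over every partition, with a separate store per partition.
    static List<Map<String, Double>> aggregate(
            List<List<Map.Entry<String, Double>>> partitions,
            double initial, BinaryOperator<Double> aggregator) {
        List<Map<String, Double>> stores = new ArrayList<>();
        for (List<Map.Entry<String, Double>> partition : partitions) {
            Map<String, Double> store = new HashMap<>();
            for (Map.Entry<String, Double> record : partition) {
                String key = record.getKey();
                store.put(key, aggregator.apply(store.getOrDefault(key, initial), record.getValue()));
            }
            stores.add(store);
        }
        return stores;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> records = List.of(
                Map.entry("profile-a", 1.0), Map.entry("profile-b", 2.0),
                Map.entry("profile-a", 3.0), Map.entry("profile-c", 4.0));

        // One repartition step shared by both aggregations ("store-1" and "store-2").
        List<List<Map.Entry<String, Double>>> partitions = repartition(records, 3);
        List<Map<String, Double>> sums = aggregate(partitions, 0d, Double::sum);
        List<Map<String, Double>> counts = aggregate(partitions, 0d, (agg, v) -> agg + 1);

        System.out.println("sums per partition:   " + sums);
        System.out.println("counts per partition: " + counts);
    }
}
```

      Giving the second aggregation its own `repartition` call would yield the same stores, only at the cost of writing every record twice, which is what the first topology does with `store-1-repartition` and `store-2-repartition`.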
      

        

      While `KStream#through` makes it possible to optimize a Kafka Streams application this way, the user still has to create the repartition topic manually, with the correct number of partitions based on the input topic. It would be great if the DSL offered something like a `repartition()` operation on KStream that creates the repartition topic according to the user's specification.
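
      The partition count matters because records written to a repartition topic are assigned to partitions by hashing their key. The sketch below reproduces, to the best of our understanding, the producer's default handling of keyed records (murmur2 over the serialized key bytes, made non-negative, modulo the partition count) in plain Java. The class and method names are hypothetical; the real code lives in Kafka's `Utils.murmur2` and default partitioner. It shows that a key's target partition depends on the partition count, which is one reason the partition count of a manually created repartition topic has to be chosen deliberately and kept stable.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of key-based partition selection, modeled on Kafka's
// murmur2 hash and default partitioner; the names here are hypothetical.
final class KeyPartitioning {

    // 32-bit murmur2 hash as used for Kafka record keys (seed 0x9747b28c).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;

        // Mix the input four bytes at a time (little-endian).
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1-3 bytes; the cases intentionally fall through.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
            default:
                break;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Non-negative hash modulo the partition count, as for keyed records by default.
    static int partitionFor(String key, int numPartitions) {
        // UTF-8 bytes, which is what StringSerializer produces by default.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"profile-a", "profile-b", "profile-c"}) {
            // The same key may map to a different partition once the count changes.
            System.out.println(key + ": 4 partitions -> " + partitionFor(key, 4)
                    + ", 6 partitions -> " + partitionFor(key, 6));
        }
    }
}
```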

      Attachments

        1. topology-2.png (188 kB, Levani Kokhreidze)
        2. topology-1.png (291 kB, Levani Kokhreidze)


          People

            Assignee: Unassigned
            Reporter: Levani Kokhreidze (lkokhreidze)
            Votes: 0
            Watchers: 4
