Details
Type: New Feature
Status: Resolved
Priority: Minor
Resolution: Not A Problem
Description
Consider the following code:
    final KStream<String, String> streamByProfileId = streamsBuilder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .selectKey((key, value) -> value);

    streamByProfileId
        .groupByKey()
        .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-1")
        );

    streamByProfileId
        .groupByKey()
        .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-2")
        );
This code generates the following topology:
    Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
          --> KSTREAM-KEY-SELECT-0000000001
        Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
          --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
          <-- KSTREAM-SOURCE-0000000000
        Processor: KSTREAM-FILTER-0000000004 (stores: [])
          --> KSTREAM-SINK-0000000003
          <-- KSTREAM-KEY-SELECT-0000000001
        Processor: KSTREAM-FILTER-0000000008 (stores: [])
          --> KSTREAM-SINK-0000000007
          <-- KSTREAM-KEY-SELECT-0000000001
        Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
          <-- KSTREAM-FILTER-0000000004
        Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
          <-- KSTREAM-FILTER-0000000008

       Sub-topology: 1
        Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
          --> KSTREAM-AGGREGATE-0000000002
        Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
          --> none
          <-- KSTREAM-SOURCE-0000000005

       Sub-topology: 2
        Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
          --> KSTREAM-AGGREGATE-0000000006
        Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
          --> none
          <-- KSTREAM-SOURCE-0000000009
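Kafka Streams creates a separate repartition topic for each `groupByKey` operation, so this topology contains two. In this example, two repartition topics are not really necessary: both aggregations read the same re-keyed stream, so the processing could be done with a single repartition topic feeding one aggregation sub-topology.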
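To illustrate why the second repartition topic is redundant, here is a minimal, library-free sketch. It models the re-keying step and shows that both repartition topics would receive the same records in the same partitions. The class `RepartitionSketch`, the `hashCode()`-based partitioner, the sample values, and the partition count of 4 are all assumptions made for illustration; Kafka's default partitioner hashes the serialized key differently, but the argument only relies on the partition being a deterministic function of the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RepartitionSketch {

    // Simplified stand-in for a key-based partitioner (assumption: not Kafka's
    // actual hash function, only a deterministic key -> partition mapping).
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Models selectKey((key, value) -> value) followed by a write to a
    // repartition topic: the value becomes the new key and selects the partition.
    public static Map<Integer, List<String>> repartition(List<String> values, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String value : values) {
            int partition = partitionFor(value, numPartitions);
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical input values; each one becomes the record key.
        List<String> values = List.of("profile-1", "profile-2", "profile-1", "profile-3");
        int numPartitions = 4; // hypothetical partition count

        // Both groupByKey() calls start from the same re-keyed stream.
        Map<Integer, List<String>> store1Repartition = repartition(values, numPartitions);
        Map<Integer, List<String>> store2Repartition = repartition(values, numPartitions);

        System.out.println("store-1-repartition: " + store1Repartition);
        System.out.println("store-2-repartition: " + store2Repartition);
        System.out.println("identical layout: " + store1Repartition.equals(store2Repartition));
    }
}
```

Under these assumptions the last line reports an identical layout, which is the situation in the topology above: the second topic duplicates the data and the network traffic of the first without changing what either aggregation sees.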
In the DSL, a Kafka Streams user can specify the repartition topic manually with the KStream#through method:
    final KStream<Object, Object> streamByProfileId = streamsBuilder
        .stream("input-topic")
        .selectKey((key, value) -> value)
        .through("repartition-topic");

    streamByProfileId
        .groupByKey()
        .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-1")
        );

    streamByProfileId
        .groupByKey()
        .aggregate(
            () -> 0d,
            (key, value, aggregate) -> aggregate,
            Materialized.as("store-2")
        );
    Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
          --> KSTREAM-KEY-SELECT-0000000001
        Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
          --> KSTREAM-SINK-0000000002
          <-- KSTREAM-SOURCE-0000000000
        Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
          <-- KSTREAM-KEY-SELECT-0000000001

       Sub-topology: 1
        Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
          --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
        Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
          --> none
          <-- KSTREAM-SOURCE-0000000003
        Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
          --> none
          <-- KSTREAM-SOURCE-0000000003
While this makes it possible to optimize a Kafka Streams application, the user still has to manually create the repartition topic with the correct number of partitions, based on the input topic. It would be great if the DSL had something like a repartition() operation on KStream that generates the repartition topic at the user's request.