Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.1.0
    • Component/s: streams
    • Labels:

      Description

      Consider the following DSL:

      KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
      KStream<String, String> mapped = source.map(..);             // key-changing operation

      KTable<String, Long> counts = mapped
              .groupByKey()
              .count("Counts");                                     // needs co-partitioned input

      KStream<String, String> sink = mapped.leftJoin(counts, ..);  // also needs co-partitioned input
      

      The resulting topology looks like this:

      ProcessorTopology:
      				KSTREAM-SOURCE-0000000000:
      					topics:		[topic1]
      					children:	[KSTREAM-MAP-0000000001]
      				KSTREAM-MAP-0000000001:
      					children:	[KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
      				KSTREAM-FILTER-0000000004:
      					children:	[KSTREAM-SINK-0000000003]
      				KSTREAM-SINK-0000000003:
      					topic:		X-Counts-repartition
      				KSTREAM-FILTER-0000000007:
      					children:	[KSTREAM-SINK-0000000006]
      				KSTREAM-SINK-0000000006:
      					topic:		X-KSTREAM-MAP-0000000001-repartition
      
      ProcessorTopology:
      				KSTREAM-SOURCE-0000000008:
      					topics:		[X-KSTREAM-MAP-0000000001-repartition]
      					children:	[KSTREAM-LEFTJOIN-0000000009]
      				KSTREAM-LEFTJOIN-0000000009:
      					states:		[Counts]
      				KSTREAM-SOURCE-0000000005:
      					topics:		[X-Counts-repartition]
      					children:	[KSTREAM-AGGREGATE-0000000002]
      				KSTREAM-AGGREGATE-0000000002:
      					states:		[Counts]
      

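      That is, there are two repartition topics, one for the aggregation and one for the join. This not only adds unnecessary overhead (each mapped record is produced to and consumed from two repartition topics instead of one) but also breaks the expected processing order: users expect each record to go through the aggregation first and then through the join, yet the two repartition topics are consumed independently, so the join may see a count that does not yet include the current record. To get the simpler topology below, users today have to manually add a through operator after map to force a single repartitioning step.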
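      To make the ordering problem concrete, here is a toy, stdlib-only Java model. It is not Kafka Streams code: the class RepartitionOrderingDemo, its methods, and the lag parameter are hypothetical. It assumes the two repartition topics are consumed at different rates, modeled as a fixed lag of the counts topic behind the join topic, and compares that with a single shared topic in which each record is counted before it is joined.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the ordering problem; not Kafka Streams code.
final class RepartitionOrderingDemo {

    // One shared repartition topic: each record is counted first and then joined,
    // so the join always sees a count that already includes the record itself.
    static List<String> sharedTopic(List<Map.Entry<String, String>> records) {
        Map<String, Long> counts = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> r : records) {
            counts.merge(r.getKey(), 1L, Long::sum);
            joined.add(r.getKey() + ":" + r.getValue() + ":" + counts.get(r.getKey()));
        }
        return joined;
    }

    // Two repartition topics consumed independently (hypothetical fixed lag >= 0):
    // the counts topic trails the join topic by `lag` records, so the left join
    // may see a stale or missing count.
    static List<String> separateTopics(List<Map.Entry<String, String>> records, int lag) {
        Map<String, Long> counts = new HashMap<>();
        List<String> joined = new ArrayList<>();
        int aggPos = 0; // next record the aggregation will consume
        for (int joinPos = 0; joinPos < records.size(); joinPos++) {
            // The aggregation has only caught up to record (joinPos - lag).
            while (aggPos <= joinPos - lag) {
                counts.merge(records.get(aggPos).getKey(), 1L, Long::sum);
                aggPos++;
            }
            Map.Entry<String, String> r = records.get(joinPos);
            // Left join: a missing count shows up as "null".
            joined.add(r.getKey() + ":" + r.getValue() + ":" + counts.get(r.getKey()));
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records =
                List.of(Map.entry("a", "x"), Map.entry("a", "y"), Map.entry("b", "z"));
        System.out.println(sharedTopic(records));       // [a:x:1, a:y:2, b:z:1]
        System.out.println(separateTopics(records, 1)); // [a:x:null, a:y:1, b:z:null]
    }
}
```

      With a lag of 0 the two variants agree; with any positive lag the left join emits stale or null counts, which is the misordering described above. In a real deployment the interleaving depends on fetch timing rather than a fixed lag, so the model only shows that the misordering is possible, not how often it happens.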

      KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
      KStream<String, String> repartitioned = source.map(..).through("topic2");  // explicit, single repartitioning

      KTable<String, Long> counts = repartitioned
              .groupByKey()
              .count("Counts");

      KStream<String, String> sink = repartitioned.leftJoin(counts, ..);
      

      The resulting topology then looks like this:

      ProcessorTopology:
      				KSTREAM-SOURCE-0000000000:
      					topics:		[topic1]
      					children:	[KSTREAM-MAP-0000000001]
      				KSTREAM-MAP-0000000001:
      					children:	[KSTREAM-SINK-0000000002]
      				KSTREAM-SINK-0000000002:
      					topic:		topic2
      
      ProcessorTopology:
      				KSTREAM-SOURCE-0000000003:
      					topics:		[topic2]
      					children:	[KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
      				KSTREAM-AGGREGATE-0000000004:
      					states:		[Counts]
      				KSTREAM-LEFTJOIN-0000000005:
      					states:		[Counts]
      

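      Streams should apply this kind of optimization automatically. We can consider doing so once the DSL translation is extended beyond translating one operator at a time, so that the whole topology can be inspected before repartition topics are created.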
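      A rough sketch of what such an automatic pass could look like, written against a hypothetical in-memory model of the logical topology (the Node class, its flags, and the topic naming are assumptions for illustration, not the actual Streams internals). The naive walk mirrors one-operator-at-a-time translation and creates a repartition topic for every downstream operator that needs co-partitioned input; the optimized walk instead places one shared repartition topic directly after each key-changing node, as the manual through() workaround does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a logical DSL topology; not Kafka Streams internals.
final class RepartitionPlanner {

    static final class Node {
        final String name;
        final boolean keyChanging;      // e.g. map(), selectKey()
        final boolean needsCoPartition; // e.g. groupByKey().count(), stream-table join
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean keyChanging, boolean needsCoPartition) {
            this.name = name;
            this.keyChanging = keyChanging;
            this.needsCoPartition = needsCoPartition;
        }

        // Adds a child and returns this node, so siblings can be chained.
        Node add(Node child) {
            children.add(child);
            return this;
        }
    }

    // One operator at a time: each co-partitioned operator below a key change
    // gets its own repartition topic.
    static List<String> naive(Node root) {
        List<String> topics = new ArrayList<>();
        naive(root, false, topics);
        return topics;
    }

    private static void naive(Node n, boolean keyChangedUpstream, List<String> topics) {
        boolean dirty = keyChangedUpstream;
        if (n.needsCoPartition && dirty) {
            topics.add(n.name + "-repartition");
            dirty = false; // this operator's output is correctly partitioned again
        }
        if (n.keyChanging) {
            dirty = true;
        }
        for (Node c : n.children) {
            naive(c, dirty, topics);
        }
    }

    // Whole-topology view: one shared repartition topic right after each
    // key-changing node whose downstream operators need co-partitioning.
    static List<String> optimized(Node root) {
        List<String> topics = new ArrayList<>();
        optimized(root, topics);
        return topics;
    }

    private static void optimized(Node n, List<String> topics) {
        if (n.keyChanging && needsRepartitionBelow(n)) {
            topics.add(n.name + "-repartition");
        }
        for (Node c : n.children) {
            optimized(c, topics);
        }
    }

    // True if some descendant needs co-partitioning before the next key change.
    private static boolean needsRepartitionBelow(Node n) {
        for (Node c : n.children) {
            if (c.needsCoPartition) {
                return true;
            }
            if (!c.keyChanging && needsRepartitionBelow(c)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // source -> map -> {count, leftJoin}, as in the DSL example above
        Node source = new Node("source", false, false);
        Node map = new Node("map", true, false);
        source.add(map);
        map.add(new Node("count", false, true)).add(new Node("leftJoin", false, true));

        System.out.println(naive(source));     // [count-repartition, leftJoin-repartition]
        System.out.println(optimized(source)); // [map-repartition]
    }
}
```

      For the DSL example in this issue, the naive walk yields two topics and the optimized walk yields one. A real optimizer would presumably also need to check that the merged consumers agree on serdes and partition counts before sharing a topic; this sketch ignores that.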

              People

              • Assignee:
                Unassigned
                Reporter:
                Guozhang Wang
              • Votes:
                0
                Watchers:
                5
