KAFKA-4835 (sub-task of KAFKA-6037, Make Sub-topology Parallelism Tunable)

Avoid repartitioning when key change doesn't change partitions

Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.2.0
    • Fix Version/s: None
    • Component/s: streams

    Description

      From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030

      ...it would be good to provide users more control over the repartitioning.
      My use case is as follows (unrelated bits omitted for brevity):

      KTable<String, Activity> loggedInCustomers = builder
          .stream("customerLogins")
          .groupBy((key, activity) -> activity.getCustomerRef())
          .reduce((first, second) -> second, loginStore());

      builder
          .stream("balanceUpdates")
          .map((key, activity) -> new KeyValue<>(
              activity.getCustomerRef(),
              activity))
          .join(loggedInCustomers, (activity, session) -> ...
          .to("sessions");
      

      In the underlying implementation, both "groupBy" and "map" set the repartitionRequired flag (since the key may change), so the aggregation or join that follows creates a repartition topic.
      However, in our case I know that both input streams are already partitioned by the customerRef value, which I'm mapping into the key (because the join operation requires it).
      As a result, two unnecessary intermediate topics are created, with their associated overhead, when the ultimate goal is simply to join on a value that we already use to partition the original streams.
      (Note: we don't have the option to re-implement the original input streams to make customerRef the message key.)
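To make the co-partitioning argument concrete, here is a minimal, library-free sketch. It assumes a producer that chooses the partition from customerRef regardless of the message key. The class `CoPartitioningSketch`, the `StoredRecord` type, and the `partitionFor` hash are all hypothetical stand-ins for illustration; in particular, `partitionFor` is not Kafka's actual default partitioner.

```java
import java.util.List;

class CoPartitioningSketch {

    /** A record as stored in a topic: message key, customerRef from its value, and its partition. */
    static final class StoredRecord {
        final String key;
        final String customerRef;
        final int partition;

        StoredRecord(String key, String customerRef, int partition) {
            this.key = key;
            this.customerRef = customerRef;
            this.partition = partition;
        }
    }

    /** Assumed partition function; a stand-in, not Kafka's murmur2-based default partitioner. */
    static int partitionFor(String customerRef, int numPartitions) {
        return Math.floorMod(customerRef.hashCode(), numPartitions);
    }

    /** Simulates a producer that picks the partition from customerRef, whatever the message key is. */
    static StoredRecord produce(String key, String customerRef, int numPartitions) {
        return new StoredRecord(key, customerRef, partitionFor(customerRef, numPartitions));
    }

    /** Counts records that a shuffle keyed by customerRef would move to a different partition. */
    static long movedByRepartition(List<StoredRecord> topic, int numPartitions) {
        return topic.stream()
                .filter(r -> partitionFor(r.customerRef, numPartitions) != r.partition)
                .count();
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        List<StoredRecord> logins = List.of(
                produce("login-1", "cust-A", numPartitions),
                produce("login-2", "cust-B", numPartitions));
        List<StoredRecord> balanceUpdates = List.of(
                produce("update-1", "cust-A", numPartitions),
                produce("update-2", "cust-B", numPartitions));

        // Both counts are 0: re-keying by customerRef leaves every record where it already is.
        System.out.println("logins moved: " + movedByRepartition(logins, numPartitions));
        System.out.println("balanceUpdates moved: " + movedByRepartition(balanceUpdates, numPartitions));
    }
}
```

Under these assumptions (same partition function and same partition count on both topics), records with equal customerRef already share a partition index, so a shuffle through an intermediate topic would move nothing.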

      I think it would be better to let users decide, based on their knowledge of the incoming streams, whether a repartition is mandatory on aggregation and join operations, perhaps through overloaded versions of those methods that expose the repartitionRequired flag.
      An alternative would be to allow joining on a value other than the key (a keyValueMapper parameter to join, like the one used for joins with global tables). I expect that to be more involved, and error-prone for people who don't understand the partitioning requirements well, whereas it is safe for global tables.
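The first proposal can be pictured with a toy model of the flag. This is only a sketch of the idea, not Kafka Streams code: `ToyStream`, its `selectKey(boolean preservesPartitioning)` overload, and `repartitionTopicsOnJoin` are hypothetical names invented for illustration, and no such overload is claimed to exist in the library.

```java
class RepartitionFlagSketch {

    /** Toy stand-in for a stream that tracks whether a downstream join must repartition. */
    static final class ToyStream {
        final boolean repartitionRequired;

        ToyStream(boolean repartitionRequired) {
            this.repartitionRequired = repartitionRequired;
        }

        /** Key-changing operation as described in the issue: conservatively sets the flag. */
        ToyStream selectKey() {
            return new ToyStream(true);
        }

        /**
         * Hypothetical overload: the caller asserts that the new key maps to the same
         * partitions. The flag is only set when that assertion is not made, and an
         * already-set flag is never cleared.
         */
        ToyStream selectKey(boolean preservesPartitioning) {
            return new ToyStream(repartitionRequired || !preservesPartitioning);
        }

        /** Number of intermediate topics a following join or aggregation would create. */
        int repartitionTopicsOnJoin() {
            return repartitionRequired ? 1 : 0;
        }
    }

    public static void main(String[] args) {
        ToyStream source = new ToyStream(false);
        System.out.println("default re-key: " + source.selectKey().repartitionTopicsOnJoin());
        System.out.println("asserted safe:  " + source.selectKey(true).repartitionTopicsOnJoin());
    }
}
```

The design choice in this sketch is that the caller can only avoid setting the flag, never clear one set by an earlier operation, which keeps the opt-out local to the operation the user actually vouches for.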

    People

      Assignee: Unassigned
      Reporter: Michal Borowiecki (mihbor)
      Votes: 3
      Watchers: 11