  1. Kafka
  2. KAFKA-10844

groupBy without shuffling

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.6.0
    • Fix Version/s: None
    • Component/s: streams

    Description

      The idea is to give a way to keep the current partitioning while doing a groupBy.

      Our use-case is the following:
      We process device data (the stream is partitioned by device-id), and each device produces several metrics. We want to aggregate by metric, so currently we do:

       selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)  

      This shuffles the data around, which is unnecessary: each (device, metric) group could stay in its original partition.

      This is not only an optimization question. We are seeing invalid aggregations when reprocessing history. During reprocessing, we frequently see tasks advance faster on some partitions than on others. This causes problems with event-time. Let's say data for device d1 is in partition p1 at stream-time t1, and data for device d2 is in partition p2 at stream-time t2.
      Now, if I re-key by (device, metric), records from both devices can hash to the same partition. If t2 is far ahead of t1, then all time windows for t1 expire at once.
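
To make the failure mode concrete, here is a minimal sketch of the effect, not Kafka Streams code. It assumes a simplified model in which a task's stream-time is the maximum timestamp it has seen, windows are tumbling, and a record is dropped once its window end plus the grace period is at or behind stream-time. The class and method names are hypothetical.

```java
/** Simplified model of one stream task; this is an illustration, not the Kafka Streams implementation. */
class StreamTimeSketch {

    /**
     * Counts the records a tumbling-window aggregation would drop as late,
     * given the timestamps in the order the task sees them.
     * Timestamps are assumed to be non-negative milliseconds.
     */
    static int countDropped(long[] timestampsInArrivalOrder, long windowSizeMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (long ts : timestampsInArrivalOrder) {
            // Stream-time only moves forward: it is the highest timestamp seen so far.
            streamTime = Math.max(streamTime, ts);
            long windowStart = ts - (ts % windowSizeMs);
            long windowEnd = windowStart + windowSizeMs;
            // The window is closed once stream-time has passed its end plus the grace period.
            if (windowEnd <= streamTime - graceMs) {
                dropped++;
            }
        }
        return dropped;
    }
}
```

With d1 at timestamps 1000 and 2000 and d2 at 1000000, processing each device in its own partition drops nothing in this model. Interleaving them in one partition as 1000, 1000000, 2000 drops the second d1 record, because the d2 record has already pushed stream-time past the end of d1's window.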

      Maybe I am missing a way to do this with the existing API; please let me know. Currently, I repartition manually and specify a custom partitioner, but it's tedious.
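
The idea behind such a partitioner can be sketched as follows: hash only the device component of the composite key, so every (device, metric) record lands in the partition its device would have used. This is a standalone illustration with hypothetical names. `String.hashCode` is a stand-in assumption; to match the input topic's layout, the hash would have to be the same one the upstream producer applied to the device-id. In Kafka Streams 2.6 this logic would presumably live in a `StreamPartitioner` supplied through `Repartitioned.streamPartitioner(...)`.

```java
/** Standalone illustration of partitioning a composite key by its device component only. */
class DevicePartitionSketch {

    /**
     * Maps a key component to a partition.
     * Assumption: String.hashCode stands in for the producer's real hash function.
     */
    static int partitionFor(String hashedPart, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(hashedPart.hashCode(), numPartitions);
    }

    /** Partition for a (device, metric) key: the metric is deliberately ignored. */
    static int partitionByDeviceOnly(String device, String metric, int numPartitions) {
        return partitionFor(device, numPartitions);
    }
}
```

Note that this still writes through a repartition topic; it only guarantees that records of one device stay together, which is why an API-level way to skip the shuffle would be preferable.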

      If I were to rewrite the aggregations manually with the Transformer API, I would use (device, key) as my state store key, without changing the record key.
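
A rough sketch of that idea, using a plain `HashMap` as a stand-in for the state store: the store is keyed by a composite of device and metric, while the record key (the device) is never touched, so no repartitioning is needed. The class name, the `|` separator, and the running-sum aggregation are all illustrative assumptions, and windowing is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: aggregates per (device, metric) without changing the record key. */
class CompositeStoreKeySketch {

    // Stand-in for a per-task state store; a real implementation would use a Kafka Streams store.
    private final Map<String, Double> sums = new HashMap<>();

    /**
     * Adds a value to the running sum for (device, metric) and returns the new sum.
     * The record itself would keep `device` as its key.
     */
    double add(String device, String metric, double value) {
        // Composite store key; the separator is an arbitrary choice for this sketch.
        String storeKey = device + "|" + metric;
        return sums.merge(storeKey, value, Double::sum);
    }
}
```

Because each task only ever sees the devices of its own partition, the store entries for one device never mix with another partition's stream-time.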

       

      (poke vvcephei following our discussion on users ml)

            People

              Assignee: Unassigned
              Reporter: Mathieu DESPRIEE (mathieude)
              Votes: 0
              Watchers: 6
