Description
The idea is to provide a way to keep the current partitioning while doing a groupBy.
Our use-case is the following:
We process device data (the stream is partitioned by device-id), and each device produces several metrics. We want to aggregate per metric of each device, so currently we do:
selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)
This shuffles the data around, but that is not necessary: each (device, metric) group could stay in its original partition.
This is not only an optimization question: we are seeing incorrect aggregations when reprocessing history. During such reprocessing, tasks on some partitions frequently advance faster than others. This causes problems with event time. Let's say data for device d1 is in partition p1 at stream-time t1, and data for device d2 is in partition p2 at stream-time t2.
Now, if I re-key by (device, metric), records from both devices can hash to the same partition. Stream time is tracked per partition, so if t2 is far ahead of t1, the shared partition's stream time jumps to t2 and all time windows around t1 expire at once; d1's records are then dropped as late.
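To make the stream-time problem concrete, here is a simplified, standalone model of a single task: it tracks stream time as the maximum timestamp seen so far and drops a record once the end of its tumbling window plus the grace period is no longer ahead of stream time. This approximates windowed-aggregation behavior; it is not Kafka Streams' actual code, and the class names, the one-hour window and the five-minute grace period are made up for illustration.

```java
// Simplified model (an assumption, not Kafka Streams' implementation) of how one
// task tracks stream time and drops records whose window has already closed.
final class PartitionStreamTime {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // max timestamp observed so far

    PartitionStreamTime(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record is aggregated, false if it is dropped as expired. */
    boolean process(long timestamp) {
        // Stream time only moves forward.
        streamTime = Math.max(streamTime, timestamp);
        // Tumbling window containing this record; the end is exclusive.
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        // The window is closed once stream time has passed its end plus the grace period.
        return windowEnd + graceMs > streamTime;
    }
}

class StreamTimeDemo {
    public static void main(String[] args) {
        long hour = 3_600_000L;
        long t1 = 0L;         // stream time of d1's original partition (p1)
        long t2 = 10 * hour;  // d2's partition (p2) is ten hours ahead

        // After re-keying, d2's record reaches the shared partition first...
        PartitionStreamTime shared = new PartitionStreamTime(hour, 5 * 60_000L);
        System.out.println(shared.process(t2));        // true: opens a window at t2
        // ...so d1's records at t1 land in windows that are already closed.
        System.out.println(shared.process(t1));        // false
        System.out.println(shared.process(t1 + 1000)); // false
    }
}
```

In its own partition, d1's record at t1 would have been aggregated normally; it is only the merge with a partition whose stream time is far ahead that expires its windows.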
Maybe I am missing some way of doing this with the existing API; please let me know. Currently, I manually repartition with a custom partitioner, but it's tedious.
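The manual workaround boils down to choosing the output partition from the device id alone and ignoring the metric. Below is a standalone sketch of that choice. In Kafka Streams the same logic would go into a StreamPartitioner passed to KStream#repartition through Repartitioned.streamPartitioner(...). DeviceMetric, DevicePartitioning and the use of String.hashCode() are assumptions for illustration: to really keep records in their original partition, the partitioner must reproduce exactly how the upstream producer partitioned by device id (Kafka's default partitioner applies murmur2 to the serialized key bytes).

```java
// Hypothetical composite key, as produced by selectKey(... => (device, metric)).
record DeviceMetric(String device, String metric) {}

final class DevicePartitioning {
    private DevicePartitioning() {}

    // Stand-in for the upstream partitioner. ASSUMPTION: String.hashCode is used
    // for illustration only; a real implementation must match the producer's
    // partitioner (murmur2 over the serialized key for Kafka's default).
    static int partitionForDevice(String device, int numPartitions) {
        return Math.floorMod(device.hashCode(), numPartitions);
    }

    // What a plain re-key effectively does: hash the whole composite key,
    // so different metrics of one device can be scattered across partitions.
    static int partitionByCompositeKey(DeviceMetric key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Device-only choice: ignore the metric so every (device, metric) group
    // stays on the partition its device already lives on.
    static int partitionByDevice(DeviceMetric key, int numPartitions) {
        return partitionForDevice(key.device(), numPartitions);
    }
}

class PartitioningDemo {
    public static void main(String[] args) {
        int partitions = 8;
        for (String metric : new String[] {"cpu", "mem", "disk"}) {
            DeviceMetric key = new DeviceMetric("d1", metric);
            System.out.printf("%s: composite=%d device-only=%d original=%d%n",
                key,
                DevicePartitioning.partitionByCompositeKey(key, partitions),
                DevicePartitioning.partitionByDevice(key, partitions),
                DevicePartitioning.partitionForDevice("d1", partitions));
        }
    }
}
```

Even with such a partitioner, the data still makes a round trip through an internal repartition topic; the request here is to avoid that step entirely when the new key does not change the partition.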
If I were to rewrite the aggregations manually with the Transformer API, I would use (device, key) as my state store key, without changing the record key.
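A sketch of that Transformer-style alternative: the record key stays the device id, and the aggregation state is keyed by a composite (device, metric, window start) key instead. A HashMap stands in for the task's state store, and Reading, StoreKey and PerPartitionAggregator are hypothetical names; a real implementation would use a store registered with the topology and would also have to handle window expiry and emitting results.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical input record: the record key stays the device id (so the data
// never leaves its partition); the metric name travels in the value.
record Reading(String device, String metric, long timestamp, double value) {}

// Composite state-store key: (device, metric) plus the tumbling-window start.
record StoreKey(String device, String metric, long windowStart) {}

// What a per-partition Transformer could do. ASSUMPTION: a HashMap stands in
// for the task's state store.
final class PerPartitionAggregator {
    private final long windowSizeMs;
    private final Map<StoreKey, Double> store = new HashMap<>();

    PerPartitionAggregator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Adds the reading to its (device, metric, window) bucket; the record key is untouched.
    void process(Reading r) {
        long windowStart = r.timestamp() - Math.floorMod(r.timestamp(), windowSizeMs);
        StoreKey key = new StoreKey(r.device(), r.metric(), windowStart);
        store.merge(key, r.value(), Double::sum);
    }

    // Returns the sum for one bucket, or null if nothing was aggregated there.
    Double get(String device, String metric, long windowStart) {
        return store.get(new StoreKey(device, metric, windowStart));
    }
}

class AggregatorDemo {
    public static void main(String[] args) {
        PerPartitionAggregator agg = new PerPartitionAggregator(60_000L);
        agg.process(new Reading("d1", "cpu", 1_000L, 0.5));
        agg.process(new Reading("d1", "cpu", 2_000L, 0.25));
        agg.process(new Reading("d1", "mem", 3_000L, 512.0));
        System.out.println(agg.get("d1", "cpu", 0L)); // 0.75
        System.out.println(agg.get("d1", "mem", 0L)); // 512.0
    }
}
```

Because the grouping happens only inside the store key, each device's metrics are aggregated in the task that already owns the device, and stream time is never mixed across devices from different partitions.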
(poke vvcephei, following our discussion on the users mailing list)
KIP-759: https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
Issue Links
- is duplicated by
  - KAFKA-12540 Sub-key support to avoid unnecessary rekey operations with new key is a compound key of the original key + sub-field (Resolved)
- is related to
  - KAFKA-4835 Avoid repartitioning when key change doesn't change partitions (Open)