Details
Type: Sub-task
Status: Resolved
Priority: Major
Resolution: Fixed
Description
We need to implement the KTable aggregation operation. It will be translated into two stages in the underlying topology:
Stage One:
1. No stores attached.
2. When receiving the record <K, Change<V>> from the upstream processor, call selector.apply on both Change<V>.newValue and Change<V>.oldValue.
3. Forward the two resulting messages to an intermediate topic (no compaction) with key <agg-key> and value <selected-value, isAdd>, where isAdd is a boolean that is true for the message derived from newValue and false for the one derived from oldValue.
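Stage one can be sketched in plain, self-contained Java as shown here. This is not the Kafka Streams API: Change, SelectedValue and RepartitionStage are hypothetical stand-ins, the selector is modelled as a java.util.function.BiFunction returning a Map.Entry of <agg-key, selected-value>, and the intermediate topic is simulated with an in-memory list. Skipping a null oldValue (first insert) or null newValue (delete), and forwarding the removal before the addition, are assumptions not stated in the steps above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for Change<V>: the new and old value of a KTable record.
final class Change<V> {
    final V newValue;
    final V oldValue;

    Change(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Hypothetical value written to the intermediate topic: <selected-value, isAdd>.
final class SelectedValue<V1> {
    final V1 value;
    final boolean isAdd;

    SelectedValue(V1 value, boolean isAdd) {
        this.value = value;
        this.isAdd = isAdd;
    }
}

// Stage one: stateless, no store attached.
final class RepartitionStage<K, V, K1, V1> {
    // Maps (K, V) to (agg-key, selected-value); a stand-in for the user's selector.
    private final BiFunction<K, V, Map.Entry<K1, V1>> selector;

    // In-memory stand-in for the (non-compacted) intermediate topic.
    final List<Map.Entry<K1, SelectedValue<V1>>> intermediateTopic = new ArrayList<>();

    RepartitionStage(BiFunction<K, V, Map.Entry<K1, V1>> selector) {
        this.selector = selector;
    }

    void process(K key, Change<V> change) {
        // Retract the old value's contribution (isAdd = false); assumed skipped when null.
        if (change.oldValue != null) {
            forward(selector.apply(key, change.oldValue), false);
        }
        // Add the new value's contribution (isAdd = true); assumed skipped when null.
        if (change.newValue != null) {
            forward(selector.apply(key, change.newValue), true);
        }
    }

    private void forward(Map.Entry<K1, V1> selected, boolean isAdd) {
        intermediateTopic.add(new AbstractMap.SimpleEntry<K1, SelectedValue<V1>>(
                selected.getKey(), new SelectedValue<V1>(selected.getValue(), isAdd)));
    }
}
```

Because the old and new values may map to different agg-keys, the two messages can land on different partitions of the intermediate topic, which is why the retraction has to travel as its own record rather than as part of a single Change.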
Stage Two:
1. Add a K-V store with format <agg-key> : <agg-value>, using the <K1> serde for keys and the <T> serde for values.
2. Upon consuming a record from the intermediate topic:
2.1. First try to fetch the current aggregate from the store; if it does not exist, call initialValue().
2.2. Based on isAdd, call either add(..) or remove(..).
2.3. Periodically, based on the emit duration, forward the aggregate value to the sink node with the intermediate topic, using key <agg-key> and value Change<agg-value>.
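Stage two can be sketched in the same self-contained style. TableAggregator, Change and AggregateStage are hypothetical names, not Kafka classes; a HashMap stands in for the K-V store (serdes omitted) and a list stands in for the sink node. The emit-duration behaviour is an assumption: updated keys are buffered and flushed once at least emitDurationMs has passed since the previous flush, using caller-supplied timestamps, and Change.oldValue is taken to be the aggregate previously forwarded for that key (null on its first emission).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical aggregator contract matching the calls named in the steps.
interface TableAggregator<K1, V1, T> {
    T initialValue();
    T add(K1 aggKey, V1 value, T aggregate);
    T remove(K1 aggKey, V1 value, T aggregate);
}

// Hypothetical stand-in for Change<agg-value> forwarded to the sink.
final class Change<T> {
    final T newValue;
    final T oldValue;

    Change(T newValue, T oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Stage two: stateful aggregation over records read from the intermediate topic.
final class AggregateStage<K1, V1, T> {
    private final TableAggregator<K1, V1, T> aggregator;
    private final long emitDurationMs;
    // In-memory stand-in for the <agg-key> : <agg-value> store (serdes omitted).
    private final Map<K1, T> store = new HashMap<>();
    // Last aggregate forwarded per key; becomes Change.oldValue on the next emit.
    private final Map<K1, T> lastEmitted = new HashMap<>();
    // Keys updated since the last emit, in first-update order.
    private final Set<K1> dirty = new LinkedHashSet<>();
    private long lastEmitTimeMs;

    // In-memory stand-in for the sink node: <agg-key, Change<agg-value>>.
    final List<Map.Entry<K1, Change<T>>> sink = new ArrayList<>();

    AggregateStage(TableAggregator<K1, V1, T> aggregator, long emitDurationMs, long startTimeMs) {
        this.aggregator = aggregator;
        this.emitDurationMs = emitDurationMs;
        this.lastEmitTimeMs = startTimeMs;
    }

    void process(K1 aggKey, V1 value, boolean isAdd, long nowMs) {
        // 2.1: fetch from the store, falling back to initialValue().
        T current = store.get(aggKey);
        if (current == null) {
            current = aggregator.initialValue();
        }
        // 2.2: isAdd selects add(..) or remove(..).
        T updated = isAdd
                ? aggregator.add(aggKey, value, current)
                : aggregator.remove(aggKey, value, current);
        store.put(aggKey, updated);
        dirty.add(aggKey);
        // 2.3: forward only once the emit duration has elapsed.
        maybeEmit(nowMs);
    }

    void maybeEmit(long nowMs) {
        if (nowMs - lastEmitTimeMs < emitDurationMs) {
            return;
        }
        for (K1 key : dirty) {
            T newValue = store.get(key);
            T oldValue = lastEmitted.put(key, newValue);
            sink.add(new AbstractMap.SimpleEntry<K1, Change<T>>(key, new Change<T>(newValue, oldValue)));
        }
        dirty.clear();
        lastEmitTimeMs = nowMs;
    }
}
```

Buffering the updated keys means several add/remove calls on the same agg-key within one emit interval collapse into a single Change record downstream, which is the point of emitting periodically rather than on every input record.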