XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 0.10.0.0
    • None
    • None

    Description

      We need to add the implementation of the KTable aggregation operation. We will translate it into two stages in the underlying topology:

      Stage One:
      1. No stores attached.

      2. When receiving the record <K, Change<V>> from the upstream processor, call selector.apply on both Change<V>.newValue and Change<V>.oldValue.

      3. Forward the resulted two messages to an intermediate topic (no compaction) with key <agg-key> and value <selected-value, isAdd> where isAdd is a boolean.

      Stage Two:

      1. Add a K-V store with format <agg-key> : <agg-value> with <K1> ser-de and <T> ser-de.

      2. Upon consuming a record from the intermediate topic:
      2.1. First try fetch from the store, if not exist call initialValue().
      2.2. Based on "isAdd" determine to call add(..) or remove(..).
      2.3. Forward the aggregate value periodically based on the emit duration to the sink node with the intermediate topic with key <agg-key> and value Change<agg-value>.

      Attachments

        Activity

          People

            guozhang Guozhang Wang
            guozhang Guozhang Wang
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: