Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0.0
    • Component/s: None
    • Labels:
      None

      Description

      We need to add the implementation of the KTable aggregation operation. We will translate it into two stages in the underlying topology:

      Stage One:
      1. No stores attached.

      2. When receiving the record <K, Change<V>> from the upstream processor, call selector.apply on both Change<V>.newValue and Change<V>.oldValue.

      3. Forward the resulted two messages to an intermediate topic (no compaction) with key <agg-key> and value <selected-value, isAdd> where isAdd is a boolean.

      Stage Two:

      1. Add a K-V store with format <agg-key> : <agg-value> with <K1> ser-de and <T> ser-de.

      2. Upon consuming a record from the intermediate topic:
      2.1. First try fetch from the store, if not exist call initialValue().
      2.2. Based on "isAdd" determine to call add(..) or remove(..).
      2.3. Forward the aggregate value periodically based on the emit duration to the sink node with the intermediate topic with key <agg-key> and value Change<agg-value>.

        Attachments

          Activity

            People

            • Assignee:
              guozhang Guozhang Wang
              Reporter:
              guozhang Guozhang Wang
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: