Kafka / KAFKA-12446

Define KGroupedTable#aggregate subtractor + adder order of execution


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.5.0
    • Component/s: streams

    Description

      KIP-904: https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed 

       

      Currently, when KGroupedTable#aggregate processes an update, it calls the subtractor first and then the adder. However, per the docs, the order of execution is not defined (i.e., it could change in future releases).

      https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating

      When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
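The two numbered steps in the quoted documentation can be modeled in plain Java as a fold over one update. This is a minimal sketch, not Kafka Streams' actual implementation: `UpdateModel`, `AggFn`, and `applyUpdate` are hypothetical names (`AggFn` only mirrors the shape of Kafka Streams' `Aggregator<K, V, VA>`), and it assumes the grouping key is unchanged, so the old and new values land on the same aggregate. It applies the order this ticket proposes to guarantee: the subtractor on the old value, then the adder on the new value.

```java
/**
 * Hypothetical model (not the real Kafka Streams code) of how one table update
 * is folded into an aggregate: subtractor on the old value first, then adder
 * on the new value. Assumes the grouping key did not change.
 */
final class UpdateModel {

    /** Hypothetical stand-in with the same shape as Kafka Streams' Aggregator<K, V, VA>. */
    @FunctionalInterface
    interface AggFn<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /**
     * Applies one update for a key. oldValue == null models an insert,
     * newValue == null models a delete (tombstone).
     */
    static <K, V, A> A applyUpdate(K key, V oldValue, V newValue, A aggregate,
                                   AggFn<K, V, A> adder, AggFn<K, V, A> subtractor) {
        A result = aggregate;
        if (oldValue != null) {
            result = subtractor.apply(key, oldValue, result); // (1) remove the old contribution
        }
        if (newValue != null) {
            result = adder.apply(key, newValue, result);      // (2) add the new contribution
        }
        return result;
    }

    public static void main(String[] args) {
        AggFn<String, Integer, Integer> add = (k, v, agg) -> agg + v;
        AggFn<String, Integer, Integer> sub = (k, v, agg) -> agg - v;

        int total = 0;
        total = applyUpdate("zoo", null, 5, total, add, sub); // insert: 0 + 5 = 5
        total = applyUpdate("zoo", 5, 7, total, add, sub);    // update: 5 - 5 + 7 = 7
        total = applyUpdate("zoo", 7, null, total, add, sub); // delete: 7 - 7 = 0
        System.out.println(total); // prints 0
    }
}
```

With a commutative pair such as `+`/`-`, swapping the two calls gives the same result; the order only matters for non-commutative adders and subtractors.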

      This ticket proposes making the current order of execution part of the public contract.

      That would give Kafka Streams DSL users the freedom to use aggregates such as:

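      aggregate(
        HashMap::new,
        // each lambda must return the updated aggregate; the subtractor keys off oldValue
        (aggKey, newValue, aggValue) -> { aggValue.put(newValue.getKey(), newValue.getValue()); return aggValue; }, // adder
        (aggKey, oldValue, aggValue) -> { aggValue.remove(oldValue.getKey()); return aggValue; } // subtractor
      )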

      and correctly handle updates where the key remains the same but the value changes.
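To see why the `HashMap` aggregate above depends on the subtractor running first, the following plain-Java sketch (no Kafka dependency) replays one update by hand in both orders. For illustration only, it assumes the table values are `Map.Entry<String, Integer>` pairs, since the ticket only shows that they expose `getKey()` and `getValue()`; `MapOrderDemo` and `run` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java illustration (no Kafka dependency) of the HashMap aggregate.
 * Assumption: values are Map.Entry pairs, which provide getKey()/getValue().
 */
final class MapOrderDemo {

    // Adder: store the entry's value under its inner key.
    static Map<String, Integer> adder(Map.Entry<String, Integer> newValue, Map<String, Integer> agg) {
        agg.put(newValue.getKey(), newValue.getValue());
        return agg;
    }

    // Subtractor: drop whatever is stored under the old entry's inner key.
    static Map<String, Integer> subtractor(Map.Entry<String, Integer> oldValue, Map<String, Integer> agg) {
        agg.remove(oldValue.getKey());
        return agg;
    }

    /** Applies the update (a -> 1) to (a -> 2) to an aggregate that already holds a=1. */
    static Map<String, Integer> run(boolean subtractorFirst) {
        Map<String, Integer> agg = new HashMap<>();
        Map.Entry<String, Integer> oldValue = Map.entry("a", 1);
        Map.Entry<String, Integer> newValue = Map.entry("a", 2);
        adder(oldValue, agg); // initial insert: {a=1}

        if (subtractorFirst) {
            subtractor(oldValue, agg); // {}
            adder(newValue, agg);      // {a=2}
        } else {
            adder(newValue, agg);      // {a=2}
            subtractor(oldValue, agg); // {}: the new value is lost
        }
        return agg;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints {a=2}
        System.out.println(run(false)); // prints {}
    }
}
```

Because the update keeps the inner key `a`, running the adder first overwrites `a` and the subtractor then deletes it, leaving an empty map.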

      The Kafka Music Example at

      https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345

      relies on the subtractor being called first.

       

      See discussion at https://github.com/confluentinc/kafka-streams-examples/issues/380

      See also the more general point made at https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined 

      If the adder and subtractor are non-commutative operations and their order of execution can vary, you can end up with different results depending on which one runs first. A useful example of non-commutative operations is aggregating records into a Set:

      .aggregate[Set[Animal]](Set.empty)(
       adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
       subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
      )
      

      In this example, for duplicated events (an update whose new value equals the old one), calling the adder before the subtractor removes the value from the set entirely, which would be problematic for most use cases, I imagine.
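The `Set` example can be transcribed into Java to make the duplicated-event case concrete. This sketch is an illustration, not code from the ticket: animals are plain `String`s, `SetOrderDemo` and `duplicateUpdate` are hypothetical names, and the adder and subtractor copy the set so they behave like Scala's immutable `Set` `+` and `-`. It replays an update whose new value equals the old one, in both orders.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Java transcription (illustrative only) of the Set-based aggregate.
 * Animals are Strings; each operation returns a new set, like Scala's immutable Set.
 */
final class SetOrderDemo {

    // adder: setOfAnimals + animalValue
    static Set<String> adder(String animal, Set<String> animals) {
        Set<String> copy = new HashSet<>(animals);
        copy.add(animal);
        return copy;
    }

    // subtractor: setOfAnimals - animalValue
    static Set<String> subtractor(String animal, Set<String> animals) {
        Set<String> copy = new HashSet<>(animals);
        copy.remove(animal);
        return copy;
    }

    /** Replays a duplicate update "lion" -> "lion" against a set that already holds "lion". */
    static Set<String> duplicateUpdate(boolean subtractorFirst) {
        Set<String> animals = adder("lion", Set.of()); // {lion}
        if (subtractorFirst) {
            return adder("lion", subtractor("lion", animals)); // {} then {lion}
        }
        return subtractor("lion", adder("lion", animals));      // {lion} (no-op add) then {}
    }

    public static void main(String[] args) {
        System.out.println(duplicateUpdate(true));  // prints [lion]
        System.out.println(duplicateUpdate(false)); // prints []
    }
}
```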

      As mjsax notes on https://github.com/confluentinc/kafka-streams-examples/issues/380

       

      the implementation used the same order since 0.10.0 release and it was never changed

      so making this behavior part of the public contract amounts to making official what has already been stable for a long time.

      Cost:

      • Limits options for the future. If Kafka Streams ever needed to change the order of execution (or make it indeterminate instead of the current hard-coded order), that would have to be a breaking change.

      Benefit:

      • Encourages wider use of the KGroupedTable#aggregate method (the current lack of a defined order prevents using aggregate with non-commutative adder/subtractor functions)
      • Simplifies reasoning about how to use KGroupedTable#aggregate (knowing that a given order can be relied upon makes the method itself easier to understand)

       

       


       


          People

            Assignee: Farooq Qaiser (fqpublic)
            Reporter: Ben Ellis (benissimo)
            Votes: 0
            Watchers: 4
