  1. Kafka
  2. KAFKA-7718

Allow customized header inheritance for stateful operators in DSL

Details

    • Improvement
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • streams

    Description

      As a follow-up to KIP-244 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API), we want to allow users to customize how record headers are inherited while traversing the topology at the DSL layer. (At the lower-level Processor API layer, users can already customize and inherit headers as they forward records to the next processor nodes.)

      Today, headers are implicitly inherited throughout the topology without any modification by the Streams library. For stateless operators (filter, map, etc.), this default inheritance policy should be sufficient. For stateful operators, however, multiple input records may generate a single output record (i.e. the operation is an n:1 transformation rather than a 1:1 mapping). Since we only inherit headers from the triggering record, the choice appears "random" to users, and the other records' headers are lost.
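
      The n:1 behavior described above can be illustrated with a small simulation. This is plain Java, not Kafka Streams code: the `Rec` class and `sumByKey` method are hypothetical stand-ins, and headers are simplified to a `Map<String, String>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TriggeringRecordInheritanceDemo {

    /** Simplified stand-in for a keyed record with headers (not a Kafka class). */
    static final class Rec {
        final String key;
        final long value;
        final Map<String, String> headers;

        Rec(String key, long value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Running sum per key; each output inherits headers from the triggering input only. */
    static List<Rec> sumByKey(List<Rec> inputs) {
        Map<String, Long> sums = new HashMap<>();
        List<Rec> outputs = new ArrayList<>();
        for (Rec in : inputs) {
            long sum = sums.merge(in.key, in.value, Long::sum);
            // Headers of earlier records for this key are not carried over.
            outputs.add(new Rec(in.key, sum, in.headers));
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Rec> out = sumByKey(List.of(
                new Rec("k", 1, Map.of("trace-id", "a")),
                new Rec("k", 2, Map.of("trace-id", "b"))));
        Rec last = out.get(out.size() - 1);
        // The sum covers both inputs, but only the second record's headers survive.
        System.out.println(last.value + " " + last.headers); // prints "3 {trace-id=b}"
    }
}
```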

      I propose that we extend the DSL to allow users to customize the header inheritance policy for stateful operators, namely joins and aggregations. The change would consist of two parts:

      1) On the DSL layer, I suggest we extend the `Joined` and `Grouped` control objects with an additional function that lets users pass in a lambda (let's say it's called HeadersMerger, though the name is subject to discussion in a KIP) that takes two Headers objects and returns a single Headers object.
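
      A minimal sketch of what such a merger might look like follows. Everything here is an assumption for illustration: the interface shape would be settled in a KIP, and headers are modeled as a `Map<String, byte[]>` instead of Kafka's `Headers` type (which also allows duplicate keys) so that the example runs without the Kafka dependency. The `TRIGGERING_WINS` policy is just one possible choice.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class HeadersMergerSketch {

    /** Hypothetical interface; the real name and signature would be decided in a KIP. */
    @FunctionalInterface
    interface HeadersMerger {
        Map<String, byte[]> merge(Map<String, byte[]> triggering, Map<String, byte[]> stored);
    }

    /** Example policy: keep headers from both sides; the triggering record wins on key conflicts. */
    static final HeadersMerger TRIGGERING_WINS = (triggering, stored) -> {
        Map<String, byte[]> merged = new LinkedHashMap<>(stored);
        merged.putAll(triggering);
        return merged;
    };

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> triggering = new LinkedHashMap<>();
        triggering.put("trace-id", utf8("t1"));

        Map<String, byte[]> stored = new LinkedHashMap<>();
        stored.put("trace-id", utf8("t0"));
        stored.put("source", utf8("orders"));

        Map<String, byte[]> merged = TRIGGERING_WINS.merge(triggering, stored);
        // "source" is preserved from the stored side; "trace-id" comes from the triggering side.
        System.out.println(merged.keySet());
        System.out.println(new String(merged.get("trace-id"), StandardCharsets.UTF_8));
    }
}
```

      Other policies (e.g. keeping only the stored side, or concatenating values per key) would be equally valid lambdas under this shape.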

      2) On the implementation layer, we need to actually store the headers in the materialized state store so that they can be retrieved along with the record by the join / aggregation processor. This would change the byte layout of state store values and hence should be considered carefully. Then, when the join / aggregate processor is triggered, the Headers of both records would be retrieved (one from the triggering record, one read from the materialized state store) and passed to the HeadersMerger. Some low-hanging optimizations can be considered, e.g. if users have not overridden this interface, we can skip reading the headers from the other side entirely to save IO cost.
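
      To make the state store change concrete, here is one possible value layout, sketched in plain Java. The layout, the `ValueWithHeadersCodec` class, and its method names are all assumptions for illustration, not the format Kafka Streams uses; headers are again simplified to a `Map<String, byte[]>` with non-null values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class ValueWithHeadersCodec {

    // Assumed layout: [headerCount:int] ([keyLen:int][key][valLen:int][val])* [value bytes]
    static byte[] encode(Map<String, byte[]> headers, byte[] value) {
        int size = Integer.BYTES;
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            size += Integer.BYTES + e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + Integer.BYTES + e.getValue().length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size + value.length);
        buf.putInt(headers.size());
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            byte[] k = e.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] v = e.getValue();
            buf.putInt(k.length).put(k).putInt(v.length).put(v);
        }
        buf.put(value);
        return buf.array();
    }

    /** Full decode of the header section, needed only when a merger is configured. */
    static Map<String, byte[]> decodeHeaders(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int count = buf.getInt();
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] k = new byte[buf.getInt()];
            buf.get(k);
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            headers.put(new String(k, StandardCharsets.UTF_8), v);
        }
        return headers;
    }

    /** Skips over the header section without materializing it and returns the plain value. */
    static byte[] decodeValue(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            int keyLen = buf.getInt();
            buf.position(buf.position() + keyLen);
            int valLen = buf.getInt();
            buf.position(buf.position() + valLen);
        }
        byte[] value = new byte[buf.remaining()];
        buf.get(value);
        return value;
    }
}
```

      In this sketch, a processor without a custom merger would call only `decodeValue`. That avoids the decoding cost but not the read itself; saving the IO as described above would depend on the layout or store organization eventually chosen.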

          People

            Assignee: Unassigned
            Reporter: Guozhang Wang (guozhang)
            Votes: 4
            Watchers: 10