  Kafka / KAFKA-7502

Cleanup KTable materialization logic in a single place


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Today, since we pre-create all the `KTableXXX` operators along with their logical nodes, we effectively duplicate the logic that determines whether the resulting KTable should be materialized. More specifically, the current materialization rules are:

      1) If users specify `Materialized` in the DSL and it contains a queryable name, we always materialize.
      2) If users specify `Materialized` in the DSL without a queryable name, or do not specify a `Materialized` object at all, Streams may or may not choose to materialize. In either case, even if the KTable is materialized, it is not queryable, since there is no queryable name (i.e. only `storeName` is non-null, while `queryableName` is null):
      2.a) If the resulting KTable comes from an aggregation, we always materialize, since a store is needed to hold the aggregation result (i.e. we use the `MaterializedInternal` constructor with `nameProvider != null`).
      2.b) If the resulting KTable comes from a source topic, we delay materialization until a downstream operator requires this KTable to be materialized or to send old values (see `KTableSourceNode` and `KTableSource`).
      2.c) If the resulting KTable comes from a join, we always materialize whenever users create a `Materialized` object, even without a queryable name. This could be optimized similarly to 2.b), but that is orthogonal to this ticket (see `KTableImpl#buildJoin`, where we always use the constructor with `nameProvider != null`).
      2.d) If the resulting KTable comes from a stateless operation such as filter / mapValues, we never materialize.
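
      The rules above can be condensed into one decision function. The following is a minimal sketch, not Kafka Streams code: `Origin`, `Materialization`, `MaterializationRules`, and `decide` are hypothetical names, and it assumes the producing operator and the "downstream needs state" flag (case 2.b) are already known when the function is called. It needs Java 16 or later (records, switch expressions).

```java
// Hypothetical: which kind of DSL operator produced the KTable (cases 2.a to 2.d).
enum Origin { AGGREGATION, SOURCE, JOIN, STATELESS }

// Hypothetical outcome: whether to add a store, and whether that store is queryable.
record Materialization(boolean materialize, boolean queryable) { }

final class MaterializationRules {
    private MaterializationRules() { }

    /**
     * Applies the materialization rules from the ticket in one place.
     *
     * @param materializedSpecified whether the user passed a Materialized object
     * @param queryableName         the queryable name from Materialized, or null
     * @param origin                the operator that produced the KTable
     * @param downstreamNeedsState  only used for sources (2.b): whether a downstream
     *                              operator needs this table materialized or send-old-values
     */
    static Materialization decide(boolean materializedSpecified, String queryableName,
                                  Origin origin, boolean downstreamNeedsState) {
        // Rule 1: a user-specified queryable name always yields a queryable store.
        if (materializedSpecified && queryableName != null) {
            return new Materialization(true, true);
        }
        // Rule 2: without a queryable name, the store (if any) is never queryable.
        boolean materialize = switch (origin) {
            case AGGREGATION -> true;               // 2.a: store holds the aggregate
            case SOURCE -> downstreamNeedsState;    // 2.b: delayed until a consumer needs it
            case JOIN -> materializedSpecified;     // 2.c: any Materialized object forces a store
            case STATELESS -> false;                // 2.d: filter / mapValues never materialize
        };
        return new Materialization(materialize, false);
    }

    public static void main(String[] args) {
        System.out.println(decide(true, "counts-store", Origin.STATELESS, false));
        System.out.println(decide(false, null, Origin.SOURCE, true));
        System.out.println(decide(true, null, Origin.JOIN, false));
    }
}
```

      Having every caller consult a single function like this, instead of re-deriving the answer, is one way to keep the decision in a single place.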

      ------------

      In all of these cases, we have logical nodes such as `KTableKTableJoinNode` as well as physical nodes such as `ProcessorNode`. Ideally, we should always create the logical plan (i.e. the StreamsGraph) first, optimize it if necessary, and only then generate the physical plan (i.e. the Topology). Today, however, we create some physical nodes beforehand, so the logic above is duplicated between the creation of the physical nodes and that of the logical nodes. For example, `KTableKTableJoinNode` checks whether `Materialized` is null before adding a state store, and `KTableImpl#doJoin` checks whether materialized is specified (case 2.c above).
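
      The intended ordering (build the logical plan, optimize it, then generate the physical plan) can be illustrated with a toy three-phase pipeline. Everything here is hypothetical and far simpler than the real StreamsGraph and Topology: `LogicalNode`, `PlanPipeline`, and the `needsParentValues` flag are made-up names, and the single optimization rule only mirrors case 2.b), where a source table is materialized solely because a downstream node needs its values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical logical node; none of these names are Kafka Streams classes.
final class LogicalNode {
    final String name;
    final List<LogicalNode> parents;
    final boolean needsParentValues; // in this toy: the node must read its parents' values
    boolean materialized;            // decided by optimize(), never at construction time

    LogicalNode(String name, boolean needsParentValues, LogicalNode... parents) {
        this.name = name;
        this.needsParentValues = needsParentValues;
        this.parents = List.of(parents);
    }
}

final class PlanPipeline {
    private PlanPipeline() { }

    // Phase 2: rewrite the logical plan. The only rule here mirrors case 2.b):
    // a parent is materialized only because some child needs its values.
    static void optimize(List<LogicalNode> logicalPlan) {
        for (LogicalNode node : logicalPlan) {
            if (node.needsParentValues) {
                for (LogicalNode parent : node.parents) {
                    parent.materialized = true;
                }
            }
        }
    }

    // Phase 3: translate to a "physical" plan by reading the decided flags as-is.
    static List<String> toPhysicalPlan(List<LogicalNode> logicalPlan) {
        List<String> processors = new ArrayList<>();
        for (LogicalNode node : logicalPlan) {
            processors.add(node.materialized ? node.name + "+store" : node.name);
        }
        return processors;
    }

    public static void main(String[] args) {
        // Phase 1: build only logical nodes; no processors or stores exist yet.
        LogicalNode left = new LogicalNode("source-a", false);
        LogicalNode right = new LogicalNode("source-b", false);
        LogicalNode join = new LogicalNode("join", true, left, right);
        LogicalNode other = new LogicalNode("source-c", false);
        LogicalNode mapped = new LogicalNode("mapValues", false, other);

        List<LogicalNode> plan = List.of(left, right, join, other, mapped);
        optimize(plan);
        System.out.println(toPhysicalPlan(plan));
    }
}
```

      Running `main` prints `[source-a+store, source-b+store, join, source-c, mapValues]`. The translation step never re-derives whether a node needs a store; it only reads the flag the optimizer left behind.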

      Another example is `TableProcessorNode`, which is used for case 2.d) above: it contains this logic, while its caller (`KTableImpl#doFilter`, for example) also contains it when deciding whether to pass the `queryableName` parameter to the `KTableProcessorSupplier`.

      This is bug-prone, since we may update the logic in one class but forget to update the other.

      --------------

      What we want instead is a cleaner code path, similar to the one we already have for 2.b): when creating the logical nodes, we keep track of whether 1) `Materialized` is specified and 2) a queryable name is provided. During the optimization phase we may then change the parameters of the inner physical ProcessorBuilder, such as the queryable name, and when it is time to generate the physical node we can simply take those parameters as they are and build it.
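
      A minimal sketch of that shape, assuming hypothetical `TableNode` and `ProcessorParameters` classes (they are not Kafka Streams' own ProcessorBuilder or graph node types): the logical node records whether `Materialized` was specified and which queryable name was given, the optimizer may adjust the processor parameters afterward, and physical generation uses those parameters without re-checking any rule.

```java
// Hypothetical parameters a logical node hands to its physical processor.
final class ProcessorParameters {
    String storeName;   // null means "no state store"
    boolean queryable;  // true only when the user supplied a queryable name

    ProcessorParameters(String storeName, boolean queryable) {
        this.storeName = storeName;
        this.queryable = queryable;
    }
}

// Hypothetical logical table node: it records what the user asked for,
// plus the parameters that physical generation will later use as-is.
final class TableNode {
    final String name;
    final boolean materializedSpecified;  // 1) was a Materialized object given? (kept for the optimizer, e.g. case 2.c)
    final String queryableName;           // 2) queryable name, or null
    final ProcessorParameters parameters;

    TableNode(String name, boolean materializedSpecified, String queryableName) {
        this.name = name;
        this.materializedSpecified = materializedSpecified;
        this.queryableName = queryableName;
        // Initially, only a queryable name leads to a (queryable) store.
        this.parameters = new ProcessorParameters(queryableName, queryableName != null);
    }

    // Optimization phase: e.g. a downstream operator needs this table's values.
    // A user-provided store name is kept; otherwise an internal, non-queryable one is set.
    void requireStore(String internalStoreName) {
        if (parameters.storeName == null) {
            parameters.storeName = internalStoreName;
        }
    }

    // Physical generation: no materialization rule is re-evaluated here.
    String describePhysical() {
        if (parameters.storeName == null) {
            return name + " (no store)";
        }
        return name + " store=" + parameters.storeName
                + (parameters.queryable ? " (queryable)" : "");
    }

    public static void main(String[] args) {
        TableNode counts = new TableNode("counts", true, "counts-store");
        TableNode source = new TableNode("source", false, null);
        TableNode filtered = new TableNode("filtered", false, null);

        source.requireStore("source-internal-store"); // decided by the optimizer
        System.out.println(counts.describePhysical());
        System.out.println(source.describePhysical());
        System.out.println(filtered.describePhysical());
    }
}
```

      The point of the split is that only the optimizer mutates `ProcessorParameters`; `describePhysical` stands in for the physical-generation step and trusts whatever it finds.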

          People

            Assignee: Dongjin Lee (dongjin)
            Reporter: Guozhang Wang (guozhang)
            Votes: 0
            Watchers: 3