  1. Kafka
  2. KAFKA-6398

Non-aggregation KTable generation operator does not construct value getter correctly


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.11.0.1, 1.0.0
    • Fix Version/s: 1.0.1, 1.1.0
    • Component/s: streams

    Description

      For any operator that generates a KTable, its valueGetterSupplier follows one of three code paths:

      1. If the operator is a KTable source operator, use its materialized state store for the value getter (note that currently we always materialize the KTable source).
      2. If the operator is an aggregation operator, its generated KTable is always materialized, so we just use its materialized state store.
      3. Otherwise, we handle the value getter on a per-operator basis.

      For 3) above, what we SHOULD do is the following: if the generated KTable is materialized, the value getter should simply read from its materialized state store; otherwise, the operator itself defines which parent's value getter to inherit and what computational logic to apply on the fly to get the value. For example, for KTable#filter() where Materialized is not specified, KTableFilterValueGetter gets the value from the parent's value getter and then applies the filter on the fly. In addition, we should let downstream operators access the parent's materialized state store via connectProcessorAndStateStore.
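
      The intended behavior for case 3 can be illustrated with a minimal sketch. It is a simplified model, not the Kafka Streams internals: ValueGetterSketch, ValueGetter, fromStore, filtered, and forFilter are hypothetical names, and a plain Map stands in for a state store.

```java
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical, simplified stand-ins; these are NOT the Kafka Streams internal classes.
final class ValueGetterSketch {

    /** Looks up the current value of a key in a (possibly virtual) table. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Materialized case: read the table's own state store (modeled as a Map) directly. */
    static <K, V> ValueGetter<K, V> fromStore(Map<K, V> store) {
        return store::get;
    }

    /** Non-materialized filter: inherit the parent's getter and apply the filter on the fly. */
    static <K, V> ValueGetter<K, V> filtered(ValueGetter<K, V> parent, BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            // A record that is missing or fails the predicate is not part of the filtered table.
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    /** The decision described for case 3: prefer the table's own store when one exists. */
    static <K, V> ValueGetter<K, V> forFilter(Map<K, V> ownStoreOrNull,
                                              ValueGetter<K, V> parent,
                                              BiPredicate<K, V> predicate) {
        return ownStoreOrNull != null ? fromStore(ownStoreOrNull) : filtered(parent, predicate);
    }
}
```

      In this model, passing null as the own store corresponds to a filter without Materialized: lookups fall through to the parent and are filtered at read time.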

      However, the current code does not do this correctly: 1) it does not check whether the result KTable is materialized, but always tries to use its parent's value getter, and 2) it does not connect the parent's materialized store to the downstream operator. As a result, operators such as KTable#filter, KTable#mapValues, and KTable#join(KTable) would result in a TopologyException when the topology is built. The following is an example:

      ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

      Using a non-materialized KTable in a stream-table join fails:

      final KTable filteredKTable = builder.table("table-topic").filter(...);
      builder.stream("stream-topic").join(filteredKTable,...);
      

      fails with

      org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
      
      	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
      	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
      	at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
      	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
      	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
      

      Adding a store name is not a sufficient workaround; it fails differently:

      final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
      builder.stream("stream-topic").join(filteredKTable,...);
      

      error:

      org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
      
      	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
      	at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
      	at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
      Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
      	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
      	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
      	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
      	at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
      	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
      	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
      

      One can work around this by piping the result through a topic:

      final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
      builder.stream("stream-topic").join(filteredKTable,...);
      

      ------------------------------------------------------------------------------------------------------------
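
      The two failures quoted above can be modeled in miniature. The sketch below is a toy, not Kafka's TopologyBuilder: StoreConnectionSketch and its methods are hypothetical names chosen to mirror the report. It assumes that a downstream processor must be connected to exactly the stores its value getter reads: the table's own store if it is materialized, otherwise the parent's store.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model; names mirror the report but this is NOT Kafka's TopologyBuilder.
final class StoreConnectionSketch {

    private final Set<String> stores = new HashSet<>();
    private final Map<String, Set<String>> access = new HashMap<>();

    void addStateStore(String storeName) {
        stores.add(storeName);
    }

    /** Grants a processor access to a store; rejects unknown (including null) store names. */
    void connectProcessorAndStateStore(String processor, String storeName) {
        if (!stores.contains(storeName)) {
            throw new IllegalStateException("StateStore " + storeName + " is not added yet.");
        }
        access.computeIfAbsent(processor, p -> new HashSet<>()).add(storeName);
    }

    /** True if the processor was previously connected to the given store. */
    boolean hasAccess(String processor, String storeName) {
        return access.getOrDefault(processor, new HashSet<>()).contains(storeName);
    }

    /**
     * Store names a downstream operator must be connected to in order to read the table:
     * the table's own store if it is materialized, otherwise the parent's store.
     */
    static String[] storeNames(String ownStoreOrNull, String parentStore) {
        return ownStoreOrNull != null ? new String[] {ownStoreOrNull} : new String[] {parentStore};
    }
}
```

      In this model, connecting the join to a null store name corresponds to the first error, while connecting it to one store and then reading from another corresponds to the second.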

      Note that there is another minor, orthogonal issue with KTable#filter itself: it does not include its parent's queryable store name when it is not itself materialized (see KTable#mapValues for reference).

          People

            guozhang Guozhang Wang
            mjsax Matthias J. Sax