Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10494

Streams: enableSendingOldValues should not call parent if node is itself materialized

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 2.7.0
    • 2.7.0
    • streams
    • None

    Description

      Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is unnecessarily calling `enableSendingOldValues` on the parent, even when the processor itself is materialized. This can force the parent table to be materialized unnecessarily.

      For example:

      StreamsBuilder builder = new StreamsBuilder();builder
           .table("t1", Consumed.of(...))
           .filter(predicate, Materialized.as("t2"))
           .<downStreamOps>
      

      If `downStreamOps` result in a call to `enableSendingOldValues` on the table returned by the `filter` call, i.e. `t2`, then it will result in `t1` being materialized unnecessarily.

      This ticket was raised off the back of comments in a PR] while working on KAFKA-10077.

      A good test that highlights this would be to add this to `KTableFilterTest`:

      @Test
       public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
        table2.enableSendingOldValues(false);
        doTestSendingOldValue(builder, table1, table2, topic1);
       }
      

      Though this problem is not restricted to the filter call. Other processor suppliers suffer from the same issue.

      In addition, once https://github.com/apache/kafka/pull/9156 is merged,  if you call enableSendingOldValues without forcing materialization on a table that is itself materialized, but who's upstream is not. In such a situation, the table will not enable sending old values, but should.

       

      Attachments

        Issue Links

          Activity

            People

              BigAndy Andy Coates
              BigAndy Andy Coates
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: