Kafka / KAFKA-10494

Streams: enableSendingOldValues should not call parent if node is itself materialized


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.7.0
    • Fix Version/s: 2.7.0
    • Component/s: streams
    • Labels:
      None

      Description

      Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, unnecessarily calls `enableSendingOldValues` on the parent, even when the processor itself is materialized. This can force the parent table to be materialized unnecessarily.

      For example:

      StreamsBuilder builder = new StreamsBuilder();
      builder
           .table("t1", Consumed.of(...))
           .filter(predicate, Materialized.as("t2"))
           .<downStreamOps>
      

      If `downStreamOps` results in a call to `enableSendingOldValues` on the table returned by the `filter` call, i.e. `t2`, then `t1` will be materialized unnecessarily.
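      The effect can be reproduced in a small, hypothetical model that is independent of Kafka Streams. The classes `Node`, `SourceNode` and `FilterNode`, and the `shortCircuit` toggle, are illustrative assumptions, not Kafka Streams internals. With the naive rule, a materialized filter still forwards the request to its parent, so forcing old values on `t2` materializes `t1`; with the fixed rule, a materialized node serves old values from its own store and returns without calling the parent.

```java
// Hypothetical, simplified model of old-value propagation; not Kafka Streams code.
public class OldValuesDemo {

    /** A table node that can be asked to send old values downstream. */
    abstract static class Node {
        boolean materialized;
        boolean sendOldValues;

        Node(final boolean materialized) {
            this.materialized = materialized;
        }

        /** Returns true if this node will now send old values. */
        abstract boolean enableSendingOldValues(boolean forceMaterialization);
    }

    /** A source table: it can only supply old values if it has (or is given) a store. */
    static class SourceNode extends Node {
        SourceNode(final boolean materialized) {
            super(materialized);
        }

        @Override
        boolean enableSendingOldValues(final boolean forceMaterialization) {
            if (!materialized && forceMaterialization) {
                materialized = true; // forced: the source gets a store it may not need
            }
            sendOldValues = materialized;
            return sendOldValues;
        }
    }

    /** A filter node; `shortCircuit` switches between the naive and the fixed rule. */
    static class FilterNode extends Node {
        final Node parent;
        final boolean shortCircuit;

        FilterNode(final Node parent, final boolean materialized, final boolean shortCircuit) {
            super(materialized);
            this.parent = parent;
            this.shortCircuit = shortCircuit;
        }

        @Override
        boolean enableSendingOldValues(final boolean forceMaterialization) {
            if (shortCircuit && materialized) {
                // Fixed rule: old values come from this node's own store; the parent is untouched.
                sendOldValues = true;
                return true;
            }
            // Naive rule (or an unmaterialized filter): delegate to the parent.
            if (parent.enableSendingOldValues(forceMaterialization)) {
                sendOldValues = true;
            }
            return sendOldValues;
        }
    }

    public static void main(final String[] args) {
        // t1 (source, not materialized) -> filter -> t2 (materialized), as in the example above.
        final SourceNode naiveT1 = new SourceNode(false);
        new FilterNode(naiveT1, true, false).enableSendingOldValues(true);
        System.out.println("naive: t1 materialized = " + naiveT1.materialized);

        final SourceNode fixedT1 = new SourceNode(false);
        new FilterNode(fixedT1, true, true).enableSendingOldValues(true);
        System.out.println("fixed: t1 materialized = " + fixedT1.materialized);
    }
}
```

      Running `main` prints `naive: t1 materialized = true` followed by `fixed: t1 materialized = false`.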

      This ticket was raised following comments on a PR while working on KAFKA-10077.

      A good test to highlight this would be to add the following to `KTableFilterTest`:

      @Test
      public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
          final StreamsBuilder builder = new StreamsBuilder();
          final String topic1 = "topic1";
          // `consumed`, `predicate` and `doTestSendingOldValue` are assumed to be fixtures defined in KTableFilterTest.
          final KTableImpl<String, Integer, Integer> table1 =
              (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
          // table2 is materialized as "store2"; its source, table1, is not.
          final KTableImpl<String, Integer, Integer> table2 =
              (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
          // Request old values without forcing materialization.
          table2.enableSendingOldValues(false);
          doTestSendingOldValue(builder, table1, table2, topic1);
      }
      

      This problem is not restricted to the `filter` call, though: other processor suppliers suffer from the same issue.

      In addition, once https://github.com/apache/kafka/pull/9156 is merged, calling `enableSendingOldValues` without forcing materialization on a table that is itself materialized, but whose upstream is not, will not enable sending old values on that table, even though it should.
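      This second problem concerns the return value rather than extra materialization. The sketch below is a hypothetical, compact model (the `UnforcedOldValues` class and its `naive` and `fixed` methods are illustrative assumptions, not Kafka Streams APIs) in which a table chain is an array of "is materialized" flags, index 0 being the source. Without forced materialization, the naive rule asks the unmaterialized source, gets `false`, and reports that the leaf cannot send old values; the fixed rule stops at the first materialized node.

```java
// Hypothetical model: chain[0] is the source table, chain[i] is derived from chain[i - 1].
public class UnforcedOldValues {

    /** Naive rule: always ask the parent; only the source's answer matters. */
    static boolean naive(final boolean[] chain, final int i, final boolean force) {
        if (i == 0) {
            return chain[0] || force; // source: needs a store, or must be forced to get one
        }
        return naive(chain, i - 1, force);
    }

    /** Fixed rule: a materialized node answers from its own store without asking its parent. */
    static boolean fixed(final boolean[] chain, final int i, final boolean force) {
        if (chain[i]) {
            return true;
        }
        if (i == 0) {
            return force; // unmaterialized source: only a forced store helps
        }
        return fixed(chain, i - 1, force);
    }

    public static void main(final String[] args) {
        // Source not materialized, filtered table materialized, no forced materialization.
        final boolean[] chain = {false, true};
        System.out.println("naive: " + naive(chain, 1, false)); // false
        System.out.println("fixed: " + fixed(chain, 1, false)); // true
    }
}
```

      Only the fixed rule reports that the materialized table can send old values; under the naive rule, the answer is dragged down by the unmaterialized upstream.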

       


              People

              • Assignee:
                BigAndy Andy Coates
                Reporter:
                BigAndy Andy Coates
              • Votes:
                0
                Watchers:
                3
