Description
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, unnecessarily calls `enableSendingOldValues` on the parent, even when the processor itself is materialized. This can force the parent table to be materialized when it does not need to be.
For example:
```java
StreamsBuilder builder = new StreamsBuilder();
builder
    .table("t1", Consumed.of(...))
    .filter(predicate, Materialized.as("t2"))
    .<downStreamOps>
```
If `downStreamOps` result in a call to `enableSendingOldValues` on the table returned by the `filter` call, i.e. `t2`, then `t1` will be materialized unnecessarily.
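To make the propagation concrete, the sketch below shows the kind of logic described above; the names `parent` and `sendOldValues` are simplifications for illustration, not the exact `KTableFilter` internals. The point is that the request is forwarded upstream unconditionally, even when the node's own store could supply the old values.

```java
// Illustrative sketch only; names are simplified, not the exact internals.
public void enableSendingOldValues() {
    // Always propagates upstream, which forces "t1" to be materialized,
    // even though this node has its own store ("t2") that could serve
    // the old values.
    parent.enableSendingOldValues();
    sendOldValues = true;
}
```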
This ticket was raised off the back of comments in a PR while working on KAFKA-10077.
A good test that highlights this would be to add this to `KTableFilterTest`:
```java
@Test
public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
    final StreamsBuilder builder = new StreamsBuilder();
    final String topic1 = "topic1";

    final KTableImpl<String, Integer, Integer> table1 =
        (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
    final KTableImpl<String, Integer, Integer> table2 =
        (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));

    table2.enableSendingOldValues(false);

    doTestSendingOldValue(builder, table1, table2, topic1);
}
```
This problem is not restricted to the `filter` call; other processor suppliers suffer from the same issue.
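For instance, a hypothetical variant of the snippet above (not code from this ticket, and assuming the standard Kafka Streams imports) hits the same problem with a materialized `mapValues`:

```java
// Hypothetical example: the same unnecessary upstream materialization
// applies to other materialized operators, e.g. mapValues.
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> mapped = builder
    .<String, String>table("t1", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues(v -> v.toUpperCase(),
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("t2"));
// Any downstream operator that triggers enableSendingOldValues on "t2"
// would again force "t1" to be materialized, even though "t2" could
// supply the old values itself.
```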
In addition, once https://github.com/apache/kafka/pull/9156 is merged, calling `enableSendingOldValues` without forcing materialization on a table that is itself materialized, but whose upstream table is not, will leave sending old values disabled on that table, even though it should be enabled.
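Putting both points together, the desired behaviour might look roughly like the sketch below. The `forceMaterialization` parameter is the one introduced by the PR above; `queryableName`, `parent`, and `sendOldValues` are assumed names for illustration, not the exact Kafka Streams internals. A materialized node should enable sending old values locally and only fall back to its parent when it has no store of its own.

```java
// Rough sketch of the desired behaviour; names are assumptions, not the
// exact Kafka Streams internals.
public boolean enableSendingOldValues(final boolean forceMaterialization) {
    if (queryableName != null) {
        // This node is materialized, so it can serve old values from its own
        // store; there is no need to materialize (or even ask) the parent.
        sendOldValues = true;
        return true;
    }
    // Not materialized: old values can only come from upstream, so propagate
    // the request, including the forceMaterialization flag, to the parent.
    if (parent.enableSendingOldValues(forceMaterialization)) {
        sendOldValues = true;
    }
    return sendOldValues;
}
```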
Issue Links
- is related to: KAFKA-10077 "Filter downstream of state-store results in spurious tombstones" (Resolved)