Kafka / KAFKA-10077

Filter downstream of state-store results in spurious tombstones



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.5.0
    • Fix Version/s: 2.7.0
    • Component/s: streams
    • Labels: None


      Adding a `filter` call downstream of anything that has a state store, e.g. a table source, results in spurious tombstones being emitted from the topology for any key where a new entry doesn't match the filter, even when no previous value existed for the row.

      To put this another way: a filter downstream of a state store will output a tombstone on an INSERT that doesn't match the filter, when it should only output a tombstone on an UPDATE.


      This code shows the problem:

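      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.TestInputTopic;
      import org.apache.kafka.streams.TestOutputTopic;
      import org.apache.kafka.streams.Topology;
      import org.apache.kafka.streams.TopologyTestDriver;
      import org.apache.kafka.streams.kstream.Materialized;

      // Table source (backed by a state store) -> keep even values -> output topic "bob".
      final StreamsBuilder builder = new StreamsBuilder();
      builder
       .table("table", Materialized.with(Serdes.Long(), Serdes.Long()))
       .filter((k, v) -> v % 2 == 0)
       .toStream()
       .to("bob");
      final Topology topology = builder.build();
      final Properties props = new Properties();
      props.put("application.id", "fred");
      props.put("bootstrap.servers", "who cares");
      final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
      final TestInputTopic<Long, Long> input = driver
       .createInputTopic("table", Serdes.Long().serializer(), Serdes.Long().serializer());
      input.pipeInput(1L, 2L); // INSERT, matches the filter
      input.pipeInput(1L, 1L); // UPDATE, no longer matches the filter
      input.pipeInput(2L, 1L); // INSERT, does not match the filter
      final TestOutputTopic<Long, Long> output = driver
       .createOutputTopic("bob", Serdes.Long().deserializer(), Serdes.Long().deserializer());
      final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList();
      // keyValues contains:
      // 1 -> 2
      // 1 -> null <-- correct tombstone: deletes previous row.
      // 2 -> null <-- spurious tombstone: no previous row.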


      These spurious tombstones can cause a LOT of noise when, for example, the filter is looking for a specific key. In such a situation, every input record that does not have that key results in a tombstone, meaning there can be many more tombstones than useful data.

      I believe the fix is to turn on KTableImpl::enableSendingOldValues for any filter that is downstream of a state store.
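
      To illustrate why having the old value available would help, here is a rough, self-contained model of the filter decision. It is only a sketch: `FilterSketch`, `keep` and `replay` are hypothetical names, the `HashMap` stands in for the upstream state store, and none of this is the actual Kafka Streams code. It replays the same three records as the example above, once without and once with old values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model of a table filter; NOT the Kafka Streams implementation.
class FilterSketch {

    // Hypothetical helper: maps "absent or rejected by the predicate" to null.
    static Long keep(Long value, Predicate<Long> predicate) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    // Replays the three records from the report and returns what the filter emits.
    static List<String> replay(boolean sendOldValues) {
        Predicate<Long> isEven = v -> v % 2 == 0;
        Map<Long, Long> store = new HashMap<>(); // stands in for the upstream state store
        List<String> emitted = new ArrayList<>();
        long[][] inputs = {{1L, 2L}, {1L, 1L}, {2L, 1L}};

        for (long[] in : inputs) {
            Long key = in[0];
            Long newValue = in[1];
            Long oldValue = store.put(key, newValue); // previous value, or null on INSERT

            Long newResult = keep(newValue, isEven);
            // Without old values the filter cannot tell an INSERT from an UPDATE.
            Long oldResult = sendOldValues ? keep(oldValue, isEven) : null;

            if (sendOldValues && newResult == null && oldResult == null) {
                continue; // nothing was visible downstream before or after: emit nothing
            }
            emitted.add(key + " -> " + newResult); // a null result is a tombstone
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // [1 -> 2, 1 -> null, 2 -> null]
        System.out.println(replay(true));  // [1 -> 2, 1 -> null]
    }
}
```

      In this model the tombstone for key 1 survives in both runs, because a matching row really was emitted earlier, while the tombstone for key 2 disappears once the filter can see that there was no previous row.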


              Assignee: Andy Coates (BigAndy)
              Reporter: Andy Coates (BigAndy)