Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10077

Filter downstream of state-store results in spurious tombstones

Agile BoardAttach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.5.0
    • Fix Version/s: 2.7.0
    • Component/s: streams
    • Labels:
      None

      Description

      Adding a `filter` call downstream of anything that has a state store, e.g. a table source, results in spurious tombstones being emitted from the topology for any key where a new entry doesn't match the filter, even when no previous value existed for the row.

      To put this another way: a filer downstream of a state-store will output a tombstone on an INSERT the doesn't match the filter, when it should only output a tombstone on an UPDATE.

       

      This code shows the problem:

      final StreamsBuilder builder = new StreamsBuilder();
      
      builder
       .table("table", Materialized.with(Serdes.Long(), Serdes.Long()))
       .filter((k, v) -> v % 2 == 0)
       .toStream()
       .to("bob");
      
      final Topology topology = builder.build();
      
      final Properties props = new Properties();
      props.put("application.id", "fred");
      props.put("bootstrap.servers", "who cares");
      
      final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
      
      final TestInputTopic<Long, Long> input = driver
       .createInputTopic("table", Serdes.Long().serializer(), Serdes.Long().serializer());
      
      input.pipeInput(1L, 2L);
      input.pipeInput(1L, 1L);
      input.pipeInput(2L, 1L);
      
      
      final TestOutputTopic<Long, Long> output = driver
       .createOutputTopic("bob", Serdes.Long().deserializer(), Serdes.Long().deserializer());
      
      final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList();
      
      // keyValues contains:
      // 1 -> 1
      // 1 -> null <-- correct tombstone: deletes previous row.
      // 2 -> null <-- spurious tombstone: no previous row. 
      

       

      These spurious tombstones can cause a LOT of noise when, for example, the filter is looking for a specific key.  In such a situation, every input record that does not have that key results in a tombstone! meaning there are many more tombstones than useful data.

       I believe the fix is to turn on KTableImpl::enableSendingOldValues for any filter that is downstream of a statestore 

        Attachments

        Issue Links

          Activity

            People

            • Assignee:
              BigAndy Andy Coates
              Reporter:
              BigAndy Andy Coates

              Dates

              • Created:
                Updated:
                Resolved:

                Issue deployment