Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9921

Caching is not working properly with WindowStateStore when retaining duplicates

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.5.0
    • Fix Version/s: 2.6.0, 2.5.1
    • Component/s: streams
    • Labels:
      None

      Description

      I'm using the current latest version 2.5.0 but this is not something new.

      I have WindowStateStore configured as following (where true stands for the retainDuplicates paramter):
      builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, retentionPeriod, windowSize, true), keySerde, valueSerde).withCachingEnabled())

      If I put 4 key-value pairs with the same key and values 1, 2, 3, 4 in that order when reading them through the iterator I'll get the values 4, 2, 3, 4.
      I've done a bit of investigation myself and the problem is that the whole caching feature is written without consideration of the case where duplicates are retained.

      The observed behavior is due to having the last value in the cache (and it can have only one since it's not aware of the retain duplicates option) and it is read first (while skipping the first from the RocksDB iterator even though the values are different). This can be observed (for version 2.5.0) in AbstractMergedSortedCacheStoreIterator#next() lines 95-97. Then the next 3 values are read from the RocksDB iterator so they are as expected.

      As I said, the whole feature is not considering the retainDuplicates option so there are other examples of incorrect behavior like in AbstractMergedSortedCacheStoreIterator#peekNextKey() - for each call, you would skip one duplicate entry in the RocksDB iterator for the given key.

      In my use case, I want to persist a list of values for a given key without increasing the complexity to linear for a single event (which would be the case if I was always reading the current list appending one value and writing it back). So I go for List<KeyValuePair<K, V>> instead of KeyValuePair<K, List<V>>. The whole use case is more complex than that so I use #transformValues and state stores.

      So as an impact I can't use caching on my state stores. For others - they'll have incorrect behavior that may take a lot of time to be discovered and even more time to fix the results.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                ableegoldman A. Sophie Blee-Goldman
                Reporter:
                georgi.petkov Georgi Petkov
              • Votes:
                0 Vote for this issue
                Watchers:
                5 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: