Description
Reported in community Slack by Stanislav Savulchik. I've attached a patch with a fix, and I'm waiting to see whether the reporter wants to submit a PR, since they were the first to identify the issue!
It affects basically every version of Kafka Streams out there...
Hi everyone,
I’m investigating unexpected behavior of the KeyValueStore.prefixScan method, which sometimes returns previously deleted keys when caching is enabled. Example pseudocode:
```scala
val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
val store: KeyValueStore[(Int, String), String] = ???

// store contents
// (1, "A") -> "A"
// (1, "B") -> "B"

// using put instead of delete to avoid reading previous value
store.put((1, "B"), null)

// reading all key value pairs using key prefix
val result: List[KeyValue[(Int, String), String]] =
  store.prefixScan(1, keyPrefixSerializer).asScala.toList

// expected result
// (1, "A") -> "A"

// actual result
// (1, "A") -> "A"
// (1, "B") -> "B" (was previously deleted, but returned by the iterator)
```
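To pin down the expected result, the following is a hypothetical reference model, not Kafka code: the underlying store and the cache are plain sorted maps of byte-array keys, a null value in the cache is treated as a tombstone (matching the report's use of `store.put(key, null)` as a delete), and a prefix scan returns the merged view filtered by key prefix. The composite-key layout (4-byte big-endian int followed by a UTF-8 string) and all names here (`PrefixScanModel`, `key`, `prefix`, `prefixScan`) are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference model, not Kafka code: keys are byte arrays ordered
// unsigned-lexicographically, and a null value in the cache marks a deleted key (tombstone).
final class PrefixScanModel {
    static final Comparator<byte[]> CMP = Arrays::compareUnsigned;

    // Assumed composite-key layout for (Int, String): 4-byte big-endian int, then UTF-8 string.
    static byte[] key(int id, String name) {
        byte[] s = name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + s.length).putInt(id).put(s).array();
    }

    // Prefix as described in the report: the Int alone, 4 bytes big endian.
    static byte[] prefix(int id) {
        return ByteBuffer.allocate(4).putInt(id).array();
    }

    static boolean startsWith(byte[] key, byte[] p) {
        return key.length >= p.length && Arrays.equals(key, 0, p.length, p, 0, p.length);
    }

    // Expected semantics: cached entries override the store, tombstones hide store entries,
    // and only keys starting with the prefix are returned, in key order.
    static List<String> prefixScan(TreeMap<byte[], String> store, TreeMap<byte[], String> cache, byte[] p) {
        TreeMap<byte[], String> view = new TreeMap<>(CMP);
        view.putAll(store);
        for (Map.Entry<byte[], String> e : cache.entrySet()) {
            if (e.getValue() == null) {
                view.remove(e.getKey());
            } else {
                view.put(e.getKey(), e.getValue());
            }
        }
        List<String> values = new ArrayList<>();
        for (Map.Entry<byte[], String> e : view.entrySet()) {
            if (startsWith(e.getKey(), p)) {
                values.add(e.getValue());
            }
        }
        return values;
    }

    static List<String> reportedScenario() {
        TreeMap<byte[], String> store = new TreeMap<>(CMP);
        store.put(key(1, "A"), "A");
        store.put(key(1, "B"), "B");
        TreeMap<byte[], String> cache = new TreeMap<>(CMP);
        cache.put(key(1, "B"), null); // models store.put((1, "B"), null), i.e. a delete
        return prefixScan(store, cache, prefix(1));
    }

    public static void main(String[] args) {
        System.out.println(reportedScenario()); // prints [A]
    }
}
```

A model like this can serve as an oracle in tests: whatever the real caching store returns for a given sequence of puts and deletes should match the merged-map view.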
To reproduce the behavior, I wrote a unit test for MergedSortedCacheKeyValueBytesStoreIterator (the iterator returned by KeyValueStore.prefixScan and by other methods such as range and all). It showed that the iterator returns more items than expected when I delete the larger key:
```java
@Test
public void shouldSkipAllDeletedFromCache1() {
    final byte[][] bytes = {{0}, {1}};
    for (final byte[] aByte : bytes) {
        store.put(Bytes.wrap(aByte), aByte);
    }
    cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null)); // simulate key deletion from store that is cached
    try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
        assertArrayEquals(bytes[0], iterator.next().key.get());
        assertFalse(iterator.hasNext()); // org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
    }
}
```
But if I delete the smaller key, the test passes:
```java
@Test
public void shouldSkipAllDeletedFromCache0() {
    final byte[][] bytes = {{0}, {1}};
    for (final byte[] aByte : bytes) {
        store.put(Bytes.wrap(aByte), aByte);
    }
    cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
    try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
        assertArrayEquals(bytes[1], iterator.next().key.get());
        assertFalse(iterator.hasNext());
    }
}
```

Could someone help me verify whether this is a bug or whether I'm missing something? Thank you.
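The asymmetry between the two tests is consistent with a merge that discards a cache tombstone before the store iterator has reached the tombstone's key: in the failing test the store's next key (0) is smaller than the tombstone's key (1), so if the tombstone is dropped at that point, store key 1 is later emitted as if it were live. The sketch below is a hypothetical, simplified merged iterator (Integer keys instead of `Bytes`, plain Java iterators instead of Kafka's store and cache iterators; `MergedIteratorSketch` and its helpers are invented names) that only consumes a tombstone once it is the smallest remaining key. It is meant to illustrate the invariant, not to reproduce Kafka's actual implementation or the attached patch.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for a merged cache/store iterator.
// Both inputs are sorted by key; a null cache value marks a deleted key (tombstone).
final class MergedIteratorSketch implements Iterator<Map.Entry<Integer, String>> {
    private final PeekingIterator cache;
    private final PeekingIterator store;

    MergedIteratorSketch(Iterator<Map.Entry<Integer, String>> cache,
                         Iterator<Map.Entry<Integer, String>> store) {
        this.cache = new PeekingIterator(cache);
        this.store = new PeekingIterator(store);
    }

    @Override
    public boolean hasNext() {
        // Consume a tombstone only when no smaller store key is still pending.
        while (cache.hasNext() && cache.peek().getValue() == null) {
            int tombstoneKey = cache.peek().getKey();
            if (store.hasNext()) {
                int storeKey = store.peek().getKey();
                if (storeKey < tombstoneKey) {
                    return true; // emit the smaller store entry first; keep the tombstone for later
                }
                if (storeKey == tombstoneKey) {
                    store.next(); // the tombstone shadows this store entry
                }
            }
            cache.next(); // drop the tombstone
        }
        return cache.hasNext() || store.hasNext();
    }

    @Override
    public Map.Entry<Integer, String> next() {
        if (!hasNext()) throw new NoSuchElementException();
        if (!cache.hasNext()) return store.next();
        if (!store.hasNext()) return cache.next();
        int c = cache.peek().getKey();
        int s = store.peek().getKey();
        if (c < s) return cache.next();
        if (s < c) return store.next();
        store.next(); // same key in both: the cached value wins
        return cache.next();
    }

    // Minimal one-element lookahead wrapper; a null head means the input is exhausted.
    private static final class PeekingIterator {
        private final Iterator<Map.Entry<Integer, String>> it;
        private Map.Entry<Integer, String> head;

        PeekingIterator(Iterator<Map.Entry<Integer, String>> it) {
            this.it = it;
            this.head = it.hasNext() ? it.next() : null;
        }
        boolean hasNext() { return head != null; }
        Map.Entry<Integer, String> peek() { return head; }
        Map.Entry<Integer, String> next() {
            Map.Entry<Integer, String> out = head;
            head = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    static Map.Entry<Integer, String> e(int key, String value) {
        return new SimpleImmutableEntry<>(key, value); // allows null values, unlike Map.entry
    }

    static List<Integer> keys(List<Map.Entry<Integer, String>> cache,
                              List<Map.Entry<Integer, String>> store) {
        List<Integer> out = new ArrayList<>();
        MergedIteratorSketch it = new MergedIteratorSketch(cache.iterator(), store.iterator());
        while (it.hasNext()) out.add(it.next().getKey());
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> store = List.of(e(0, "0"), e(1, "1"));
        // Mirrors shouldSkipAllDeletedFromCache1: the larger key is deleted in the cache.
        System.out.println(keys(List.of(e(1, null)), store)); // prints [0]
        // Mirrors shouldSkipAllDeletedFromCache0: the smaller key is deleted in the cache.
        System.out.println(keys(List.of(e(0, null)), store)); // prints [1]
    }
}
```

Keeping the early `return true` side-effect free means `hasNext()` can be called repeatedly, and `next()` can safely call it again before choosing which side to emit.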