Kafka / KAFKA-18326

Cached stores may return deleted values

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Reported in the community Slack by Stanislav Savulchik. I've attached a patch with a fix and am waiting to see whether the reporter wants to submit the PR, since they were the first to identify the problem!

      It affects basically every version of Kafka Streams out there...


      Hi everyone,
      I’m investigating unexpected behavior of the KeyValueStore.prefixScan method: it sometimes returns previously deleted keys when caching is enabled. Example pseudocode:

      val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
      val store: KeyValueStore[(Int, String), String] = ???
      // store contents
      // (1, "A") -> "A"
      // (1, "B") -> "B"
      // using put instead of delete to avoid reading previous value
      store.put((1, "B"), null)
      // reading all key value pairs using key prefix
      val result: List[KeyValue[(Int, String), String]] =
        store.prefixScan(1, keyPrefixSerializer).asScala.toList
      // expected result
      // (1, "A") -> "A"
      // actual result
      // (1, "A") -> "A"
      // (1, "B") -> "B" (was previously deleted, but returned by the iterator)
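
To illustrate why a 4-byte big-endian prefix selects the composite keys in the pseudocode, here is a minimal Java sketch. The key layout (a 4-byte big-endian Int followed by the UTF-8 bytes of the String) is an assumption, since the report does not show the actual key serde, and the names PrefixKeySketch, prefix, compositeKey, and hasPrefix are hypothetical helpers, not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PrefixKeySketch {

    // Encodes the Int part of the key as 4 bytes; ByteBuffer is big-endian by default.
    static byte[] prefix(int id) {
        return ByteBuffer.allocate(4).putInt(id).array();
    }

    // ASSUMED composite layout: 4-byte big-endian id followed by the UTF-8 bytes of the name.
    static byte[] compositeKey(int id, String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + nameBytes.length).putInt(id).put(nameBytes).array();
    }

    // True if the serialized key starts with the serialized prefix.
    static boolean hasPrefix(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] p = prefix(1);
        System.out.println(hasPrefix(compositeKey(1, "A"), p)); // true
        System.out.println(hasPrefix(compositeKey(1, "B"), p)); // true
        System.out.println(hasPrefix(compositeKey(2, "A"), p)); // false
    }
}
```

Under this assumed layout, both (1, "A") and (1, "B") fall inside the scanned prefix, so the deleted key (1, "B") is a candidate that the cache layer has to filter out.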

      I tried to write a unit test for MergedSortedCacheKeyValueBytesStoreIterator (returned by KeyValueStore.prefixScan and by other methods such as range and all) in order to reproduce the behavior. It showed that the iterator returns more items than expected when I delete the larger key:

      @Test
      public void shouldSkipAllDeletedFromCache1() {
          final byte[][] bytes = {{0}, {1}};
          for (final byte[] aByte : bytes) {
              store.put(Bytes.wrap(aByte), aByte);
          }
          // simulate key deletion from store that is cached
          cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
          try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
              assertArrayEquals(bytes[0], iterator.next().key.get());
              assertFalse(iterator.hasNext()); // org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
          }
      }

       
      But if I delete the smaller key, the test passes:

      @Test
      public void shouldSkipAllDeletedFromCache0() {
          final byte[][] bytes = {{0}, {1}};
          for (final byte[] aByte : bytes) {
              store.put(Bytes.wrap(aByte), aByte);
          }
          cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
          try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
              assertArrayEquals(bytes[1], iterator.next().key.get());
              assertFalse(iterator.hasNext());
          }
      }

      Could someone help me verify whether this is a bug, or am I missing something?
      Thank you.
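
For reference, the behavior both tests expect can be sketched independently of Kafka. The following is a minimal, self-contained Java illustration, not the Kafka Streams implementation and not the attached patch: it merges a sorted cache over a sorted store, lets a cache entry shadow the store entry with the same key, and skips entries whose cached value is null (a deletion). The class MergedScanSketch, its MergedIterator, and the scanKeys helper are hypothetical names, and Integer keys stand in for Bytes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class MergedScanSketch {

    /** Merges a sorted cache over a sorted store; a null cache value marks a deleted key. */
    static final class MergedIterator implements Iterator<Map.Entry<Integer, String>> {
        private final Iterator<Map.Entry<Integer, String>> cacheIt;
        private final Iterator<Map.Entry<Integer, String>> storeIt;
        private Map.Entry<Integer, String> cacheHead;
        private Map.Entry<Integer, String> storeHead;
        private Map.Entry<Integer, String> next;

        MergedIterator(NavigableMap<Integer, String> cache, NavigableMap<Integer, String> store) {
            this.cacheIt = cache.entrySet().iterator();
            this.storeIt = store.entrySet().iterator();
            this.cacheHead = advance(cacheIt);
            this.storeHead = advance(storeIt);
        }

        private static <T> T advance(Iterator<T> it) {
            return it.hasNext() ? it.next() : null;
        }

        @Override
        public boolean hasNext() {
            // Keep consuming until a live entry is found or both sides are exhausted.
            while (next == null && (cacheHead != null || storeHead != null)) {
                int cmp = cacheHead == null ? 1
                        : storeHead == null ? -1
                        : cacheHead.getKey().compareTo(storeHead.getKey());
                if (cmp > 0) {
                    // The store key is smaller and has no cache entry: emit it.
                    next = storeHead;
                    storeHead = advance(storeIt);
                } else {
                    Map.Entry<Integer, String> candidate = cacheHead;
                    cacheHead = advance(cacheIt);
                    if (cmp == 0) {
                        // The cache entry shadows the store entry with the same key.
                        storeHead = advance(storeIt);
                    }
                    if (candidate.getValue() != null) {
                        next = candidate; // live cached value
                    }
                    // Otherwise it is a deletion: emit nothing and loop again.
                }
            }
            return next != null;
        }

        @Override
        public Map.Entry<Integer, String> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Map.Entry<Integer, String> result = next;
            next = null;
            return result;
        }
    }

    /** Collects the keys that a merged scan returns, in order. */
    static List<Integer> scanKeys(NavigableMap<Integer, String> cache,
                                  NavigableMap<Integer, String> store) {
        List<Integer> keys = new ArrayList<>();
        Iterator<Map.Entry<Integer, String>> it = new MergedIterator(cache, store);
        while (it.hasNext()) {
            keys.add(it.next().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> store = new TreeMap<>();
        store.put(0, "a");
        store.put(1, "b");

        TreeMap<Integer, String> deleteLarger = new TreeMap<>();
        deleteLarger.put(1, null); // cached deletion of the larger key
        System.out.println(scanKeys(deleteLarger, store)); // [0]

        TreeMap<Integer, String> deleteSmaller = new TreeMap<>();
        deleteSmaller.put(0, null); // cached deletion of the smaller key
        System.out.println(scanKeys(deleteSmaller, store)); // [1]
    }
}
```

In this sketch the shadowed store entry is consumed in the same step as the cached deletion, regardless of whether the deleted key is the smaller or the larger one, which is the symmetric outcome the two tests above check for.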

      Attachments

        1. range-scan-fix.patch
          3 kB
          Almog Gavra

            People

              Assignee: Almog Gavra (agavra)
              Reporter: Almog Gavra (agavra)
              Votes: 0
              Watchers: 4
