  Kafka / KAFKA-5172

CachingSessionStore doesn't fetchPrevious correctly.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.11.0.0
    • Component/s: streams
    • Labels: None

    Description

      When KStreamSessionWindowAggregate is used by calling KGroupedStream#aggregate(), a CachingSessionStore is created.
      When a new record arrives that requires removing other records from the store, this leads to the following chain of method calls:

      KStreamSessionWindowAggregate
      CachingSessionStore.remove(Windowed<K>)
      CachingSessionStore.put(Windowed<K>, V)
      ThreadCache.put(String, Bytes containing Windowed<K> info, LRUCacheEntry)
      ThreadCache.maybeEvict(String)
      NamedCache.evict()
      NamedCache.flush(LRUNode containing Bytes and LRUCacheEntry from ThreadCache#put)
      DirtyEntryFlushListener defined in CachingSessionStore line 80 .apply(ThreadCache.DirtyEntry containing Bytes and LRUCacheEntry from ThreadCache#put)
      CachingSessionStore.putAndMaybeForward(ThreadCache.DirtyEntry containing Bytes and LRUCacheEntry from ThreadCache#put, InternalProcessorContext)
      CachingSessionStore.fetchPrevious(Bytes containing Windowed<K> info)
      RocksDBSessionStore.fetch(Bytes containing Windowed<K> info)
      RocksDBSessionStore.findSessions on line 48 (Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
      MeteredSegmentedByteStore.fetch(Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
      ChangeLoggingSegmentedByteStore.fetch(Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
      RocksDBSegmentedBytesStore.fetch(Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
      SessionKeySchema.lower/upperRange(Bytes containing Windowed<K> info, Long)

                • In this method the key, which is already a Windowed<K>, gets windowed a second time.

      The point of showing this whole chain is that the already windowed key gets windowed again. Because 0 and Long.MAX_VALUE are passed as the time bounds, the store searches for a malformed key across the entire time range. I think the fetchPrevious method of CachingSessionStore should be changed to call byteStores.findSessions(Bytes.wrap(serdes.rawKey(key.key())), key.window().start(), key.window().end()).
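
      The double windowing described above can be illustrated with a small standalone sketch. It does not use Kafka's classes: DoubleWindowingSketch, toBinary and extractKey are hypothetical names, and the byte layout (raw key bytes followed by the window end and the window start as 8-byte longs) is an assumption made for illustration only. The sketch compares the lookup key built from the already windowed bytes with the one built from the unwrapped raw key and the original window bounds, which is the idea behind the proposed change.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DoubleWindowingSketch {

    private static final int TIMESTAMP_SIZE = 8;

    // Assumed layout (illustration only): raw key bytes, window end, window start.
    public static byte[] toBinary(byte[] keyBytes, long start, long end) {
        ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + 2 * TIMESTAMP_SIZE);
        buf.put(keyBytes);
        buf.putLong(end);
        buf.putLong(start);
        return buf.array();
    }

    // Recover the raw key by dropping the two trailing timestamps.
    public static byte[] extractKey(byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - 2 * TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        byte[] rawKey = "user-1".getBytes(StandardCharsets.UTF_8);

        // The form in which the session key sits in the cache and in the store.
        byte[] stored = toBinary(rawKey, 100L, 200L);

        // Reported behaviour: the windowed bytes are treated as a plain key
        // and wrapped again with the bounds 0 and Long.MAX_VALUE.
        byte[] doubleWindowed = toBinary(stored, 0L, Long.MAX_VALUE);

        // Proposed behaviour: unwrap the raw key, reuse the real window bounds.
        byte[] unwrapped = toBinary(extractKey(stored), 100L, 200L);

        System.out.println("stored key length:          " + stored.length);
        System.out.println("double-windowed key length: " + doubleWindowed.length);
        System.out.println("unwrapped lookup matches stored key: "
                + Arrays.equals(unwrapped, stored));
    }
}
```

      In this toy layout the double-windowed key carries two extra timestamps and can never equal the stored key, while the unwrapped lookup reproduces it exactly.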

            People

              Assignee: Kyle Winkelman (winkelman.kyle)
              Reporter: Kyle Winkelman (winkelman.kyle)