Description
When using KStreamSessionWindowAggregate by calling KGroupedStream#aggregate() a CachingSessionStore is created.
This causes the following chain of method calls when a new record that requires removing others from the store appear:
KStreamSessionWindowAggregate
CachingSessionStore.remove(Windowed<K>)
CachingSessionStore.put(Windowed<K>, V)
ThreadCache.put(String, Bytes containing Windowed<K> info, LRUCacheEntry)
ThreadCache.maybeEvict(String)
NamedCache.evict()
NamedCache.flush(LRUNode containing Bytes and LRUCacheEntry from ThreadCache#put)
DirtyEntryFlushListener defined in CachingSessionStore line 80 .apply(ThreadCache.DirtyEntry containing Bytes and LRUCacheEntry from ThreadCache#put)
CachingSessionStore.putAndMaybeForward(ThreadCache.DirtyEntry containing Bytes and LRUCacheEntry from ThreadCache#put, InternalProcessorContext)
CachingSessionStore.fetchPrevious(Bytes containing Windowed<K> info)
RocksDBSessionStore.fetch(Bytes containing Windowed<K> info)
RocksDBSessionStore.findSessions on line 48 (Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
MeteredSegmentedByteStore.fetch(Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
ChangeLoggingSegmentedByteStore.fetch(Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
RocksDBSegmentedBytesStore.fetch(Bytes containing Windowed<K> info, 0, Long.MAX_VALUE)
SessionKeySchema.lower/upperRange(Bytes containing Windowed<K> info, Long)
-
-
-
-
-
- in this method the already Windowed<K> gets Windowed again *****
-
-
-
-
The point of showing all this is to point out that the windowed gets windowed and because it passes the 0, Long.MAX_VALUE it searches for a strange key and searches all times for it. I think the fetchPrevious method of CachingSessionStore should be changed to call the byteStores.findSessions(Bytes.wrap(serdes.rawKey(key.key())), key.window().start(), key.window().end()).
Attachments
Issue Links
- links to