diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 31ca68b..0771794 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -51,6 +51,10 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl private CacheFlushListener, V> flushListener; private boolean sendOldValues; private final SegmentedCacheFunction cacheFunction; + private long minTimestamp = Long.MAX_VALUE; + private long maxTimestamp = 0; + private Bytes keyWithMinTimestamp; + private Bytes keyWithMaxTimestamp; CachingWindowStore(final WindowStore underlying, final Serde keySerde, @@ -151,6 +155,15 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl // if store is open outside as well. validateStoreOpen(); + if(timestamp < minTimestamp) { + minTimestamp = timestamp; + keyWithMinTimestamp = key; + } + if(timestamp > maxTimestamp) { + maxTimestamp = timestamp; + keyWithMaxTimestamp = key; + } + final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, bytesSerdes); final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(), timestamp, context.partition(), context.topic()); @@ -207,6 +220,54 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl ); } + public KeyValueIterator, byte[]> all() { + validateStoreOpen(); + + final KeyValueIterator, byte[]> underlyingIterator = underlying.fetch(keyWithMinTimestamp, keyWithMaxTimestamp, minTimestamp, maxTimestamp); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyWithMinTimestamp, minTimestamp)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyWithMaxTimestamp, maxTimestamp)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); + + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyWithMinTimestamp, + keyWithMaxTimestamp, + minTimestamp, + maxTimestamp); + final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + + return new MergedSortedCacheWindowStoreKeyValueIterator( + filteredCacheIterator, + underlyingIterator, + bytesSerdes, + windowSize, + cacheFunction + ); + } + + public KeyValueIterator, byte[]> fetchAll(long timeFrom, long timeTo) { + validateStoreOpen(); + + final KeyValueIterator, byte[]> underlyingIterator = underlying.fetch(keyWithMinTimestamp, keyWithMaxTimestamp, timeFrom, timeTo); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyWithMinTimestamp, timeFrom)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyWithMaxTimestamp, timeTo)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); + + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyWithMinTimestamp, + keyWithMaxTimestamp, + timeFrom, + timeTo); + final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + + return new MergedSortedCacheWindowStoreKeyValueIterator( + filteredCacheIterator, + underlyingIterator, + bytesSerdes, + windowSize, + cacheFunction + ); + } + private V fetchPrevious(final Bytes key, final long timestamp) { try (final WindowStoreIterator iter = underlying.fetch(key, timestamp, timestamp)) { if (!iter.hasNext()) {