diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java index cff1f6c..2b3036b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java @@ -77,4 +77,12 @@ public interface ReadOnlyWindowStore { * @throws NullPointerException If null is used for any key. */ KeyValueIterator, V> fetch(K from, K to, long timeFrom, long timeTo); + + /** + * Gets all the key-value pairs in the existing windows. + * + * @returns an iterator over windowed key-value pairs {@code , value>} + * @throws InvalidStateStoreException if the store is not initialized + */ + KeyValueIterator, V> all(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 19e6e09..ea72c82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.processor.ProcessorContext; +import java.util.NavigableSet; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -217,4 +218,33 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl } } + @Override + public KeyValueIterator, byte[]> all() { + validateStoreOpen(); + + final KeyValueIterator, byte[]> underlyingIterator = underlying.all(); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); + + final NavigableSet allBytes = cache.setOfAllKeys(name); + if (allBytes == null) return null; + + final Bytes firstKey = allBytes.first(); + final Bytes lastKey = allBytes.last(); + final Bytes keyFrom = WindowStoreUtils.bytesKeyFromBinaryKey(firstKey.get()); + final Bytes keyTo = WindowStoreUtils.bytesKeyFromBinaryKey(lastKey.get()); + final long firstTimestamp = WindowStoreUtils.timestampFromBinaryKey(firstKey.get()); + final long lastTimestamp = WindowStoreUtils.timestampFromBinaryKey(lastKey.get()); + + final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, firstTimestamp, lastTimestamp); + final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + + return new MergedSortedCacheWindowStoreKeyValueIterator( + filteredCacheIterator, + underlyingIterator, + bytesSerdes, + windowSize, + cacheFunction + ); + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index 0035019..e3bf9f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -56,7 +56,11 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore return bytesStore.fetch(keyFrom, keyTo, from, to); } - + @Override + public KeyValueIterator, byte[]> all() { + return bytesStore.all(); + } + @Override public void put(final Bytes key, final byte[] value) { put(key, value, context.timestamp()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index 5acb6b4..fb96888 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -80,4 +80,24 @@ public class CompositeReadOnlyWindowStore implements ReadOnlyWindowStore, V> all() { + final List> stores = provider.stores(storeName, windowStoreType); + for (ReadOnlyWindowStore windowStore : stores) { + try { + final KeyValueIterator, V> result = windowStore.all(); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (InvalidStateStoreException e) { + throw new InvalidStateStoreException( + "State store is not available anymore and may have been migrated to another instance; " + + "please re-discover its location from the state metadata."); + } + } + return KeyValueIterators.emptyIterator(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 20c7c43..7817e82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -113,6 +113,11 @@ public class MeteredWindowStore extends WrappedStateStore.AbstractStateSto } @Override + public KeyValueIterator, V> all() { + return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time); + } + + @Override public KeyValueIterator, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), fetchTime, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 47d5152..8601e76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -278,6 +279,10 @@ class NamedCache { synchronized Iterator allKeys() { return keySetIterator(cache.navigableKeySet()); } + + synchronized NavigableSet setOfAllKeys() { + return cache.navigableKeySet(); + } synchronized LRUCacheEntry first() { if (head == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 4d4ee41..31047ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -65,6 +65,25 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { keySchema.hasNextCondition(keyFrom, keyTo, from, to), binaryFrom, binaryTo); } + + @Override + public KeyValueIterator all() { + final List searchSpace = keySchema.allSegments(segments); + + final Segment minSegment = keySchema.minSegment(segments); + final Segment maxSegment = keySchema.maxSegment(segments); + final Bytes minKey = minSegment.first().key; + final Bytes maxKey = maxSegment.last().key; + + final long minTimestamp = keySchema.segmentTimestamp(minKey); + final long maxTimestamp = keySchema.segmentTimestamp(maxKey); + final Bytes keyFrom = WindowStoreUtils.bytesKeyFromBinaryKey(minKey.get()); + final Bytes keyTo = WindowStoreUtils.bytesKeyFromBinaryKey(maxKey.get()); + + return new SegmentIterator(searchSpace.iterator(), + keySchema.hasNextCondition(keyFrom, keyTo, minTimestamp, maxTimestamp), + minKey, maxKey); + } @Override public void remove(final Bytes key) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index ea01694..4c7e61b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -387,6 +387,28 @@ public class RocksDBStore implements KeyValueStore { return rocksDbIterator; } + public synchronized KeyValue first() { + validateStoreOpen(); + + RocksIterator innerIter = db.newIterator(); + innerIter.seekToFirst(); + KeyValue pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value())); + innerIter.close(); + + return pair; + } + + public synchronized KeyValue last() { + validateStoreOpen(); + + RocksIterator innerIter = db.newIterator(); + innerIter.seekToLast(); + KeyValue pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value())); + innerIter.close(); + + return pair; + } + /** * Return an approximate count of key-value mappings in this store. * diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index b7dd532..d7f4dd5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -118,6 +118,12 @@ public class RocksDBWindowStore extends WrappedStateStore.AbstractStateSto final KeyValueIterator bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } + + @Override + public KeyValueIterator, V> all() { + final KeyValueIterator bytesIterator = bytesStore.all(); + return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); + } void maybeUpdateSeqnumForDups() { if (retainDuplicates) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 72ae6e2..55ec603 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -49,6 +49,8 @@ public interface SegmentedBytesStore extends StateStore { * @return an iterator over key-value pairs */ KeyValueIterator fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to); + + KeyValueIterator all(); /** * Remove the record with the provided key. The key @@ -159,5 +161,11 @@ public interface SegmentedBytesStore extends StateStore { * @return List of segments to search */ List segmentsToSearch(Segments segments, long from, long to); + + List allSegments(Segments segments); + + Segment minSegment(Segments segments); + + Segment maxSegment(Segments segments); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 7c6bb53..fffe710 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -61,6 +61,14 @@ class Segments { this.formatter = new SimpleDateFormat("yyyyMMddHHmm"); this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC")); } + + Segment minSegment() { + return getSegment(minSegmentId); + } + + Segment maxSegment() { + return getSegment(maxSegmentId); + } long segmentId(final long timestamp) { return timestamp / segmentInterval; @@ -142,6 +150,21 @@ class Segments { return segments; } + List allSegments() { + final List segments = new ArrayList<>(); + for (long segmentId = minSegmentId; segmentId <= maxSegmentId; segmentId++) { + Segment segment = getSegment(segmentId); + if (segment != null && segment.isOpen()) { + try { + segments.add(segment); + } catch (InvalidStateStoreException ise) { + // segment may have been closed by streams thread; + } + } + } + return segments; + } + void flush() { for (Segment segment : segments.values()) { segment.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 6d6d9bf..6facea7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -97,4 +97,19 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema { return segments.segments(from, Long.MAX_VALUE); } + @Override + public List allSegments(Segments segments) { + return segments.allSegments(); + } + + @Override + public Segment minSegment(Segments segments) { + return null; + } + + @Override + public Segment maxSegment(Segments segments) { + return null; + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 01a4bef..66dab93 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.NoSuchElementException; /** @@ -196,7 +197,12 @@ public class ThreadCache { return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache); } - + public NavigableSet setOfAllKeys(final String namespace) { + final NamedCache cache = getCache(namespace); + if (cache == null) return null; + return cache.setOfAllKeys(); + } + public long size() { long size = 0; for (NamedCache cache : caches.values()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index 214f36b..4a99b3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -92,4 +92,20 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { public List segmentsToSearch(final Segments segments, final long from, final long to) { return segments.segments(from, to); } + + @Override + public List allSegments(final Segments segments) { + return segments.allSegments(); + } + + @Override + public Segment minSegment(final Segments segments) { + return segments.minSegment(); + } + + @Override + public Segment maxSegment(final Segments segments) { + return segments.maxSegment(); + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index 3ad6475..51a5728 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -61,4 +61,9 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore { public WindowStoreIterator fetch(Object from, Object to, long timeFrom, long timeTo) { return null; } + + @Override + public WindowStoreIterator all() { + return null; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index f1a0038..a7877a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -129,6 +129,26 @@ public class CachingWindowStoreTest { assertFalse(iterator.hasNext()); assertEquals(2, cache.size()); } + + @Test + public void shouldPutAllFromCache() { + cachingStore.put(bytesKey("a"), bytesValue("a")); + cachingStore.put(bytesKey("b"), bytesValue("b")); + cachingStore.put(bytesKey("c"), bytesValue("c")); + cachingStore.put(bytesKey("d"), bytesValue("d")); + cachingStore.put(bytesKey("e"), bytesValue("e")); + cachingStore.put(bytesKey("f"), bytesValue("f")); + cachingStore.put(bytesKey("g"), bytesValue("g")); + cachingStore.put(bytesKey("h"), bytesValue("h")); + + final KeyValueIterator, byte[]> iterator = cachingStore.all(); + String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"}; + for (String s : array) { + verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), s); + } + assertFalse(iterator.hasNext()); + assertEquals(8, cache.size()); + } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 6974240..e3bb90e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -47,6 +47,7 @@ public class ReadOnlyWindowStoreStub implements ReadOnlyWindowStore, public ReadOnlyWindowStoreStub(long windowSize) { this.windowSize = windowSize; } + @Override public WindowStoreIterator fetch(final K key, final long timeFrom, final long timeTo) { @@ -64,6 +65,51 @@ public class ReadOnlyWindowStoreStub implements ReadOnlyWindowStore, } @Override + public KeyValueIterator, V> all() { + if (!open) { + throw new InvalidStateStoreException("Store is not open"); + } + final List, V>> results = new ArrayList<>(); + for (long now : data.keySet()) { + final NavigableMap kvMap = data.get(now); + if (kvMap != null) { + for (Entry entry : kvMap.entrySet()) { + results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue())); + } + } + } + final Iterator, V>> iterator = results.iterator(); + + return new KeyValueIterator, V>() { + @Override + public void close() { + + } + + @Override + public Windowed peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public KeyValue, V> next() { + return iterator.next(); + } + + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() not supported in " + getClass().getName()); + } + }; + } + + @Override public KeyValueIterator, V> fetch(K from, K to, long timeFrom, long timeTo) { if (!open) { throw new InvalidStateStoreException("Store is not open"); diff --git a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java index 7f8457c..43cffd2 100644 --- a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java @@ -58,6 +58,12 @@ public class SegmentedBytesStoreStub implements SegmentedBytesStore { fetchCalled = true; return new KeyValueIteratorStub<>(Collections.>emptyIterator()); } + + @Override + public KeyValueIterator all() { + fetchCalled = true; + return new KeyValueIteratorStub<>(Collections.>emptyIterator()); + } @Override public void remove(final Bytes key) {