Details
Description
When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the Segments class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in:
[2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000024: (org.apache.kafka.streams.processor.internals.ProcessorStateManager) org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-0000000024.1513080000000 is currently closed at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
Attachments
Issue Links
- links to