Description
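@Override
public void flush() {
    lock.writeLock().lock();
    try {
        // ... flushes the cache; see ThreadCache.flush in the stack trace below
    } finally {
        lock.writeLock().unlock();
    }
}

@Override
public void close() {
    flush();                 // if flush() throws, the two close calls below are never reached
    underlying.close();
    cache.close(cacheName);
}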
An exception that leads to this is shown below. Note that another store is
already closed and therefore not available:
2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: :
org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
    at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
If the store is not closed, we have observed that the RocksDB lock is not
released properly, which can lead to:
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - at org.rocksdb.RocksDB.open(Native Method)
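
For illustration only, here is a minimal, self-contained sketch of the failure mode and of one possible way to guard against it with nested try/finally. This is not the Kafka code and not necessarily how the issue was or should be fixed. The class SafeCloseSketch, its static helper methods, and the use of IllegalStateException in place of InvalidStateStoreException are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeCloseSketch {
    // Records which steps ran, so the two close variants can be compared.
    static final List<String> calls = new ArrayList<>();

    // Hypothetical stand-in for flush(): it fails the way the report describes,
    // because a flush listener reads from a store that is already closed.
    static void flush() {
        calls.add("flush");
        throw new IllegalStateException("Store is currently closed.");
    }

    // Hypothetical stand-in for underlying.close() (would release the RocksDB lock).
    static void closeUnderlying() {
        calls.add("underlying.close");
    }

    // Hypothetical stand-in for cache.close(cacheName).
    static void closeCache() {
        calls.add("cache.close");
    }

    // Same shape as close() in the report: a throwing flush() skips both closes.
    static void closeAsReported() {
        flush();
        closeUnderlying();
        closeCache();
    }

    // Sketch of a guarded variant: both closes run even if flush() throws.
    static void closeWithFinally() {
        try {
            flush();
        } finally {
            try {
                closeUnderlying();
            } finally {
                closeCache();
            }
        }
    }

    public static void main(String[] args) {
        try {
            closeAsReported();
        } catch (IllegalStateException e) {
            System.out.println("as reported:  " + calls);
        }
        calls.clear();
        try {
            closeWithFinally();
        } catch (IllegalStateException e) {
            System.out.println("with finally: " + calls);
        }
    }
}
```

In the guarded variant the exception from flush() still propagates to the caller, so the failure remains visible. If one of the close calls also throws, that later exception replaces the one from flush(), so a real fix might want to log or attach the first exception before continuing.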