Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7534

Error during CachingKeyValueStore.flush may not allow RocksDB to close

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.0.3, 1.1.2, 2.1.0, 2.0.2
    • Component/s: streams
    • Labels:
      None

      Description

      @Override
      public void flush() {
          lock.writeLock().lock();
          try

      {         cache.flush(cacheName);         underlying.flush();     }

      finally

      {         lock.writeLock().unlock();     }

      }

      @Override
      public void close() {
          flush();
          underlying.close();
          cache.close(cacheName);

      An exception leading to this, notice that another store is already closed
      and therefore not available:
      2018-10-04 12:18:44,961 ERROR
      [org.apache.kafka.streams.processor.internals.ProcessorStateManager]
      (...-StreamThread-8) - task [8_11] Failed to close state store
      ...-STATE-STORE-0000000038: :
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store
      KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
      at
      org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
      at
      org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
      at
      org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
      at
      org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
      at
      org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
      at
      org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
      at
      org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
      at
      org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
      at
      org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
      at
      org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
      at
      org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
      at
      org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
      at
      org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
      at
      org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
      at
      org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
      at
      org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
      at
      org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
      at
      org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
      at
      org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
      at
      org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
      at
      org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
      at
      org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
      at
      org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
      at
      org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
      at
      org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
      at
      org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
      at
      org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

      If the store is not closed we have witnessed that the lock is RocksDB is
      not removed properly which can lead to

      2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   Caused
      by: org.rocksdb.RocksDBException: While lock file:
      ...-STATE-STORE-0000000038/LOCK: No locks available
      2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   at
      org.rocksdb.RocksDB.open(Native Method)

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                bbejeck Bill Bejeck
                Reporter:
                bbejeck Bill Bejeck
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: