Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6360

RocksDB segments not removed when store is closed causes re-initialization to fail

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 0.10.2.1, 0.11.0.2, 1.0.1, 1.1.0
    • 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.0
    • streams
    • None

    Description

      When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the Segments class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in:

      [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000024:  (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-0000000024.1513080000000 is currently closed
              at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
              at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
              at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
              at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
              at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
              at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
              at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
              at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
      

      Attachments

        Issue Links

          Activity

            People

              damianguy Damian Guy
              damianguy Damian Guy
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: