Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6360

RocksDB segments not removed when store is closed causes re-initialization to fail

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.1, 0.11.0.2, 1.0.1, 1.1.0
    • Fix Version/s: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.0
    • Component/s: streams
    • Labels:
      None

      Description

      When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the Segments class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in:

      [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000024:  (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-0000000024.1513080000000 is currently closed
              at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
              at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
              at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
              at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
              at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
              at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
              at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
              at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                damianguy Damian Guy
                Reporter:
                damianguy Damian Guy
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: