Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6360

RocksDB segments not removed when store is closed causes re-initialization to fail

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.1, 0.11.0.2, 1.0.1, 1.1.0
    • Fix Version/s: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.0
    • Component/s: streams
    • Labels:
      None

      Description

      When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the Segments class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in:

      [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000024:  (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-0000000024.1513080000000 is currently closed
              at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
              at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
              at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
              at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
              at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
              at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
              at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
              at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
              at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
      

        Attachments

          Activity

            People

            • Assignee:
              damianguy Damian Guy
              Reporter:
              damianguy Damian Guy

              Dates

              • Created:
                Updated:
                Resolved:

                Issue deployment