KAFKA-9743

StreamTask could fail to close during HandleNewAssignment


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.6.0
    • Fix Version/s: 2.6.0
    • Component/s: None
    • Labels: None

    Description

      We found this particular bug happening in soak testing:

      [2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) [2020-03-20 23:12:01,534] ERROR [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] stream-thread [stream-soak-test-7ece4c7d-f528-4c92-93e2-9b32f1f722b1-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
      [2020-03-20T16:12:02-07:00] (streams-soak-trunk-eos_soak_i-026133a325ea91147_streamslog) java.lang.IllegalStateException: RocksDB metrics recorder for store "KSTREAM-AGGREGATE-STATE-STORE-0000000040" of task 2_2 has already been added. This is a bug in Kafka Streams.
              at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
              at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:98)
              at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:207)
              at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:193)
              at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:231)
              at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
              at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
              at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
              at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
              at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
              at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
              at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
              at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
              at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:81)
              at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:191)
              at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:329)
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:587)
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:501)
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:475)
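      For context, the check that raises this exception sits in RocksDBMetricsRecordingTrigger.addMetricsRecorder (first frame above): the trigger keeps one recorder per store, and registering the same store twice is treated as a bug. A simplified sketch of that kind of duplicate-registration guard (illustrative names only, not the actual implementation):

      import java.util.HashMap;
      import java.util.Map;

      // Simplified sketch of a duplicate-registration guard (not the real
      // RocksDBMetricsRecordingTrigger): one recorder is kept per store, and
      // adding a recorder for a store that is already registered is rejected.
      class MetricsRecordingTriggerSketch {
          private final Map<String, Runnable> recorderByStore = new HashMap<>();

          synchronized void addMetricsRecorder(final String storeName, final Runnable recorder) {
              if (recorderByStore.containsKey(storeName)) {
                  throw new IllegalStateException(
                      "RocksDB metrics recorder for store \"" + storeName + "\" has already been added."
                  );
              }
              recorderByStore.put(storeName, recorder);
          }

          // A clean task close is expected to call this, freeing the slot for re-registration.
          synchronized void removeMetricsRecorder(final String storeName) {
              recorderByStore.remove(storeName);
          }
      }

      The exception can therefore only fire if a previous recorder for the same store and task was never removed, i.e. the owning task was never properly closed.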

      This can bring the entire instance down. The bug is that if the commit issued while closing a task as part of handling a new assignment fails, the subsequent `closeClean` call is never triggered, so the task's state stores are left open and their metrics recorders stay registered; when the task is later re-initialized on the same instance, re-registering the recorder throws the IllegalStateException above.
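      A minimal sketch of that failure mode and one way to make the cleanup unconditional (hypothetical Task interface and method names for illustration; this is not the actual StreamTask/TaskManager code):

      // Hypothetical sketch of the failure mode (illustrative names, not the
      // actual StreamTask/TaskManager code).
      class TaskCloseSketch {

          interface Task {
              void commit();      // may throw, e.g. on a timeout or fenced producer
              void closeClean();  // closes state stores and unregisters metrics recorders
              void closeDirty();  // closes state stores without committing
          }

          // Buggy shape: cleanup only runs if the commit succeeds. If commit()
          // throws, the stores stay open and the recorder stays registered, so the
          // next initialization of the same store hits the IllegalStateException above.
          static void closeBuggy(final Task task) {
              task.commit();
              task.closeClean();
          }

          // Safer shape: if the clean path fails, fall back to a dirty close so the
          // stores and metrics recorders are still released before rethrowing.
          static void closeSafely(final Task task) {
              try {
                  task.commit();
                  task.closeClean();
              } catch (final RuntimeException fatal) {
                  task.closeDirty();
                  throw fatal;
              }
          }
      }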


            People

              Assignee: Boyang Chen (bchen225242)
              Reporter: Boyang Chen (bchen225242)
              Votes: 0
              Watchers: 4
