Kafka / KAFKA-9355

RocksDB statistics are removed from JMX when EOS is enabled and the local state dir is empty


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.5.0
    • Component/s: metrics, streams
    • Labels: None

    Description

      Steps to Reproduce

      Set processing.guarantee = exactly_once and remove the local state directory to force state restoration from the changelog topics, which must be non-empty.
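The reproduction steps above can be sketched roughly as follows. This is only an illustration using plain JDK classes: the class name `ReproSetup`, the helper names, and the broker address are assumptions, not part of the report. The configuration keys (`application.id`, `bootstrap.servers`, `processing.guarantee`, `state.dir`) are the standard Kafka Streams names, and the resulting `Properties` would be passed to the `KafkaStreams` constructor in a real application.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReproSetup {
    // Builds a Streams configuration with exactly-once processing enabled.
    // The broker address is a placeholder assumption.
    static Properties streamsConfig(String applicationId, Path stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once");
        props.put("state.dir", stateDir.toString());
        return props;
    }

    // Recursively deletes the local state directory so that no checkpoint
    // file survives and state must be restored from the changelog topics.
    static void wipeStateDir(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(stateDir)) {
            // Reverse order puts children before their parent directories.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
    }
}
```

Wiping the directory must happen before the application starts, while the changelog topics still hold data; otherwise there is nothing to restore and the reinitialization path is not exercised.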

      Expected Behavior

      After streams task state restoration, MBeans such as kafka.streams:type=stream-state-metrics,client-id=<application.id>-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,task-id=0_0,rocksdb-state-id=<state.store> are registered for persistent RocksDB KeyValueStores.

      Actual Behavior

      No such MBeans are registered after streams task state restoration.
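One way to check whether the expected MBeans are present is to query the platform MBean server from inside the application's JVM. The sketch below is an assumption about how such a check could look, not something taken from the report; `StateMetricsProbe` is a hypothetical name, and only standard `javax.management` calls are used.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class StateMetricsProbe {
    // Returns the names of all registered stream-state-metrics MBeans that
    // carry a rocksdb-state-id key, i.e. the ones this issue is about.
    static Set<ObjectName> rocksDbStateMBeans() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern;
        try {
            pattern = new ObjectName("kafka.streams:type=stream-state-metrics,*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException("pattern should always be valid", e);
        }
        Set<ObjectName> result = new TreeSet<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            if (name.getKeyProperty("rocksdb-state-id") != null) {
                result.add(name);
            }
        }
        return result;
    }
}
```

An empty result after restoration has finished would match the actual behavior described here; a non-empty result would match the expected behavior.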

      Details

      I injected a custom MetricsReporter to log metricChange and metricRemoval calls. According to the logs, the missing metrics are removed at some point and never registered again. Here is an excerpt for the number-open-files metric:

      16:33:40.403 DEBUG c.m.r.LoggingMetricsReporter - metricChange MetricName [name=number-open-files, group=stream-state-metrics, description=Number of currently open files, tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1, task-id=0_0, rocksdb-state-id=buffered-event}]
      16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics recording trigger
      16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding statistics for store buffered-event of task 0_0
      16:33:40.610 INFO  o.a.k.s.p.i.StoreChangelogReader - stream-thread [morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1] No checkpoint found for task 0_0 state store buffered-event changelog morpheus.conversion-buffered-event-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
      16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Removing statistics for store buffered-event of task 0_0
      16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Removing metrics recorder for store buffered-event of task 0_0 from metrics recording trigger
      16:33
      16:33:40.611 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName [name=number-open-files, group=stream-state-metrics, description=Number of currently open files, tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1, task-id=0_0, rocksdb-state-id=buffered-event}]
      16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics recording trigger
      16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding statistics for store buffered-event of task 0_0
      ...
      (no more calls to metricChange for the removed number-open-files metric)
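To find every metric affected in a longer log, the same check can be automated. The following sketch assumes the line format shown in the excerpt above (a `metricChange` or `metricRemoval` keyword followed by `MetricName [...]`); the class `MetricEventScanner` and its method are hypothetical helpers, not part of Kafka or of the reporter used here.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MetricEventScanner {
    // Captures the event type and the bracketed metric identity
    // (name, group, description, tags) from one reporter log line.
    private static final Pattern EVENT =
        Pattern.compile("(metricChange|metricRemoval) MetricName \\[(.*)\\]\\s*$");

    // Returns the identities of metrics whose last logged event is a removal,
    // i.e. metrics that were removed and never registered again.
    static Set<String> removedAndNeverRestored(List<String> logLines) {
        Map<String, String> lastEvent = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = EVENT.matcher(line);
            if (m.find()) {
                lastEvent.put(m.group(2), m.group(1));
            }
        }
        Set<String> missing = new LinkedHashSet<>();
        for (Map.Entry<String, String> e : lastEvent.entrySet()) {
            if ("metricRemoval".equals(e.getValue())) {
                missing.add(e.getKey());
            }
        }
        return missing;
    }
}
```

Keying on the full bracketed text rather than the metric name alone keeps metrics of different tasks and stores apart, since they differ only in their tags.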

      A complete log is attached as metric-removal.log.

      Metric removal happens along this call stack:

      19:27:35.509 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName [name=number-open-files, group=stream-state-metrics, description=Number of currently open files, tags={client-id=morpheus.conversion-9b76f302-7149-47de-b17b-362d642e05d5-StreamThread-1, task-id=0_0, rocksdb-state-id=buffered-event}]
      java.lang.Exception: null
         at casino.morpheus.reporter.LoggingMetricsReporter.metricRemoval(LoggingMetricsReporter.scala:24)
         at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:534)
         at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:448)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.removeAllStoreLevelSensors(StreamsMetricsImpl.java:440)
         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.close(MeteredKeyValueStore.java:345)
         at org.apache.kafka.streams.processor.internals.StateManagerUtil.reinitializeStateStoresForPartitions(StateManagerUtil.java:93)
         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:190)
         at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:215)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:234)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
         at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
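The `java.lang.Exception: null` header suggests that this trace was obtained by logging a freshly created exception inside the reporter callback rather than by an actual failure; that is an inference, since the reporter source is not shown. A minimal sketch of the technique, with the hypothetical class `CallSiteTracer` standing in for the reporter:

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class CallSiteTracer {
    // Renders the current call stack as text by creating an exception
    // that is never thrown and printing its stack trace.
    static String currentStack() {
        StringWriter out = new StringWriter();
        new Exception().printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    // Stand-in for a callback such as metricRemoval: reports the metric name
    // together with the stack that led to the call.
    static String onMetricRemoval(String metricName) {
        return "metricRemoval " + metricName + System.lineSeparator() + currentStack();
    }
}
```

Read from the bottom up, the trace above shows that StoreChangelogReader.startRestoration reinitializes the state stores, which closes MeteredKeyValueStore and in turn removes all store-level sensors and their metrics.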

          People

            Assignee: Bruno Cadonna (cadonna)
            Reporter: Stanislav Savulchik (savulchik)