Description
Steps to Reproduce
Set processing.guarantee = exactly_once and remove the local state directory to force state restoration from the changelog topics, which must be non-empty.
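A minimal sketch of this setup, using plain string config keys so it compiles without the Kafka dependency. The class name ReproSetup, the broker address, and the state directory path are assumptions for illustration; the application id is taken from the client-id in the logs further down. It assumes the local state lives under <state.dir>/<application.id>.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Properties;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka.
final class ReproSetup {

    // Builds the Streams configuration with exactly-once processing enabled.
    static Properties eosProperties(String applicationId, String bootstrapServers, Path stateDir) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("processing.guarantee", "exactly_once");
        props.setProperty("state.dir", stateDir.toString());
        return props;
    }

    // Deletes a directory tree (children first); does nothing if it is absent.
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path stateDir = Paths.get("/tmp/kafka-streams");   // assumed location
        String applicationId = "morpheus.conversion";       // from the logs in this report
        // Wipe the local state so the task has to restore from the changelog topics.
        deleteRecursively(stateDir.resolve(applicationId));
        Properties props = eosProperties(applicationId, "localhost:9092", stateDir);
        System.out.println(props);
    }
}
```

The resulting Properties would then be passed to the KafkaStreams constructor of the application under test.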
Expected Behavior
After Streams task state restoration, MBeans such as kafka.streams:type=stream-state-metrics,client-id=<application.id>-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,task-id=0_0,rocksdb-state-id=<state.store> are registered for persistent RocksDB KeyValueStores.
Actual Behavior
No such MBeans are registered after Streams task state restoration.
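One way to check for these MBeans is to query the platform MBean server with a pattern on the stream-state-metrics type. This is only a sketch: the class name StateStoreMBeanCheck is hypothetical, and the query has to run inside the same JVM as the Streams application (for example from a debug endpoint or a test) to see its metrics.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka.
final class StateStoreMBeanCheck {

    // Returns the names of all registered store-level MBeans that carry
    // a rocksdb-state-id key, sorted for stable output.
    static Set<ObjectName> findRocksDbStateMBeans() {
        try {
            ObjectName pattern = new ObjectName("kafka.streams:type=stream-state-metrics,*");
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            Set<ObjectName> result = new TreeSet<>();
            for (ObjectName name : server.queryNames(pattern, null)) {
                if (name.getKeyProperty("rocksdb-state-id") != null) {
                    result.add(name);
                }
            }
            return result;
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Set<ObjectName> names = findRocksDbStateMBeans();
        if (names.isEmpty()) {
            System.out.println("No RocksDB state store MBeans registered");
        }
        for (ObjectName name : names) {
            System.out.println(name);
        }
    }
}
```

With the behavior described here, the returned set would be expected to stay empty for the affected stores after restoration completes.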
Details
I injected a custom MetricsReporter to log metricChange and metricRemoval calls. According to the logs, at some point the missing metrics are removed and never registered again. Here is an excerpt for the number-open-files metric:
16:33:40.403 DEBUG c.m.r.LoggingMetricsReporter - metricChange MetricName [name=number-open-files, group=stream-state-metrics, description=Number of currently open files, tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1, task-id=0_0, rocksdb-state-id=buffered-event}]
16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics recording trigger
16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding statistics for store buffered-event of task 0_0
16:33:40.610 INFO o.a.k.s.p.i.StoreChangelogReader - stream-thread [morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1] No checkpoint found for task 0_0 state store buffered-event changelog morpheus.conversion-buffered-event-changelog-0 with EOS turned on. Reinitializing the task and restore its state from the beginning.
16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Removing statistics for store buffered-event of task 0_0
16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Removing metrics recorder for store buffered-event of task 0_0 from metrics recording trigger
16:33:40.611 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName [name=number-open-files, group=stream-state-metrics, description=Number of currently open files, tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1, task-id=0_0, rocksdb-state-id=buffered-event}]
16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics recording trigger
16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics Recorder for buffered-event] Adding statistics for store buffered-event of task 0_0
...
(no more calls to metricChange for the removed number-open-files metric)
The complete log is attached as metric-removal.log.
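To scan a longer log for metrics that were removed and never registered again, something like the following could be used. It is a rough sketch that assumes the reporter writes one event per line in the format shown in the excerpt; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical log-analysis helper, not part of Kafka or of the reporter.
final class RemovedMetricFinder {

    private static final String CHANGE = " metricChange ";
    private static final String REMOVAL = " metricRemoval ";

    // Returns the metric descriptions whose last logged event was a removal.
    static Set<String> removedAndNotRestored(List<String> logLines) {
        // Maps "MetricName [...]" to true if currently registered, false if removed.
        Map<String, Boolean> registered = new LinkedHashMap<>();
        for (String line : logLines) {
            int change = line.indexOf(CHANGE);
            int removal = line.indexOf(REMOVAL);
            if (change >= 0) {
                registered.put(line.substring(change + CHANGE.length()).trim(), true);
            } else if (removal >= 0) {
                registered.put(line.substring(removal + REMOVAL.length()).trim(), false);
            }
        }
        Set<String> missing = new LinkedHashSet<>();
        for (Map.Entry<String, Boolean> entry : registered.entrySet()) {
            if (!entry.getValue()) {
                missing.add(entry.getKey());
            }
        }
        return missing;
    }
}
```

Feeding it the lines of the attached log (for example via Files.readAllLines) would list every metric in the same state as number-open-files above.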
Metric removal happens along this call stack:
19:27:35.509 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName [name=number-open-files, group=stream-state-metrics, description=Number of currently open files, tags={client-id=morpheus.conversion-9b76f302-7149-47de-b17b-362d642e05d5-StreamThread-1, task-id=0_0, rocksdb-state-id=buffered-event}]
java.lang.Exception: null
    at casino.morpheus.reporter.LoggingMetricsReporter.metricRemoval(LoggingMetricsReporter.scala:24)
    at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:534)
    at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:448)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.removeAllStoreLevelSensors(StreamsMetricsImpl.java:440)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.close(MeteredKeyValueStore.java:345)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.reinitializeStateStoresForPartitions(StateManagerUtil.java:93)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:190)
    at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:215)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:234)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Attachments
Issue Links
- relates to KAFKA-6498 Add RocksDB statistics via Streams metrics (Resolved)