Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Affects Version/s: 2.2.0, 2.3.0
- Fix Version/s: None
- Environment: Spark 2.2
- Attachments: MetricsReporter.scala
Description
The wrong metric is being sent in MetricsReporter.scala.
The current implementation assigns the wrong metric to processingRate-total.
Compare the first and second registerGauge calls below: the second one mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond.
class MetricsReporter(
    stream: StreamExecution,
    override val sourceName: String) extends CodahaleSource with Logging {

  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Metric names should not have . in them, so that all the metrics of a query are identified
  // together in Ganglia as a single metric group
  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
  registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond)
  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())

  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
    synchronized {
      metricRegistry.register(name, new Gauge[T] {
        override def getValue: T = f()
      })
    }
  }
}
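For reference, the fix is a one-line change: the processingRate-total gauge should read processedRowsPerSecond from the same StreamingQueryProgress object, with the rest of the class unchanged:

  // Fixed gauge registration: processingRate-total now reports the
  // processed rate instead of duplicating the input rate.
  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)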
After adjusting the line and rebuilding from source, I tested the change by checking the CSV files produced via the metrics properties file. Previously, inputRate-total and processingRate-total were identical because the same metric was used for both. After the change, the processingRate-total file held the correct value.
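For context, a minimal sketch of a setup that produces such CSV files (the directory and period below are illustrative, not taken from this report): enable structured streaming metrics with spark.sql.streaming.metricsEnabled=true on the session, and route metrics to a CSV sink in conf/metrics.properties:

  # conf/metrics.properties -- send all metric sources to a CsvSink
  *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
  *.sink.csv.period=10
  *.sink.csv.unit=seconds
  *.sink.csv.directory=/tmp/spark-metrics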
Please see the attached file "Processed Rows Per Second".
After altering the code, the correct values are displayed in column B.
They match the data from the INFO StreamExecution lines logged during streaming.
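As a cross-check sketch (a hypothetical helper; query stands for any running StreamingQuery), the same two rates are available programmatically on the progress object behind that INFO log line:

  import org.apache.spark.sql.streaming.StreamingQuery

  // Hypothetical helper: print both rates from the latest progress report,
  // the same object the INFO StreamExecution log line serializes.
  def printRates(query: StreamingQuery): Unit = {
    val p = query.lastProgress // may be null before the first trigger completes
    if (p != null) {
      println(s"inputRowsPerSecond     = ${p.inputRowsPerSecond}")
      println(s"processedRowsPerSecond = ${p.processedRowsPerSecond}")
    }
  }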