Details
Type: Improvement
Status: Closed
Priority: Minor
Resolution: Abandoned
1.7.2
None
None
Description
I'm looking for a way to extend the built-in throughput and latency metrics for operators with my own metric variables.
My specific use case:
I have a job that defines a list of independent source -> sink streams. I would like to add my own metric variables to each of these independent streams. For example, something like this:
    class MyFilter extends RichFilterFunction {
      override def open(parameters: Configuration): Unit = {
        val mg = getRuntimeContext.getMetricGroup // Includes "streamName" -> "A|B"
        // Init some user defined metrics here...
      }
    }

    // Stream A
    // Native operator metrics and user defined metrics in rich operators include "streamName" -> "A"
    streamA = env.withMetricGroup((mg) => mg.addGroup("streamName", "A")).addSource(...).filter(new MyFilter).addSink(...)

    // Stream B
    // Native operator metrics and user defined metrics in rich operators include "streamName" -> "B"
    streamB = env.withMetricGroup((mg) => mg.addGroup("streamName", "B")).addSource(...).filter(new MyFilter).addSink(...)
Is this possible? Would a new hook into StreamTransformation have to be added?
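For comparison, here is a minimal sketch of what seems achievable without a new hook, assuming the stream name is passed to each rich function by hand. The class name `StreamNameFilter`, the counter name `recordsSeen`, and the `String` element type are hypothetical and only for illustration. It relies on `MetricGroup.addGroup(key, value)`, which defines a user variable on the metrics registered under that group.

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical filter: the caller supplies the stream name ("A" or "B").
class StreamNameFilter(val streamName: String) extends RichFilterFunction[String] {

  // Created in open(), because the runtime context is not available earlier.
  @transient private var seen: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Registers the counter under a key/value group, so it carries
    // the user variable "streamName" -> streamName.
    seen = getRuntimeContext.getMetricGroup
      .addGroup("streamName", streamName)
      .counter("recordsSeen")
  }

  override def filter(value: String): Boolean = {
    seen.inc()        // count every record that reaches this filter
    value.nonEmpty    // placeholder predicate, an assumption for this sketch
  }
}
```

Each stream would then use `.filter(new StreamNameFilter("A"))` or `.filter(new StreamNameFilter("B"))`. As far as I can tell, this only tags the user-defined metrics. The native operator metrics are registered by the runtime and do not pick up the extra variable, which is the gap this request is about.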