  1. Flink
  2. FLINK-12103

Extend builtin operator metrics with user defined scope and variables

    Details

    • Type: Wish
    • Status: Closed
    • Priority: Minor
    • Resolution: Abandoned
    • Affects Version/s: 1.7.2
    • Fix Version/s: None
    • Component/s: Runtime / Metrics
    • Labels: None

      Description

      I'm looking for a way to extend the built-in throughput and latency metrics for operators with my own metric variables.

      My specific use case:

      I have a job that defines a list of independent source -> sink streams. I would like to add my own metric variables to each of these independent streams. For example, something like this:

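      import org.apache.flink.api.common.functions.RichFilterFunction
      import org.apache.flink.configuration.Configuration

      // The String element type is only a placeholder for this illustration.
      class MyFilter extends RichFilterFunction[String] {
        override def open(parameters: Configuration): Unit = {
          val mg = getRuntimeContext.getMetricGroup // Should include "streamName" -> "A" or "B"

          // Init some user-defined metrics here...
        }

        override def filter(value: String): Boolean = true // Placeholder predicate
      }

      // withMetricGroup is the hook proposed in this issue, not an existing Flink API.

      // Stream A
      // Native operator metrics and user-defined metrics in rich operators include "streamName" -> "A"
      val streamA = env.withMetricGroup(mg => mg.addGroup("streamName", "A")).addSource(...).filter(new MyFilter).addSink(...)

      // Stream B
      // Native operator metrics and user-defined metrics in rich operators include "streamName" -> "B"
      val streamB = env.withMetricGroup(mg => mg.addGroup("streamName", "B")).addSource(...).filter(new MyFilter).addSink(...)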
      

       
      Is this possible? Would a new hook into StreamTransformation have to be added?
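
For comparison, a partial workaround may already be possible for user-defined metrics only: a rich function can attach a key/value pair to its own metrics through `MetricGroup.addGroup(key, value)`. The sketch below assumes a Flink version that provides this two-argument overload. The class name `TaggedFilter`, the `streamName` constructor parameter, and the `droppedRecords` counter are hypothetical names chosen for illustration. It does not change the built-in throughput and latency metrics, which is the part this issue asks about.

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical filter: `streamName` and the "droppedRecords" counter are illustrative names.
class TaggedFilter(streamName: String) extends RichFilterFunction[String] {
  // Assigned in open(), so it stays null until the runtime opens the function.
  @transient private var dropped: Counter = _

  override def open(parameters: Configuration): Unit = {
    // addGroup(key, value) registers "streamName" as a user variable, but only
    // for metrics created under the returned group, not for built-in operator metrics.
    dropped = getRuntimeContext.getMetricGroup
      .addGroup("streamName", streamName)
      .counter("droppedRecords")
  }

  override def filter(value: String): Boolean = {
    val keep = value.nonEmpty
    if (!keep) dropped.inc() // Count only the records that are filtered out
    keep
  }
}
```

Each pipeline would then pass its own name, for example `.filter(new TaggedFilter("A"))` for stream A and `.filter(new TaggedFilter("B"))` for stream B.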

            People

            • Assignee: Unassigned
            • Reporter: Ben Marini (benmarini)
            • Votes: 0
            • Watchers: 3
