Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-12103

Extend builtin operator metrics with user defined scope and variables

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Closed
    • Minor
    • Resolution: Abandoned
    • 1.7.2
    • None
    • Runtime / Metrics
    • None

    Description

      I'm looking for a way to extend the builtin throughput and latency metrics for operators with my own metric variables.

      My specific use case:

      I have a job that defines a list of independent source -> sink streams. I would like to add my own metric variables to each of these independent streams. For example, something like this:

      class MyFilter extends RichFilterFunction {
        override def open(parameters: Configuration): Unit = {
          val mg = getRuntimeContext.getMetricGroup // Includes "streamName" -> "A|B"
      
          // Init some user defined metrics here...
        }
      }
      
      // Stream A
      // Native operator metrics and user defined metrics in rich operators include "streamName" -> "A"
      streamA = env.withMetricGroup((mg) => mg.addGroup("streamName", "A").addSource(...).filter(new MyFilter).addSink(...)
      
      // Stream B
      // Native operator metrics and user defined metrics in rich operators include "streamName" -> "B"
      
      streamB = env.withMetricGroup((mg) => mg.addGroup("streamName", "B").addSource(...).filter(new MyFilter).addSink(...)
      

       
      Is this possible? Would a new hook into StreamTransformation have to be added?

      Attachments

        Activity

          People

            Unassigned Unassigned
            benmarini Ben Marini
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: