Flink / FLINK-4544

TaskManager metrics are vulnerable to custom JMX bean installation

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.2
    • Fix Version/s: 1.2.0
    • Component/s: Metrics
    • Labels:
      None

      Description

      The TaskManager's CPU load metric may fail when the default JMX platform beans are overridden.

      The TaskManager logic checks if the class com.sun.management.OperatingSystemMXBean is available. If yes, it assumes that the ManagementFactory.getOperatingSystemMXBean() is of that type. That is not necessarily the case.
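
To make the failure mode concrete, here is a minimal Java sketch. It is not the TaskManager code; `ReflectiveCpuLoadDemo` and `CustomOsBean` are hypothetical names standing in for the reflective lookup and for a bean installed by a custom JMX provider. It assumes a JVM that ships `com.sun.management.OperatingSystemMXBean`.

```java
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;
import javax.management.ObjectName;

final class ReflectiveCpuLoadDemo {

    private static final String SUN_BEAN = "com.sun.management.OperatingSystemMXBean";

    /** Hypothetical stand-in for a bean installed by a custom JMX provider. */
    static final class CustomOsBean implements OperatingSystemMXBean {
        @Override public String getName() { return "custom"; }
        @Override public String getArch() { return "n/a"; }
        @Override public String getVersion() { return "n/a"; }
        @Override public int getAvailableProcessors() { return 1; }
        @Override public double getSystemLoadAverage() { return -1.0; }
        @Override public ObjectName getObjectName() { return null; }
    }

    /**
     * Mimics the flawed assumption: the class lookup succeeds, so the method
     * is invoked on the bean without checking the bean's actual type.
     * Returns true if the call was rejected with IllegalArgumentException.
     */
    static boolean reflectiveCallFails(OperatingSystemMXBean bean) {
        try {
            Method getLoad = Class.forName(SUN_BEAN).getMethod("getProcessCpuLoad");
            getLoad.invoke(bean);
            return false;
        } catch (IllegalArgumentException e) {
            // The receiver is not an instance of the method's declaring class.
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Sun-specific bean not usable on this JVM", e);
        }
    }

    /** The check that was missing: is the bean actually of the looked-up type? */
    static boolean isSunBean(OperatingSystemMXBean bean) {
        try {
            return Class.forName(SUN_BEAN).isInstance(bean);
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

With a `CustomOsBean`, the class lookup still succeeds, but `isSunBean` returns false and the reflective call is rejected, which matches the exception type in the stack trace below.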

      This is visible in the Cassandra tests, as Cassandra overrides the JMX provider: every heartbeat causes an exception that is logged (see below), flooding the log and killing the heartbeat message.

      I would also suggest moving the entire metrics code out of the TaskManager class into a dedicated class, TaskManagerJvmMetrics, which can install the metrics into the TaskManager's metric group through a static method.

      Sample stack trace when default platform beans are overridden:

      23914 [flink-akka.actor.default-dispatcher-3] WARN  org.apache.flink.runtime.taskmanager.TaskManager  - Error retrieving CPU Load through OperatingSystemMXBean
      java.lang.IllegalArgumentException: object is not an instance of declaring class
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
      	at scala.Option.map(Option.scala:145)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2351)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2348)
      	at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:32)
      	at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:20)
      	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:616)
      	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:519)
      	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:31)
      	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
      	at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2444)
      	at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:355)
      	at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1442)
      	at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:186)
      	at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:171)
      	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
      	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
      	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3022)
      	at org.apache.flink.runtime.taskmanager.TaskManager.sendHeartbeatToJobManager(TaskManager.scala:1278)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:309)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.testingUtils.TestingTaskManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingTaskManagerLike.scala:65)
      	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/2445

          FLINK-4544 Refactor old CPU metric initialization

          This PR refactors the old CPU metric initialization code to no longer rely on reflection.

          Instead it eagerly casts the `OperatingSystemMXBean` returned by `ManagementFactory#getOperatingSystemMXBean()` to `com.sun.management.OperatingSystemMXBean`. Should this succeed, the gauge is guaranteed not to throw a `ClassCastException`, since it no longer has to cast anything; if it fails, a dummy Gauge that returns -1 is registered in order to retain the existing behavior.
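
The eager-cast idea could look roughly like the following Java sketch. This is an illustration rather than the code in the pull request: `EagerCastCpuGauge` is a hypothetical name and the nested `Gauge` interface is a local stand-in for a metrics gauge type. Note that it references the Sun-specific class directly, so it assumes that class exists on the running JVM.

```java
import java.lang.management.OperatingSystemMXBean;

final class EagerCastCpuGauge {

    /** Hypothetical stand-in for a metrics gauge interface. */
    interface Gauge<T> {
        T getValue();
    }

    /**
     * Builds the CPU load gauge. The cast happens once, here, instead of on
     * every read; if the bean has a different type, a dummy gauge is returned.
     */
    static Gauge<Double> create(OperatingSystemMXBean bean) {
        try {
            final com.sun.management.OperatingSystemMXBean sunBean =
                    (com.sun.management.OperatingSystemMXBean) bean;
            // No cast is left inside the gauge itself.
            return sunBean::getProcessCpuLoad;
        } catch (ClassCastException e) {
            // Retain the previous behavior: report -1 when the load is unavailable.
            return () -> -1.0;
        }
    }
}
```

A caller would pass `ManagementFactory.getOperatingSystemMXBean()` and register whichever gauge comes back, so a mismatched bean type is handled once at registration time rather than on every heartbeat.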

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 4544_tm_cpu

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2445.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2445


          commit 51ff47711cde8d32d8d0b6d2abf3bd264d207342
          Author: zentol <chesnay@apache.org>
          Date: 2016-08-31T15:46:10Z

          FLINK-4544 Refactor old CPU metric initialization


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2445

          Does this work? The reason for the initial reflection-based initialization was that the Sun-specific class may not be available in some environments. I am unsure when the "ClassNotFound" error will be thrown. Upon loading of the TaskManager? Or upon running the code that installs the Gauge? I fear it may be thrown earlier than when the Gauge is created.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2445

          This is pretty much the same code that the new CPU metric uses, which has not caused issues so far.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2445

          But I see the issue that could arise; I will rework the solution.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2445

          Version 2 is up. Essentially, we now attempt to use the methods before registering the metrics. I've also adjusted the new CPU metrics.
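
One way to read "attempt to use the methods before registering the metrics" is a one-time probe, sketched below in Java. This is an assumption about the general shape, not the pull request's code: `ProbedGauge`, `probeOrFallback`, and `cpuLoadGauge` are hypothetical names, and `Supplier` stands in for a gauge type. Whether a missing Sun-specific class would surface inside the probe or earlier depends on the JVM's linking behavior, so the catch clause is a best effort.

```java
import java.lang.management.ManagementFactory;
import java.util.function.Supplier;

final class ProbedGauge {

    /**
     * Calls the candidate once. If the probe succeeds, the candidate is
     * returned for registration; otherwise a constant fallback is returned,
     * so later reads never throw (and never log) for this reason.
     */
    static <T> Supplier<T> probeOrFallback(Supplier<T> candidate, T fallback) {
        try {
            candidate.get();
            return candidate;
        } catch (Exception | LinkageError e) {
            // Covers a mismatched bean type (ClassCastException) as well as
            // linkage problems such as NoClassDefFoundError.
            return () -> fallback;
        }
    }

    /** Example wiring: CPU load via the Sun-specific bean, or -1.0 if the probe fails. */
    static Supplier<Double> cpuLoadGauge() {
        return probeOrFallback(
                () -> ((com.sun.management.OperatingSystemMXBean)
                        ManagementFactory.getOperatingSystemMXBean()).getProcessCpuLoad(),
                -1.0);
    }
}
```

The design choice here is to pay for the failure once, at registration, instead of on every heartbeat.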

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2445

          Can we move this code out of the TaskManager as a whole, into a metrics Utility?
          We could make it reusable for the JobManager as well, by passing the metric group where that should be added.
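
A utility along these lines might be sketched in Java as follows. All names are hypothetical: `JvmMetricsUtil`, the nested `MetricGroup` interface (a minimal stand-in for a metric group), and the metric names are illustrative and not taken from Flink.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.function.Supplier;

final class JvmMetricsUtil {

    /** Hypothetical minimal stand-in for a metric group that accepts gauges. */
    interface MetricGroup {
        void gauge(String name, Supplier<? extends Number> gauge);
    }

    private JvmMetricsUtil() {}

    /**
     * Installs JVM memory gauges into whichever group the caller passes in,
     * so the same code can serve both the JobManager and the TaskManager.
     */
    static void installMemoryMetrics(MetricGroup group) {
        final MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        group.gauge("Heap.Used", () -> memory.getHeapMemoryUsage().getUsed());
        group.gauge("Heap.Committed", () -> memory.getHeapMemoryUsage().getCommitted());
        group.gauge("NonHeap.Used", () -> memory.getNonHeapMemoryUsage().getUsed());
    }
}
```

Because the group is a parameter, neither component needs to carry its own copy of the initialization code.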

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2445

          That should be doable, yes.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2445

          @StephanEwen I moved the code into a separate class.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2445

          The approach looks good, but I think it would be good to adjust a few things:

          • We are migrating most of the Scala code in `flink-runtime` to Java (for example in FLIP-6), so it would be good if this new code was also Java
          • Some of the Gauges use some form of JMX querying (`getAttribute(ObjectName)`) that looks more expensive than directly accessing a bean (calling a method)
          • Some metrics may not be available on all JVMs (like mapped direct memory). Can this make sure we don't log an exception on each access to the Gauge when the JMX query fails?
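
The last point could be addressed with a gauge that remembers a failed JMX query, as in the Java sketch below. `GuardedJmxGauge` is a hypothetical name and the -1 fallback is an assumption; only the `MBeanServer#getAttribute` call and the platform MBean server are standard JDK API.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class GuardedJmxGauge {

    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    private final ObjectName name;
    private final String attribute;
    private final AtomicBoolean failed = new AtomicBoolean(false);

    GuardedJmxGauge(String objectName, String attribute) {
        try {
            this.name = new ObjectName(objectName);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
        this.attribute = attribute;
    }

    /** Returns the attribute value, or -1 once the query has failed. */
    long getValue() {
        if (failed.get()) {
            return -1L; // Skip the query (and the log line) after the first failure.
        }
        try {
            return ((Number) server.getAttribute(name, attribute)).longValue();
        } catch (JMException | RuntimeException e) {
            if (failed.compareAndSet(false, true)) {
                // Reported exactly once, not on every access.
                System.err.println("Metric " + name + "#" + attribute + " unavailable: " + e);
            }
            return -1L;
        }
    }

    boolean hasFailed() {
        return failed.get();
    }
}
```

For example, `new GuardedJmxGauge("java.nio:type=BufferPool,name=mapped", "MemoryUsed")` would keep returning -1 quietly on a JVM that does not expose that bean.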
          Zentol Chesnay Schepler added a comment -

          Fixed in 5b54009ebdf1602bdc9860b46ee34e65ef74246a.

          This issue was extended to a more thorough refactoring of the JM/TM metrics.

          The initial issue regarding the OperatingSystemMXBean was fixed. Other changes include

          • the metrics were rewritten in Java due to FLIP-6
          • the BufferPool metrics are no longer fetched through JMX and instead use BufferPoolMXBeans
          • the entire JM/TM metrics initialization was moved into a common util class
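
Reading the buffer pool figures directly from the beans, rather than through JMX attribute queries, might look like this Java sketch. `BufferPoolMetrics` and the gauge naming scheme are hypothetical; `ManagementFactory.getPlatformMXBeans` and `BufferPoolMXBean` are standard JDK API.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

final class BufferPoolMetrics {

    private BufferPoolMetrics() {}

    /**
     * Creates three gauges per buffer pool that the JVM actually exposes.
     * Pools that do not exist on this JVM simply produce no gauges.
     */
    static Map<String, Supplier<Long>> gauges() {
        Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();
        for (final BufferPoolMXBean pool
                : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            String prefix = pool.getName(); // typically "direct" or "mapped"
            gauges.put(prefix + ".Count", pool::getCount);
            gauges.put(prefix + ".MemoryUsed", pool::getMemoryUsed);
            gauges.put(prefix + ".TotalCapacity", pool::getTotalCapacity);
        }
        return gauges;
    }
}
```

Each gauge is a plain method call on the bean, and a JVM without a given pool never gets a gauge that could fail on access.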
          githubbot ASF GitHub Bot added a comment -

          Github user zentol closed the pull request at:

          https://github.com/apache/flink/pull/2445


            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              StephanEwen Stephan Ewen