Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-4544

TaskManager metrics are vulnerable to custom JMX bean installation

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 1.1.2
    • 1.2.0
    • Runtime / Metrics
    • None

    Description

      The TaskManager's CPU load magic may fail when JMX providers are overwritten.

      The TaskManager logic checks if the class com.sun.management.OperatingSystemMXBean is available. If yes, it assumes that the ManagementFactory.getOperatingSystemMXBean() is of that type. That is not necessarily the case.

      This is visible in the Cassandra tests, as Cassandra overrides the JMX provider - every heartbeat causes an exception that is logged (See below), flooding the log, killing the heartbeat message.

      I would also suggest to move the entire metrics code out of the TaskManager class into a dedicated class TaskManagerJvmMetrics. That one can, with a static method, install the metrics into the TaskManager's metric group.

      Sample stack trace when default platform beans are overridden:

      23914 [flink-akka.actor.default-dispatcher-3] WARN  org.apache.flink.runtime.taskmanager.TaskManager  - Error retrieving CPU Load through OperatingSystemMXBean
      java.lang.IllegalArgumentException: object is not an instance of declaring class
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
      	at scala.Option.map(Option.scala:145)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2351)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2348)
      	at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:32)
      	at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:20)
      	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:616)
      	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:519)
      	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:31)
      	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
      	at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2444)
      	at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:355)
      	at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1442)
      	at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:186)
      	at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:171)
      	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
      	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
      	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3022)
      	at org.apache.flink.runtime.taskmanager.TaskManager.sendHeartbeatToJobManager(TaskManager.scala:1278)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:309)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.testingUtils.TestingTaskManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingTaskManagerLike.scala:65)
      	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            chesnay Chesnay Schepler
            sewen Stephan Ewen
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment