Flink / FLINK-4157

FlinkKafkaMetrics cause TaskManager shutdown during cancellation


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.3
    • Fix Version/s: 1.1.0
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      The following issue was reported by a user:

      2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: KafkaOutput (59/72)
      2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: KafkaOutput (53/72) switched to CANCELED
      2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: KafkaOutput (53/72)
      2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy                                  - 
      java.util.ConcurrentModificationException
      	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
      	at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
      	at org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106)
      	at org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211)
      	at org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383)
      	at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57)
      	at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
      	at org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152)
      	at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at java.util.HashMap.internalWriteEntries(HashMap.java:1777)
      	at java.util.HashMap.writeObject(HashMap.java:1354)
      	at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
      	at java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691)
      	at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
      	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:48)
      	at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
      	at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78)
      	at org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1150)
      	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:407)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:265)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

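      The exception is thrown while the TaskManager actor serializes the accumulator snapshot of a canceled Kafka sink (AccumulatorRegistry.getSnapshot, called from TaskManager.unregisterTaskAndNotifyFinalState). DefaultKafkaMetricAccumulator.writeObject reads live Kafka metric values during serialization, and InFlightRequests.inFlightRequestCount iterates a HashMap that the Kafka producer's own network thread can modify at the same time. Because this happens inside the TaskManager actor rather than inside the task, the failure is not contained to the task and affects the whole TaskManager.

      An immediate workaround is to disable the Kafka metrics by setting "flink.disable-metrics" to "true" in the Properties passed to the Flink Kafka connector.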
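      A minimal sketch of the workaround, assuming the property is supplied through the java.util.Properties object that is handed to the Flink Kafka producer. The broker address "localhost:9092" and the class DisableKafkaMetricsExample are illustrative only; the sink constructor is mentioned in a comment rather than called, so the snippet runs without Flink on the classpath.

```java
import java.util.Properties;

public class DisableKafkaMetricsExample {

    // Builds the Properties object that would be passed to the Flink Kafka sink.
    static Properties kafkaProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Workaround from this issue: stop Flink from exposing Kafka's metrics,
        // so they are never read while the accumulator snapshot is serialized.
        props.setProperty("flink.disable-metrics", "true");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed broker address for illustration.
        Properties props = kafkaProperties("localhost:9092");
        // In a job, these properties would then be given to the sink, e.g.
        // new FlinkKafkaProducer09<>("topic", schema, props)
        System.out.println(props.getProperty("flink.disable-metrics"));
    }
}
```

      The workaround trades away Kafka metric visibility for stability until the serialization itself is made robust.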
      I'll make the metrics serialization more robust by catching and logging these exceptions. Metrics should never cause system failures, let alone TaskManager shutdowns.
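      The proposed fix (catch and log exceptions while serializing metric values) could look roughly like the sketch below. SafeMetricAccumulator is a hypothetical stand-in, not Flink's DefaultKafkaMetricAccumulator: it assumes the accumulator only needs to serialize a single double read from a live metric, modeled here as a DoubleSupplier in place of KafkaMetric.value(). If reading the metric throws (for example the ConcurrentModificationException above), it logs a warning and writes NaN instead of letting the exception abort serialization. The roundTrip helper mimics the Java serialization that, per the stack trace, AccumulatorSnapshot performs via InstantiationUtil.serializeObject.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.function.DoubleSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical accumulator that reads a live metric when it is serialized.
public class SafeMetricAccumulator implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = Logger.getLogger(SafeMetricAccumulator.class.getName());

    // Live metric source (stand-in for a KafkaMetric); never serialized itself.
    private transient DoubleSupplier liveMetric;
    // Value restored on deserialization.
    private double value = Double.NaN;

    public SafeMetricAccumulator(DoubleSupplier liveMetric) {
        this.liveMetric = liveMetric;
    }

    public double getValue() {
        return value;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        double snapshot = Double.NaN;
        if (liveMetric != null) {
            try {
                snapshot = liveMetric.getAsDouble();
            } catch (RuntimeException e) {
                // Reading the metric failed, e.g. because another thread modified
                // the underlying map: log it and write NaN instead of propagating.
                LOG.log(Level.WARNING, "Failed to read metric value; writing NaN", e);
            }
        }
        out.writeDouble(snapshot);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        value = in.readDouble();
    }

    // Serializes and deserializes an accumulator with plain Java serialization.
    static SafeMetricAccumulator roundTrip(SafeMetricAccumulator acc)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(acc);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SafeMetricAccumulator) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SafeMetricAccumulator healthy = new SafeMetricAccumulator(() -> 42.0);
        SafeMetricAccumulator failing = new SafeMetricAccumulator(() -> {
            throw new ConcurrentModificationException();
        });
        System.out.println(roundTrip(healthy).getValue()); // prints 42.0
        System.out.println(roundTrip(failing).getValue()); // prints NaN (and logs a warning)
    }
}
```

      Writing a sentinel value keeps the serialized form well-formed, so the surrounding accumulator snapshot still succeeds and the TaskManager can finish unregistering the canceled task.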

      People

        Assignee: Robert Metzger (rmetzger)
        Reporter: Robert Metzger (rmetzger)
        Votes: 0
        Watchers: 3