Details
- Type: Bug
- Status: Open
- Priority: Not a Priority
- Resolution: Unresolved
- Affects Version/s: 1.3.2, 1.10.0
- None
- Environment: flink 1.3.2, kafka 0.9.1
Description
One TaskManager has multiple task slots. One task failed because creating the KafkaProducer failed; the cause was "javax.management.InstanceAlreadyExistsException: kafka.producer:type=producer-metrics,client-id=producer-3". The detailed trace is:
2017-11-04 19:41:23,281 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Filter -> Map -> Filter -> Sink: dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Error registering mbean kafka.producer:type=producer-metrics,client-id=producer-3
    at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
    at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
    at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
    at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.<init>(RecordAccumulator.java:111)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:261)
    ... 9 more
Caused by: javax.management.InstanceAlreadyExistsException: kafka.producer:type=producer-metrics,client-id=producer-3
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
    ... 16 more
I suspect that tasks in different task slots of one TaskManager use different classloaders, so the task ID may be the same for more than one task within a single process. This would cause creation of the KafkaProducer to fail inside that TaskManager.
Has anybody encountered the same problem?
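The exception at the bottom of the trace can be reproduced without Kafka or Flink. The following is a minimal sketch, not the connector's code: it registers two MBeans under the same ObjectName on the platform MBean server, which is process-wide and therefore shared by all task slots in one JVM. DuplicateMBeanDemo, DummyMetrics and secondRegistrationFails are hypothetical names made up for illustration; only the object name string is taken from the trace above.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {

    // Standard MBean convention: the interface is named <ClassName>MBean.
    public interface DummyMetricsMBean {
        int getValue();
    }

    // Hypothetical stand-in for a metrics bean; it carries no real metrics.
    public static class DummyMetrics implements DummyMetricsMBean {
        @Override
        public int getValue() {
            return 0;
        }
    }

    /**
     * Registers two beans under the same name on the platform MBean server.
     * Returns true if the second registration is rejected.
     */
    public static boolean secondRegistrationFails(String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName objectName = new ObjectName(name);
        server.registerMBean(new DummyMetrics(), objectName);
        try {
            // Same name, same JVM: the server refuses the duplicate.
            server.registerMBean(new DummyMetrics(), objectName);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;
        } finally {
            // Clean up so the demo can be run repeatedly in one JVM.
            server.unregisterMBean(objectName);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(secondRegistrationFails(
                "kafka.producer:type=producer-metrics,client-id=producer-3"));
    }
}
```

If the suspicion above is right, two producers in the same JVM that end up with the same client-id would hit this same rejection, because the MBean name is derived from the client-id.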
Issue Links
- is fixed by: FLINK-28842 Add client.id.prefix for the KafkaSink (Open)
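The linked issue is about a client.id prefix. As a rough sketch of the general idea only (this is not the KafkaSink implementation), a distinct `client.id` per parallel subtask could be derived from a prefix and a subtask index before the producer is constructed. `client.id` is a standard Kafka producer configuration key; UniqueClientIdSketch, withUniqueClientId, the prefix and the index parameter are hypothetical names chosen for illustration. How the index is obtained and where this can be wired into the connector depends on the Flink version and is not shown.

```java
import java.util.Properties;

public class UniqueClientIdSketch {

    /**
     * Returns a copy of the base producer properties with "client.id"
     * set to "<prefix>-<subtaskIndex>". The base properties are not modified.
     */
    public static Properties withUniqueClientId(Properties base, String prefix, int subtaskIndex) {
        Properties props = new Properties();
        props.putAll(base);
        // Assumption: one producer per subtask, so the index makes the id unique per job.
        props.setProperty("client.id", prefix + "-" + subtaskIndex);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        Properties props = withUniqueClientId(base, "my-job", 7);
        System.out.println(props.getProperty("client.id"));
    }
}
```

Two jobs that use the same prefix on the same TaskManager could still collide, so the prefix would need to be unique per job as well.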