Details
- Type: Improvement
- Status: Open
- Priority: Not a Priority
- Resolution: Unresolved
Description
This issue came out of debugging a warning we're seeing in our Flink logs. We're running Flink 1.18 with an application that uses Kafka topics as both a source and a sink, and we're running with several tasks. The warning in the logs is:
WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka producer client id
After some debugging, the root cause appears to be that the Flink KafkaSink creates multiple KafkaWriters, which in turn create multiple KafkaProducers with the same Kafka producer `client.id`. The value of `client.id` is used when registering the AppInfo MBean, so when multiple KafkaProducers with the same `client.id` are registered, we get the InstanceAlreadyExistsException shown above. Since we're running with several tasks and each task gets its own Kafka producer, the duplicate registration makes sense to me.
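To illustrate the mechanism, here is a minimal sketch that uses only the JDK's JMX API and does not touch Kafka or Flink. The `demo.producer` domain, the `DuplicateAppInfoDemo` class, and its `register` helper are made up for illustration. The sketch only assumes that the MBean name is derived from the client id, as the warning suggests.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

public class DuplicateAppInfoDemo {

    // Standard MBean contract: the interface must be named <ImplClass>MBean.
    public interface AppInfoMBean {
        String getId();
    }

    public static class AppInfo implements AppInfoMBean {
        private final String id;

        public AppInfo(String id) {
            this.id = id;
        }

        @Override
        public String getId() {
            return id;
        }
    }

    /**
     * Registers an app-info style MBean whose name is derived from the client id.
     * Returns false when an MBean with the same name is already registered.
     * The "demo.producer" domain is illustrative, not Kafka's real domain.
     */
    public static boolean register(MBeanServer server, String clientId) throws Exception {
        ObjectName name =
                new ObjectName("demo.producer:type=app-info,id=" + ObjectName.quote(clientId));
        try {
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same failure type as in the warning: the name is already taken.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // A fresh server keeps the demo isolated from the platform MBean server.
        MBeanServer server = MBeanServerFactory.newMBeanServer();

        System.out.println(register(server, "my-client"));   // first producer
        System.out.println(register(server, "my-client"));   // second producer, same id
        System.out.println(register(server, "my-client-1")); // distinct id per task
    }
}
```

Two registrations with the same id collide, while a per-task id does not, which is consistent with the behaviour we see when several writers run in the same JVM.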
I'm wondering whether the fix would be to add a setClientIdPrefix method to KafkaSink.builder, similar to the one that already exists on KafkaSource.builder.
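As a rough sketch of what such a prefix could do internally, the helper below derives a distinct `client.id` per subtask from a user-supplied prefix. The `ClientIdPrefixSketch` class, the `producerPropsFor` method, and the `prefix-subtaskId` naming scheme are all hypothetical and are not existing Flink API. Only the `client.id` property key comes from Kafka.

```java
import java.util.Properties;

public class ClientIdPrefixSketch {

    /**
     * Hypothetical helper: copies the base producer properties and sets a
     * client.id unique to the given subtask. The "<prefix>-<subtaskId>"
     * format is an assumption made for this sketch.
     */
    public static Properties producerPropsFor(
            Properties base, String clientIdPrefix, int subtaskId) {
        Properties props = new Properties();
        props.putAll(base); // leave the caller's properties untouched
        props.setProperty("client.id", clientIdPrefix + "-" + subtaskId);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        // Simulate four parallel sink subtasks.
        for (int subtaskId = 0; subtaskId < 4; subtaskId++) {
            Properties props = producerPropsFor(base, "my-sink", subtaskId);
            System.out.println(props.getProperty("client.id"));
        }
    }
}
```

With distinct ids per subtask, each producer would register its AppInfo MBean under a different name, so the duplicate registration should no longer occur.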