Flink / FLINK-35283

Add support for unique Kafka producer client ids

Details

    Description

      This issue came out of debugging a warning we're seeing in our Flink logs. We're running Flink 1.18 and have an application that uses Kafka topics as both a source and a sink. We're running with several tasks. The warning we're seeing in the logs is:

      WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
      javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka producer client id

      I've spent a bit of time debugging, and it looks like the root cause of this warning is the Flink KafkaSink creating multiple KafkaWriters that, in turn, create multiple KafkaProducers with the same Kafka producer `client.id`. The value of `client.id` is used when registering the AppInfo MBean, so when multiple KafkaProducers with the same `client.id` are registered we get the above InstanceAlreadyExistsException. Since we're running with several tasks and we get a Kafka producer per task, this duplicate registration exception makes sense to me.
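
      The collision itself can be reproduced with plain JMX, without any Kafka or Flink classes. The following is only a minimal sketch under assumptions: the `demo.producer` domain, the `AppInfo` class, and the `tryRegister` helper are hypothetical names chosen for illustration and are not Kafka's actual AppInfoParser code. It registers two MBeans under the same `id` and shows that the second registration is rejected, while a distinct `id` is accepted.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {

    // Standard MBean convention: the management interface is named <ImplClass>MBean.
    public interface AppInfoMBean {
        String getClientId();
    }

    // Hypothetical stand-in for the app-info bean a producer would register.
    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    // Returns true if the MBean was registered, false if the name was already taken.
    public static boolean tryRegister(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // The client id is part of the object name, so equal ids yield equal names.
            ObjectName name = new ObjectName("demo.producer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRegister("shared-id"));   // first producer: registered
        System.out.println(tryRegister("shared-id"));   // second producer, same id: rejected
        System.out.println(tryRegister("shared-id-1")); // distinct id: registered
    }
}
```

      In this sketch the duplicate is reported as a return value; in our logs the same condition surfaces as the WARN shown above.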

      I'm wondering if the fix would be to update the KafkaSink.builder by adding a setClientIdPrefix method, similar to what we have already on the KafkaSource.builder.
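
      As a rough illustration of the idea, each writer could derive its own `client.id` from a shared prefix plus its subtask index. This is a sketch under assumptions, not Flink's implementation: the `ClientIdSketch` class, the `withUniqueClientId` helper, and the `prefix-subtaskId` format are hypothetical. Only the `client.id` property key is taken from Kafka's producer configuration.

```java
import java.util.Properties;

public class ClientIdSketch {

    // Returns a copy of the base producer properties with a per-subtask client.id.
    // The "<prefix>-<subtaskId>" format is an assumption made for this sketch.
    public static Properties withUniqueClientId(Properties base, String prefix, int subtaskId) {
        Properties copy = new Properties();
        copy.putAll(base); // leave the caller's properties untouched
        copy.setProperty("client.id", prefix + "-" + subtaskId);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        // Each parallel subtask gets its own id, so MBean names no longer collide.
        for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
            Properties props = withUniqueClientId(base, "my-sink", subtaskId);
            System.out.println(props.getProperty("client.id"));
        }
    }
}
```

      With a scheme like this, a sink running with parallelism 3 would register three differently named app-info MBeans instead of three identical ones.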

            People

              Assignee: Unassigned
              Reporter: Francis (faltomare)