Flink / FLINK-22452

Support specifying custom transactional.id prefix in FlinkKafkaProducer


    Description

      Currently, the "transactional.id"s of the Kafka producers in FlinkKafkaProducer are generated based on the task name. This mechanism has some limitations:

      • The generated ID can exceed Kafka's length limit if the task name is too long. (resolved in FLINK-17691)
      • IDs from different jobs are very likely to clash with each other if the job topologies are similar. (discussed in FLINK-11654)
      • Only certain "transactional.id" values may be authorized by prefixed ACLs on the target Kafka cluster.

      Besides, the Spring community has introduced a setTransactionIdPrefix method in their Kafka client.

      Therefore, I think it will be necessary to have this feature in the Flink Kafka connector. 

       

      As discussed in FLINK-11654, the possible solutions are:

      • either introduce an additional method called setTransactionalIdPrefix(String) in the FlinkKafkaProducer,
      • or use the existing "transactional.id" properties as the prefix.

      The behavior of the "transactional.id" generation would then be:

      • keep the current behavior if no prefix is specified,
      • use the specified value, if present, as the prefix for the TransactionalIdsGenerator.
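
      The fallback behavior described above could look roughly like the following. This is only a minimal sketch to illustrate the idea, not the actual FlinkKafkaProducer code: the class TransactionalIdPrefixSketch, its methods, the "taskName-operatorId" default, and the "prefix-index" ID format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed prefix selection; not Flink's real implementation. */
final class TransactionalIdPrefixSketch {

    /**
     * Returns the user-supplied prefix if one is present; otherwise falls back to a
     * prefix derived from the task name (assumed default format: taskName-operatorId).
     */
    static String resolvePrefix(String customPrefix, String taskName, String operatorId) {
        if (customPrefix != null && !customPrefix.isEmpty()) {
            return customPrefix;
        }
        return taskName + "-" + operatorId;
    }

    /**
     * Builds the IDs for one subtask by appending a running index to the prefix.
     * Each subtask gets its own disjoint index range (assumed scheme).
     */
    static List<String> generateIds(String prefix, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        long start = (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (start + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Prefix present: used as-is, so it can be matched by a prefixed ACL.
        String custom = resolvePrefix("orders-job", "Sink: kafka", "op1");
        System.out.println(generateIds(custom, 1, 2));

        // Prefix absent: behavior stays based on the task name.
        String fallback = resolvePrefix(null, "Sink: kafka", "op1");
        System.out.println(generateIds(fallback, 0, 2));
    }
}
```

      With an explicit prefix, two jobs with similar topologies no longer produce clashing IDs as long as their prefixes differ, and the ID length no longer depends on the task name.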

            People

              zetaplusae Wenhao Ji