Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-36569

flink kafka connector do not close kafka produer when it checkpoint success

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.19.0, 1.20.0
    • None
    • Connectors / Kafka
    • None
    • flink: 1.20

      flink kafka connector: 3.3.0-1.20 

    Description

      flink kafka connector do't close FlinkKafkaInternalProducer when flink checkpoint success in flink 1.20/1.19 .  it will create one  FlinkKafkaInternalProducer per checkpoint.

       

      FlinkKafkaInternalProducer do not close automatic. so  kafka producer network thread will more and more 

       it create `getRecoveryProducer` each time,   `recyclable` object always null, so  `recyclable.ifPresent(Recyclable::close)` not work.

      `org.apache.flink.connector.kafka.sink.KafkaCommitter`

      producer =
      recyclable
      .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
      .orElseGet(() -> getRecoveryProducer(committable));
      producer.commitTransaction();
      producer.flush();
      recyclable.ifPresent(Recyclable::close);

       

       

      Attachments

        1. image-2024-10-18-13-31-39-253.png
          222 kB
          Jake.zhang
        2. image-2024-10-21-14-02-41-823.png
          306 kB
          Jake.zhang

        Activity

          People

            Unassigned Unassigned
            ft20082 Jake.zhang
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: