Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.19.0, 1.20.0
-
None
-
None
-
flink: 1.20
flink kafka connector: 3.3.0-1.20
Description
flink kafka connector do't close FlinkKafkaInternalProducer when flink checkpoint success in flink 1.20/1.19 . it will create one FlinkKafkaInternalProducer per checkpoint.
FlinkKafkaInternalProducer do not close automatic. so kafka producer network thread will more and more
it create `getRecoveryProducer` each time, `recyclable` object always null, so `recyclable.ifPresent(Recyclable::close)` not work.
`org.apache.flink.connector.kafka.sink.KafkaCommitter`
producer = recyclable .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject) .orElseGet(() -> getRecoveryProducer(committable)); producer.commitTransaction(); producer.flush(); recyclable.ifPresent(Recyclable::close);