If the Kafka brokers' transaction timeout is too low for our checkpoint interval, we may get a ProducerFencedException. The documentation for ProducerFencedException explicitly states that the producer should be closed after encountering it.
From the code, it doesn't look like FlinkKafkaProducer011 actually does this. Also, if one transaction's commit in TwoPhaseCommitSinkFunction#notifyCheckpointComplete fails with an exception, we neither clean up nor try to commit any of the other pending transactions.
-> from what I see, TwoPhaseCommitSinkFunction#notifyCheckpointComplete simply iterates over pendingCommitTransactions, so the first exception aborts the loop, and close() never touches pendingCommitTransactions.
Now if we restart the failing job on the same Flink cluster, any resources from the previous attempt (such as producers that were never closed) will still linger.
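One possible direction, as a rough sketch only: keep going after a failed commit, always close the handle, and rethrow the first failure at the end. This is not the actual Flink code. Txn, FakeTxn, commitUpTo and closeAll are hypothetical names made up for illustration, and the sketch ignores how a failed transaction should be handled on recovery.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class PendingCommitSketch {

    /** Hypothetical stand-in for a transaction handle that owns a producer. */
    interface Txn extends AutoCloseable {
        void commit() throws Exception;
        @Override
        void close();
    }

    /** Hypothetical test double: optionally fails on commit, records what happened. */
    static final class FakeTxn implements Txn {
        final boolean failOnCommit;
        boolean committed;
        boolean closed;

        FakeTxn(boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        @Override
        public void commit() throws Exception {
            if (failOnCommit) {
                throw new IllegalStateException("simulated fenced producer");
            }
            committed = true;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Tries to commit every pending transaction with id <= checkpointId.
     * A failing commit does not stop the loop: each handled transaction is
     * closed and removed, and the first exception is rethrown at the end
     * with later ones attached as suppressed.
     */
    static void commitUpTo(long checkpointId, Map<Long, Txn> pending) throws Exception {
        Exception first = null;
        Iterator<Map.Entry<Long, Txn>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, leave it pending
            }
            Txn txn = entry.getValue();
            try {
                txn.commit();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            } finally {
                txn.close(); // always release the handle, even after a failure
                it.remove();
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /** Closes (without committing) whatever is still pending, e.g. from close(). */
    static void closeAll(Map<Long, Txn> pending) {
        for (Txn txn : pending.values()) {
            txn.close();
        }
        pending.clear();
    }

    public static void main(String[] args) {
        Map<Long, Txn> pending = new LinkedHashMap<>();
        pending.put(1L, new FakeTxn(true));
        pending.put(2L, new FakeTxn(false));
        pending.put(3L, new FakeTxn(false));
        try {
            commitUpTo(2L, pending);
        } catch (Exception e) {
            System.out.println("commit failed: " + e.getMessage());
        }
        closeAll(pending);
    }
}
```

Whether a failed transaction should really be dropped from the pending set, or kept so it can be retried from restored state, is an open question that the sketch does not try to answer.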