Flink / FLINK-10455

Potential Kafka producer leak in case of failures

      Description

      If the Kafka brokers' transaction timeout is too low for our checkpoint interval [1], we may get a ProducerFencedException. The documentation for ProducerFencedException explicitly states that the producer should be closed after encountering it.
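
The close-on-fencing behaviour described above could look roughly like the sketch below. It is self-contained and does not use the Kafka client: `FencedException` and `TxProducer` are hypothetical stand-ins for ProducerFencedException and a transactional producer, and `commitOrClose` is an illustrative helper, not code from FlinkKafkaProducer011.

```java
// Hypothetical stand-in for Kafka's ProducerFencedException (assumption, not the real class).
class FencedException extends RuntimeException {
    FencedException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for a transactional producer; not the Kafka client API.
interface TxProducer extends AutoCloseable {
    void commitTransaction();

    @Override
    void close();
}

final class FencedCommit {
    private FencedCommit() {}

    /**
     * Commits the producer's open transaction. If the producer turns out to be
     * fenced, it is closed before the exception is rethrown, so its resources
     * do not outlive the failed attempt.
     */
    static void commitOrClose(TxProducer producer) {
        try {
            producer.commitTransaction();
        } catch (FencedException e) {
            try {
                producer.close();
            } catch (RuntimeException closeError) {
                // Keep the original failure as the primary error.
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}
```

The exception is still rethrown after closing, so the caller sees the failure and the job can fail over as before; only the cleanup is added.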

      Looking at the code, FlinkKafkaProducer011 does not appear to do this. Also, if one transaction's commit in TwoPhaseCommitSinkFunction#notifyCheckpointComplete fails with an exception, we neither clean up nor try to commit any of the other transactions.
      -> from what I see, TwoPhaseCommitSinkFunction#notifyCheckpointComplete simply iterates over pendingCommitTransactions, which is not touched during close()
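
One possible shape for a fix is sketched below, purely as an illustration: keep attempting the remaining commits when one fails, and have close() release whatever is still pending. `PendingTxn`, `PendingCommits` and `releaseResources` are hypothetical names invented for this sketch; it is not the actual TwoPhaseCommitSinkFunction code and makes no claim about how the issue was eventually resolved.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical handle for one pre-committed transaction; not a Flink type.
interface PendingTxn {
    void commit();

    // Frees client-side resources (e.g. closes the producer) without
    // deciding the fate of the transaction itself.
    void releaseResources();
}

final class PendingCommits {
    // Checkpoint id -> transaction awaiting commit, in insertion order.
    private final Map<Long, PendingTxn> pending = new LinkedHashMap<>();

    void add(long checkpointId, PendingTxn txn) {
        pending.put(checkpointId, txn);
    }

    int size() {
        return pending.size();
    }

    /**
     * Attempts to commit every transaction up to checkpointId. A failing
     * commit does not stop the loop; the first failure is rethrown at the
     * end, with later failures attached as suppressed exceptions.
     */
    void notifyCheckpointComplete(long checkpointId) {
        RuntimeException first = null;
        Iterator<Map.Entry<Long, PendingTxn>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, PendingTxn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint
            }
            try {
                entry.getValue().commit();
                it.remove(); // committed successfully, no longer pending
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /** Releases the resources of every transaction that is still pending. */
    void close() {
        for (PendingTxn txn : pending.values()) {
            try {
                txn.releaseResources();
            } catch (RuntimeException ignored) {
                // Best effort: continue releasing the remaining transactions.
            }
        }
        pending.clear();
    }
}
```

In this sketch a transaction whose commit failed stays in the pending map, so close() still sees it and can release its resources before the job is restarted.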

      Now if we restart the failing job on the same Flink cluster, any resources from the previous attempt will still linger.

      [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011

              People

              • Assignee: Andrey Zagrebin (azagrebin)
              • Reporter: Nico Kruber (NicoK)