FLINK-10455

Potential Kafka producer leak in case of failures


    Description

      If the Kafka brokers' transaction timeout is too low for our checkpoint interval [1], we may get a ProducerFencedException. The documentation for ProducerFencedException explicitly states that the producer must be closed after encountering it.

      Looking at the code, FlinkKafkaProducer011 does not appear to actually do this. Furthermore, if one transaction's commit in TwoPhaseCommitSinkFunction#notifyCheckpointComplete fails with an exception, none of the other transactions are cleaned up (nor is a commit attempted for them): notifyCheckpointComplete simply iterates over pendingCommitTransactions, which is also not touched during close().

      If the failing job is then restarted on the same Flink cluster, any resources from the previous attempt will still linger around.

      [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
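      The cleanup behaviour requested above can be sketched as follows. This is a minimal, self-contained illustration, not Flink's actual implementation: MockProducer and FencedException are hypothetical stand-ins for KafkaProducer and ProducerFencedException from kafka-clients, and commitPending stands in for the loop in notifyCheckpointComplete. The point is that a fenced producer is closed immediately, and the failure is rethrown only after every pending transaction has been attempted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.errors.ProducerFencedException.
class FencedException extends RuntimeException {}

// Hypothetical stand-in for a transactional KafkaProducer.
class MockProducer {
    boolean closed = false;
    boolean committed = false;
    final boolean fenced;

    MockProducer(boolean fenced) { this.fenced = fenced; }

    void commitTransaction() {
        if (fenced) throw new FencedException();
        committed = true;
    }

    void close() { closed = true; }
}

public class CommitPendingSketch {
    // Attempt every pending commit; close a fenced producer right away and
    // rethrow the first failure only after all transactions were attempted,
    // instead of aborting the loop on the first exception.
    static void commitPending(List<MockProducer> pending) {
        RuntimeException firstFailure = null;
        for (MockProducer p : pending) {
            try {
                p.commitTransaction();
            } catch (FencedException e) {
                p.close(); // the ProducerFencedException javadoc requires closing
                if (firstFailure == null) firstFailure = e;
            }
        }
        if (firstFailure != null) throw firstFailure;
    }

    public static void main(String[] args) {
        MockProducer ok1 = new MockProducer(false);
        MockProducer fenced = new MockProducer(true);
        MockProducer ok2 = new MockProducer(false);
        List<MockProducer> pending = new ArrayList<>();
        pending.add(ok1);
        pending.add(fenced);
        pending.add(ok2);
        try {
            commitPending(pending);
        } catch (FencedException expected) {
            // surfaced only after all commits were attempted
        }
        System.out.println("fenced closed: " + fenced.closed
                + ", later commit attempted: " + ok2.committed);
    }
}
```

      With the behaviour described in the report, the transaction after the failing one would never be attempted and the fenced producer would stay open; the sketch shows both fixed.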

      Attachments

        1. image-2019-06-04-14-30-55-985.png
          473 kB
          sunjincheng
        2. image-2019-06-04-14-25-16-916.png
          804 kB
          sunjincheng


            People

              becket_qin Jiangjie Qin
              nkruber Nico Kruber
              Votes: 1
              Watchers: 14


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h 10m