  1. Flink
  2. FLINK-10455

Potential Kafka producer leak in case of failures

    Details

      Description

      If the Kafka brokers' transaction timeout is too low for our checkpoint interval [1], we may get a ProducerFencedException. The documentation for ProducerFencedException explicitly states that the producer should be closed after encountering it.
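
The handling that the documentation asks for can be sketched as follows. This is a minimal, self-contained illustration and not the FlinkKafkaProducer011 code: `TxProducer`, `FencedException`, `commitOrClose` and `AlwaysFencedProducer` are hypothetical stand-ins (for Kafka's producer and its ProducerFencedException) so that the example runs without a Kafka dependency.

```java
public class FencedProducerSketch {

    /** Hypothetical stand-in for Kafka's ProducerFencedException. */
    static class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the transactional part of a Kafka producer. */
    interface TxProducer extends AutoCloseable {
        void commitTransaction();

        @Override
        void close();
    }

    /**
     * Commits the producer's transaction. If the producer was fenced, it is
     * closed before the exception is propagated, so it cannot leak.
     */
    static void commitOrClose(TxProducer producer) {
        try {
            producer.commitTransaction();
        } catch (FencedException e) {
            try {
                producer.close();
            } catch (RuntimeException closeFailure) {
                // Keep the original cause; attach the close failure to it.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Test double: every commit fails as fenced; records whether close() ran. */
    static class AlwaysFencedProducer implements TxProducer {
        boolean closed = false;

        @Override
        public void commitTransaction() {
            throw new FencedException("producer was fenced");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        AlwaysFencedProducer producer = new AlwaysFencedProducer();
        try {
            commitOrClose(producer);
        } catch (FencedException e) {
            System.out.println("closed after fencing: " + producer.closed);
        }
    }
}
```

The exception is still rethrown, so the job fails as before; the only difference is that the fenced producer has been released by the time the failure propagates.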

      Looking at the code, FlinkKafkaProducer011 does not appear to close the producer in that case. Also, if one transaction's commit in TwoPhaseCommitSinkFunction#notifyCheckpointComplete fails with an exception, we neither clean up nor try to commit any of the other transactions.
      -> From what I see, TwoPhaseCommitSinkFunction#notifyCheckpointComplete simply iterates over pendingCommitTransactions, which is not touched during close().
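
One way to avoid leaving the remaining transactions behind is to attempt every pending commit, remember the first failure, and release each transaction whether or not its commit succeeded. The following is a minimal sketch of that idea under stated assumptions, not the actual TwoPhaseCommitSinkFunction code: `Txn`, `commitAll`, `RecordingTxn` and the `pending` map are hypothetical names chosen for illustration, and `close()` is assumed not to throw.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class PendingCommitSketch {

    /** Hypothetical transaction handle; close() releases its producer. */
    interface Txn {
        void commit();

        void close();
    }

    /**
     * Tries to commit every pending transaction up to checkpointId. A failing
     * commit does not stop the loop: later transactions are still attempted,
     * every attempted transaction is closed and removed, and the first
     * failure is rethrown at the end with further failures suppressed on it.
     */
    static void commitAll(Map<Long, Txn> pending, long checkpointId) {
        RuntimeException firstFailure = null;
        Iterator<Map.Entry<Long, Txn>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, leave it pending
            }
            Txn txn = entry.getValue();
            try {
                txn.commit();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            } finally {
                txn.close(); // assumed not to throw in this sketch
                it.remove();
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Test double that records what happened to it. */
    static class RecordingTxn implements Txn {
        final boolean failOnCommit;
        boolean committed = false;
        boolean closed = false;

        RecordingTxn(boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        @Override
        public void commit() {
            if (failOnCommit) {
                throw new IllegalStateException("commit failed");
            }
            committed = true;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        Map<Long, Txn> pending = new LinkedHashMap<>();
        pending.put(1L, new RecordingTxn(true));
        pending.put(2L, new RecordingTxn(false));
        try {
            commitAll(pending, 2L);
        } catch (IllegalStateException e) {
            System.out.println("still pending after failure: " + pending.size());
        }
    }
}
```

Whether a transaction whose commit failed should be closed right away or retried after recovery is a design decision that this sketch does not settle; it only shows that one failure need not prevent the other transactions from being committed and released.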

      If we now restart the failing job on the same Flink cluster, any resources left open by the previous attempt, such as the unclosed producers, will linger.

      [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011

        Attachments

        1. image-2019-06-04-14-30-55-985.png
          473 kB
          sunjincheng
        2. image-2019-06-04-14-25-16-916.png
          804 kB
          sunjincheng

              People

              • Assignee:
                becket_qin Jiangjie Qin
                Reporter:
                NicoK Nico Kruber
              • Votes:
                1
                Watchers:
                14

                  Time Tracking

                  Estimated:
                  Not Specified
                  Remaining:
                  0h
                  Logged:
                  1h 10m