Flink / FLINK-10455

Potential Kafka producer leak in case of failures



      Description

      If the Kafka brokers' timeout is too low for our checkpoint interval [1], we may get a ProducerFencedException. The documentation of ProducerFencedException explicitly states that we should close the producer after encountering it.
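
      As a minimal sketch of the handling the ProducerFencedException documentation asks for (plain Kafka producer API, not the Flink connector code; the broker address, topic name, and transactional.id below are made-up placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencedProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("transactional.id", "example-txn-id");  // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            // If the transaction timed out on the broker side, this producer has
            // been fenced and commitTransaction() throws ProducerFencedException.
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Per the ProducerFencedException documentation, the producer cannot
            // be used any further and must be closed to free its resources.
            producer.close();
            throw e;
        }
    }
}
{code}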

      Looking at the code, this does not actually seem to be done in FlinkKafkaProducer011. Also, in case one transaction's commit in TwoPhaseCommitSinkFunction#notifyCheckpointComplete fails with an exception, we do not clean up (nor try to commit) any other transaction.
      -> from what I see, TwoPhaseCommitSinkFunction#notifyCheckpointComplete simply iterates over pendingCommitTransactions, which is not touched during close()
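
      For illustration, a sketch of a commit loop that would not leak producers (a hypothetical standalone helper, not the actual TwoPhaseCommitSinkFunction code): attempt every pending commit, close the producer of each transaction whose commit failed, and only rethrow afterwards:

{code:java}
import java.util.List;

import org.apache.kafka.clients.producer.KafkaProducer;

public final class CommitAllPendingSketch {

    // Hypothetical helper: "pendingCommitTransactions" here is simply a list of
    // producers with one open transaction each, mirroring the description above.
    public static void commitAll(List<KafkaProducer<?, ?>> pendingCommitTransactions) {
        Exception firstFailure = null;
        for (KafkaProducer<?, ?> producer : pendingCommitTransactions) {
            try {
                producer.commitTransaction();
            } catch (Exception e) {
                // Do not bail out on the first failure: the remaining
                // transactions would otherwise be neither committed nor
                // cleaned up, leaving their producers lingering.
                producer.close();
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw new RuntimeException("Committing a transaction failed", firstFailure);
        }
    }
}
{code}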

      Now, if we restart the failing job on the same Flink cluster, any resources from the previous attempt will still linger.

      [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011


             People

             • Assignee: becket_qin (Jiangjie Qin)
             • Reporter: NicoK (Nico Kruber)


                Time Tracking

                • Original Estimate: Not Specified
                • Remaining Estimate: 0h
                • Time Spent: 1h 10m
