Flink / FLINK-34554

Using EXACTLY_ONCE with KafkaSink causes broker OOM due to newly created transactionalIds per checkpoint


Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.16.3, 1.17.2, 1.18.1
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      Flink version: 1.17.1
      Kafka broker version: 2.7.1 (4 GB heap memory for each broker)

      Hi, we recently had an outage in our production system after performing a Flink Kafka connector API upgrade. For brief context, our application is a simple kafka-to-kafka pipeline with minimal processing. We run in EXACTLY_ONCE mode, so Kafka transactions are involved.

      Our application runs with around 350 sink subtasks in total. The checkpoint period was set to 5 seconds to avoid blocking read_committed consumers for too long. We recently performed an upgrade with the following details (a minimal sketch of the upgraded setup follows the two lists below):

      Previous state:

      • Flink version: 1.14.4
      • Broker version: 2.7.1
      • kafka connector API: FlinkKafkaProducer

      Update to:

      • Flink version: 1.17.1
      • Broker version: 2.7.1
      • kafka connector API: KafkaSink
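
      For reference, here is a minimal sketch of the upgraded setup (bootstrap servers, topic names, group id, and the transactionalIdPrefix are placeholders, not our real values):

      {code:java}
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.base.DeliveryGuarantee;
      import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
      import org.apache.flink.connector.kafka.sink.KafkaSink;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class KafkaToKafkaJob {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(5_000L); // 5 s period, to keep read_committed consumer lag small

              KafkaSource<String> source = KafkaSource.<String>builder()
                      .setBootstrapServers("broker:9092")   // placeholder
                      .setTopics("input-topic")             // placeholder
                      .setGroupId("kafka-to-kafka")         // placeholder
                      .setStartingOffsets(OffsetsInitializer.earliest())
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .build();

              KafkaSink<String> sink = KafkaSink.<String>builder()
                      .setBootstrapServers("broker:9092")   // placeholder
                      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                              .setTopic("output-topic")     // placeholder
                              .setValueSerializationSchema(new SimpleStringSchema())
                              .build())
                      .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                      .setTransactionalIdPrefix("kafka-to-kafka") // placeholder
                      .build();

              env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                      .sinkTo(sink);
              env.execute("kafka-to-kafka");
          }
      }
      {code}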

      Around 10 hours after the deployment, our Kafka brokers started failing with OOM errors, and the heap dump entries were dominated by ProducerStateEntry records. Our investigation led us to the fundamental implementation change between FlinkKafkaProducer and KafkaSink:

      • KafkaSink generates a different transactionalId for each checkpoint,
      • FlinkKafkaProducer uses a constant pool of transactionalIds.

      With this behavior, KafkaSink exhausted our broker heap very fast, since each ProducerStateEntry only expires after transactional.id.expiration.ms, which defaults to 7 days (ref1, ref2, ref3).
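
      To make the first point concrete, here is a sketch of the per-checkpoint naming scheme as we understand it from TransactionalIdFactory in flink-connector-kafka (the exact format string is our assumption):

      {code:java}
      public class TransactionalIdSketch {
          // Assumed scheme: "<prefix>-<subtaskId>-<checkpointId>", i.e. one fresh id
          // per subtask per checkpoint, never reused. Each fresh id leaves a new
          // ProducerStateEntry behind on the broker.
          static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
              return prefix + "-" + subtaskId + "-" + checkpointId;
          }

          public static void main(String[] args) {
              // With a 5 s checkpoint period, every one of our 350 subtasks mints a
              // new id 12 times per minute:
              for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
                  System.out.println(buildTransactionalId("kafka-to-kafka", 0, checkpointId));
              }
              // prints: kafka-to-kafka-0-1, kafka-to-kafka-0-2, kafka-to-kafka-0-3
          }
      }
      {code}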

      For our job, this means it creates roughly:

      • after 10 hours of running: 350 ids * 12 checkpoints/minute * 60 minutes/hour * 10 hours ≈ 2,520,000 entries
      • after 7 days: ≈ 42 million entries

      Attached below is the count of ProducerStateEntry instances in the heap dump at the time of the OOM:

      • 505,000 shown (6.5%); in total that is roughly ~7,000,000 entries.

      There are several mitigations that come to mind (see the sketch after this list):

      • Reduce the number of subtasks, which reduces the number of transactionalIds.
      • Enlarge the checkpoint period to reduce the rate of newly generated transactionalIds.
      • Shorten transactional.id.expiration.ms so that unused transactionalIds expire sooner.
      • Increase the broker heap.
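
      As a sketch of mitigations 2 and 3 (the concrete values are illustrative, not recommendations):

      {code:java}
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class MitigationSketch {
          public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // Mitigation 2: enlarge the checkpoint period, e.g. 60 s instead of 5 s,
              // for roughly 12x fewer new transactionalIds (at the cost of higher
              // read_committed consumer latency):
              env.enableCheckpointing(60_000L);

              // Mitigation 3 is broker-side (server.properties): expire idle producer
              // and transaction state sooner than the 7-day default, e.g. after 1 day:
              //   transactional.id.expiration.ms=86400000
          }
      }
      {code}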

      However, the mitigations above might be too cumbersome and need careful tuning, which harms our flexibility. In addition, due to the lack of maintained state for lingering transactions, TransactionAborter seems to abort old transactions naively. We might accidentally (or purposefully) reuse the same transactionalIdPrefix and start the counter from 0. In that case, if an old transactionalId happens to have an epoch > 0, the aborter will keep looping, aborting nonexistent transactions up to the latest checkpoint counter (which may be very large), and the job gets stuck.
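
      For illustration, a simplified paraphrase of the abort loop as we read it (brokerEpochOf is a hypothetical stand-in for FlinkKafkaInternalProducer#initTransactions()/getEpoch(); this is not the actual Flink code):

      {code:java}
      import java.util.HashMap;
      import java.util.Map;

      public class AbortLoopSketch {
          // Hypothetical stand-in for broker-side producer state: transactionalId -> epoch.
          static final Map<String, Short> BROKER_STATE = new HashMap<>();

          // Stand-in for initTransactions()/getEpoch(): fencing an id reveals its
          // epoch; an id the broker has never seen comes back with epoch 0.
          static short brokerEpochOf(String transactionalId) {
              return BROKER_STATE.getOrDefault(transactionalId, (short) 0);
          }

          // Simplified per-subtask abort loop: starting from the restored checkpoint
          // counter, probe id after id and stop at the first one with epoch 0.
          static void abortSubtaskTransactions(String prefix, int subtaskId, long startCheckpointId) {
              for (long checkpointId = startCheckpointId; ; checkpointId++) {
                  String tid = prefix + "-" + subtaskId + "-" + checkpointId;
                  if (brokerEpochOf(tid) == 0) {
                      break; // interpreted as "id never used"; if a reused prefix left
                             // old ids with epoch > 0 far beyond startCheckpointId, this
                             // break is not reached for a very long time, so the job
                             // appears stuck aborting nonexistent transactions
                  }
                  // (the real code aborts the lingering transaction for tid here)
              }
          }
      }
      {code}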

      By the way, I am aware that for Flink 2.0 a lot of effort is going into better integration with Kafka transactions (FLIP-319). FLIP-319 mentions TID pooling, but there seems to be no dedicated page for it yet, so I wonder whether there is already a concrete plan I can follow. If there is something I can contribute to, I would be really happy to help.

      Attachments

        1. image (4).png (1.08 MB) by Hilmi Al Fatih
        2. image (5).png (356 kB) by Hilmi Al Fatih
        3. image-2024-03-16-17-17-16-152.png (553 kB) by Hilmi Al Fatih


          People

            Assignee: Arvid Heise (arvid)
            Reporter: Hilmi Al Fatih (hilmialf)

            Dates

              Created:
              Updated:
