Details

    Description

      Problem description

      The TaskManager heap of our Flink streaming job fills up when the job has nothing to consume and process.

      It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. When there are no messages in the source topic, the TaskManager heap usage keeps increasing until the job exits after receiving a SIGTERM signal. We are running the job on AWS EMR with YARN.

      The problem with the TaskManager heap usage does not occur when there is data to process. It's also worth noting that sending a single message to the source topic of a streaming job that has been sitting idle and suffers from the memory leak causes the heap to be cleared. However, this does not resolve the problem, since heap usage starts increasing again immediately after the message is processed.

      The TaskManager heap usage percentage is calculated as:

       

      flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / flink.taskmanager.Status.JVM.Memory.Heap.Max
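As a rough illustration of the same arithmetic, here is a minimal Java sketch. It reads the local JVM's heap figures through the standard MemoryMXBean as a stand-in for the two Flink metrics; the class name HeapUsage and the method usedPercent are hypothetical names used only for this example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper; not part of Flink.
class HeapUsage {
    // Same formula as above: used * 100 / max, computed in floating point.
    static double usedPercent(long usedBytes, long maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("max heap size must be positive");
        }
        return usedBytes * 100.0 / maxBytes;
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        // getMax() returns -1 when the maximum heap size is undefined.
        if (heap.getMax() > 0) {
            System.out.println(usedPercent(heap.getUsed(), heap.getMax()));
        }
    }
}
```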

       

       

      I was able to take heap dumps of the TaskManager processes while heap usage was high. Heap dump analysis detected 912,355 empty java.util.HashMap instances retaining >= 43,793,040 bytes.

      The retained heap seemed to be located at:

       

      org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit

       

       

      Possible hints:

      An empty HashMap is added to the offsetsToCommit map during the snapshotState method if no entry exists yet for the given checkpoint (KafkaSourceReader, line 107):

       

      Map<TopicPartition, OffsetAndMetadata> offsetsMap =
              offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 

       

      If the startingOffset for the given split is >= 0, a new entry is added to the map from the previous step (KafkaSourceReader, line 113):

      if (split.getStartingOffset() >= 0) {
          offsetsMap.put(
              split.getTopicPartition(),
              new OffsetAndMetadata(split.getStartingOffset()));
      }

      If the starting offset is less than 0, the offsetsMap created in the first step stays empty. The logs show that the startingOffset is -3 for several splits when they are added to the reader.

       

      Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-44, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-36, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-28, StartingOffset: -3, StoppingOffset: -9223372036854775808]]

       

       

      Entries are removed from the offsetsToCommit map once they have been committed to Kafka. The removal happens in the callback that KafkaSourceReader.notifyCheckpointComplete passes to KafkaSourceFetcherManager.commitOffsets.

      However, if committedPartitions is empty for the given checkpoint, KafkaSourceFetcherManager.commitOffsets returns early without invoking the callback (KafkaSourceFetcherManager, line 78):

      if (offsetsToCommit.isEmpty()) {
          return;
      } 

      We can observe from the logs that indeed an empty map is encountered at this step:

      Committing offsets {}

      Conclusion

      It seems that an empty map is added to the offsetsToCommit map for every checkpoint. Since the startingOffset in our case is -3, the empty map is never filled. During the offset commit phase these checkpoints are skipped because there is nothing to commit, but no cleanup happens either, so the empty maps keep accumulating.
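The accumulation described above can be reproduced in isolation. The following is a minimal sketch, not the actual Flink code: the class OffsetLeakSimulation is hypothetical, partitions are represented by plain strings and offsets by Long values instead of TopicPartition and OffsetAndMetadata, and the asynchronous Kafka commit is replaced by a direct callback invocation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified stand-in for the reader's bookkeeping; NOT the actual Flink classes.
class OffsetLeakSimulation {
    // checkpointId -> (partition name -> offset); plays the role of offsetsToCommit.
    final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    // Like snapshotState: always registers a map, fills it only for offsets >= 0.
    void snapshotState(long checkpointId, Map<String, Long> startingOffsets) {
        Map<String, Long> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (Map.Entry<String, Long> split : startingOffsets.entrySet()) {
            if (split.getValue() >= 0) {
                offsetsMap.put(split.getKey(), split.getValue());
            }
        }
    }

    // Like commitOffsets with the early return: the callback is skipped for an empty map.
    static void commitOffsets(Map<String, Long> offsets, Runnable onCommitted) {
        if (offsets.isEmpty()) {
            return;
        }
        onCommitted.run(); // the real method would commit to Kafka asynchronously
    }

    // Like notifyCheckpointComplete: cleanup lives only inside the commit callback.
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> committedPartitions = offsetsToCommit.get(checkpointId);
        if (committedPartitions == null) {
            return;
        }
        commitOffsets(committedPartitions,
                () -> offsetsToCommit.headMap(checkpointId + 1).clear());
    }

    public static void main(String[] args) {
        OffsetLeakSimulation reader = new OffsetLeakSimulation();
        for (long cp = 1; cp <= 1000; cp++) {
            reader.snapshotState(cp, Collections.singletonMap("source-events-44", -3L));
            reader.notifyCheckpointComplete(cp);
        }
        System.out.println(reader.offsetsToCommit.size()); // prints 1000

        // One checkpoint with a real offset triggers the callback and clears everything.
        reader.snapshotState(1001, Collections.singletonMap("source-events-44", 1L));
        reader.notifyCheckpointComplete(1001);
        System.out.println(reader.offsetsToCommit.size()); // prints 0
    }
}
```

In this simplified model, one empty map is retained per idle checkpoint, and a single checkpoint with a non-negative offset clears all older entries, which matches the observation that one message temporarily frees the heap.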

       


        Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Lauri Suurväli great debugging!

          I think the fix is basically that, in KafkaSourceFetcherManager#commitOffsets, if the provided offsetsToCommit map is empty, the callback (where the logic for truncating the map lives) should be invoked as well. Currently, it just returns without calling the callback at all. Code link: https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78-L80

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Making this a blocker for the upcoming Kafka connector releases.

          lauri.suurvali Lauri Suurväli added a comment -

          Tzu-Li (Gordon) Tai thank you for the comment!
          Would removing the code that you linked, which returns early when offsetsToCommit is empty, be an option to solve this issue? The remaining code would end up in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#sendOffsetCommitRequest, which returns a successful response locally when the offsets map is empty. kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java at master · a0x8o/kafka (github.com)

          Since the request is handled locally, perhaps this is a good way to ensure that the callback function isn't discarded. Would this approach bring any additional overhead that we would like to avoid, or am I missing something?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Lauri Suurväli I think that would work, but the issue is that in the callback, on success, we log that a commit was successful and also bump the source reader metrics, which can be confusing if no offsets were actually committed. Moreover, with that approach we would be relying on internal details of the Kafka client that are hard to cover with tests (i.e. things might silently change such that a remote request is issued even if the provided offsets are empty, which is not ideal).

          So, I think we can be a bit cleaner by short-circuiting the notifyCheckpointComplete method such that, if the offsets for a checkpoint are empty, we don't even attempt to use the fetcher manager to commit offsets.

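The short-circuit proposed in the comment above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the committed fix: the class ShortCircuitingReader, the helper removeUpTo, and the commitAttempts counter are hypothetical, partitions and offsets are modelled as strings and Long values, and the Kafka commit is reduced to a counter increment followed by the cleanup that the real callback would perform.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified model of the proposed behaviour; NOT the actual Flink classes.
class ShortCircuitingReader {
    final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();
    int commitAttempts = 0; // counts how often a commit would be handed to the fetcher manager

    void snapshotState(long checkpointId, Map<String, Long> startingOffsets) {
        Map<String, Long> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (Map.Entry<String, Long> split : startingOffsets.entrySet()) {
            if (split.getValue() >= 0) {
                offsetsMap.put(split.getKey(), split.getValue());
            }
        }
    }

    // Drops the entries for this checkpoint and all earlier ones.
    private void removeUpTo(long checkpointId) {
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> committedPartitions = offsetsToCommit.get(checkpointId);
        if (committedPartitions == null) {
            return;
        }
        if (committedPartitions.isEmpty()) {
            // Nothing to commit: clean up directly, without a commit, success log, or metric bump.
            removeUpTo(checkpointId);
            return;
        }
        commitAttempts++;         // stands in for the call to the fetcher manager
        removeUpTo(checkpointId); // in the real reader this would run in the commit callback
    }

    public static void main(String[] args) {
        ShortCircuitingReader reader = new ShortCircuitingReader();
        for (long cp = 1; cp <= 1000; cp++) {
            reader.snapshotState(cp, Collections.singletonMap("source-events-44", -3L));
            reader.notifyCheckpointComplete(cp);
        }
        System.out.println(reader.offsetsToCommit.size()); // prints 0
        System.out.println(reader.commitAttempts);         // prints 0
    }
}
```

Keeping the empty-map check in notifyCheckpointComplete means the cleanup does not depend on how the Kafka client treats an empty commit request, which is the concern raised in the comment.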
          martijnvisser Martijn Visser added a comment -

          Fixed in apache/flink-connector-kafka

          main b0f15f279915a3862d353c9d2726a52741c248ba
          v3.0 b316d8ce25cc63bc6cdb858831e8566164a16b81


          People

            tzulitai Tzu-Li (Gordon) Tai
            lauri.suurvali Lauri Suurväli