  Flink / FLINK-6301

Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.2.0, 1.1.3, 1.1.4
    • Fix Version/s: None
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      Hi

      We are running Flink on a standalone cluster with 8 TaskManagers having 8 vCPUs and 8 slots each. Each host has 16 GB of RAM.

      In our jobs,

      1. We consume gzip-compressed messages from Kafka using KafkaConnector09 and use the RocksDB backend for checkpoint storage.
      2. To debug the leak, we used jemalloc and jeprof to profile the sources of malloc calls from the Java process; the profiles generated at various stages of the job are attached. As the profiles show, apart from os.malloc and rocksDB.allocateNewBlock, there are additional malloc calls coming from the inflate() method of java.util.zip.Inflater. These calls are innocuous as long as Inflater.end() is called after its use.
      3. To find the callers of inflate(), we used BTrace on the running process to dump the caller stack on each call. This is the stack trace we got:
        java.util.zip.Inflater.inflate(Inflater.java)
        java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
        java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
        java.io.DataInputStream.readFully(DataInputStream.java:195)
        org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
        org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
        org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
        org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
        org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
        org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
        org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
        org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
        org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
        org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
        org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
        org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
        org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
        org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
        org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
        org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
        

        The end() method on Inflater is called inside the close() method of InflaterInputStream (extended by GZIPInputStream). Looking through the Kafka consumer code, however, we found that RecordsIterator does not close the compressor stream after use, which causes the memory leak:

      https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
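For illustration, here is a minimal sketch of the stream-handling pattern that avoids this kind of leak. It is not the Kafka code or the KAFKA-3937 patch; the class and method names (GzipCloseSketch, gzip, readAndClose) are hypothetical and only the JDK is used. Closing the outer DataInputStream propagates close() down to GZIPInputStream, which in turn ends its internal Inflater.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of Kafka or Flink.
public class GzipCloseSketch {

    // Compress a payload so the example has gzip data to read back.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(raw);
        }
        return buffer.toByteArray();
    }

    // Read `length` decompressed bytes, then close the stream.
    // try-with-resources closes the DataInputStream, which closes the
    // wrapped GZIPInputStream; that close() calls Inflater.end() on the
    // stream's own inflater and releases its native buffers.
    static byte[] readAndClose(byte[] compressed, int length) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            byte[] out = new byte[length];
            in.readFully(out);
            return out;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = readAndClose(gzip(raw), raw.length);
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

If the stream were instead left open and simply dropped, the Inflater's native memory would only be reclaimed when (and if) the object is finalized, which is consistent with the growth in inflate() allocations seen in the profiles.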

      https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the issue was fixed in 0.10.1.0 but not back-ported to previous versions.

      So, I assume we have two paths from here:
      1. Wait for the changes to be back-ported to 0.9.x Kafka consumer and then, update the Kafka-clients dependency:
      https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40

      2. Update the kafka-connector10 to use 0.10.1.0 clients library instead of 0.10.0.1.
      https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40

      In addition to master, the changes should also be back-ported to 1.2.x for Kafka connector 10 and to all 1.x releases for Kafka connector 09.

        Attachments

        1. jeprof.24611.1228.i1228.heap.svg
          88 kB
          Rahul Yadav
        2. jeprof.24611.1695.i1695.heap.svg
          88 kB
          Rahul Yadav
        3. jeprof.24611.265.i265.heap.svg
          88 kB
          Rahul Yadav
        4. jeprof.24611.3138.i3138.heap.svg
          87 kB
          Rahul Yadav
        5. jeprof.24611.595.i595.heap.svg
          89 kB
          Rahul Yadav
        6. jeprof.24611.705.i705.heap.svg
          89 kB
          Rahul Yadav
        7. jeprof.24611.81.i81.heap.svg
          87 kB
          Rahul Yadav
        8. POSTFIX.jeprof.14880.1944.i1944.heap.svg
          93 kB
          Rahul Yadav
        9. POSTFIX.jeprof.14880.4129.i4129.heap.svg
          93 kB
          Rahul Yadav
        10. POSTFIX.jeprof.14880.9.i9.heap.svg
          91 kB
          Rahul Yadav
        11. POSTFIX.jeprof.14880.961.i961.heap.svg
          93 kB
          Rahul Yadav
        12. POSTFIX.jeprof.14880.99.i99.heap.svg
          93 kB
          Rahul Yadav

              People

              • Assignee:
                vidhu5269 Rahul Yadav
                Reporter:
                vidhu5269 Rahul Yadav
              • Votes:
                0
                Watchers:
                5