Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-3552

New Consumer: java.lang.OutOfMemoryError: Direct buffer memory

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 0.9.0.1
    • None
    • consumer
    • None

    Description

      I'm running Kafka's new consumer with message handlers that can sometimes take a lot of time to return, and combining that with manual offset management (to get at-least-once semantics). Since poll() is the only way to heartbeat with the consumer, I have a thread that runs every 500 milliseconds that does the following:

      1) Pause all partitions
      2) Call poll(0)
      3) Resume all partitions

      For the record, all accesses to KafkaConsumer are protected by synchronized blocks. This generally works, but I'm occasionally seeing messages like this:

      java.lang.OutOfMemoryError: Direct buffer memory
              at java.nio.Bits.reserveMemory(Bits.java:658)
              at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
              at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
              at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
              at sun.nio.ch.IOUtil.read(IOUtil.java:195)
              at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
              at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
              at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
              at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
              at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
              at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
              at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
              at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
      

      In addition, when I'm reporting offsets, I'm seeing:

      java.lang.OutOfMemoryError: Direct buffer memory
              at java.nio.Bits.reserveMemory(Bits.java:658)
              at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
              at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
              at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
              at sun.nio.ch.IOUtil.read(IOUtil.java:195)
              at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
              at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
              at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
              at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
              at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
              at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
              at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
              at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
              at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
              at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
      

      Given that I'm just calling the library, this behavior is unexpected.

      Attachments

        1. Screen Shot 2016-04-13 at 2.17.48 PM.png
          152 kB
          Kanak Biscuitwala
        2. Screen Shot 2016-04-13 at 11.56.05 AM.png
          40 kB
          Kanak Biscuitwala

        Activity

          People

            liquanpei Liquan Pei
            kanak Kanak Biscuitwala
            Votes:
            0 Vote for this issue
            Watchers:
            7 Start watching this issue

            Dates

              Created:
              Updated: