Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-712

Kafka OutOfMemoryError

    XMLWordPrintableJSON

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.7.3, 0.8.0
    • Component/s: DStreams
    • Labels: None

    Description

      Using KafkaInputDStream causes OutOfMemoryErrors after running for a period of time (1-2 minutes in my case). The bug is not within Spark Streaming, but rather has something to do with how the provided jar under /streaming/lib/ was packaged.

      If it is helpful — we have packaged Kafka 0.7.2 with Scala 2.9.2, which we run in many prod environments, and we would be happy to provide it.

      The issue is easily reproducible; here is the code I ran using the Kafka jar that is bundled with Spark Streaming:

      import java.util.concurrent.Executors
      import java.util.Properties
      import kafka.consumer._
      import kafka.message.{Message, MessageSet, MessageAndMetadata}
      import kafka.serializer.StringDecoder
      import kafka.utils.{Utils, ZKGroupTopicDirs}
      
      
      private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
      
        var index = 0
      
        def run() {
          stream.takeWhile { msgAndMetadata =>
            if (index%1000 == 0) {
              println("got: " + index)
            }
            index += 1
            true
          }
        }
      }
      object KafkaTest {
        def main(args: Array[String]) {
          val props = new Properties()
          props.put("zk.connect", "ozoo01.staging.dmz,ozoo02.staging.dmz,ozoo03.staging.dmz")
          props.put("groupid", "my-cool-consumer-group")
          props.put("zk.connectiontimeout.ms", "100000")
      
          val executorPool = Executors.newFixedThreadPool(1)
      
      
      
          val consumerConfig = new ConsumerConfig(props)
          val consumerConnector: ZookeeperConsumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
          val topicMessageStreams = consumerConnector.createMessageStreams(Map("las_01_scsRawHits" -> 1), new StringDecoder())
      
          topicMessageStreams.values.foreach { streams =>
            streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
          }
        }
      }
      

      Here is the output:
      got: 0
      got: 1000
      got: 2000
      got: 3000
      got: 4000
      ...
      got: 158000
      got: 159000
      got: 160000
      got: 161000
      got: 162000
      13/03/12 22:47:14 ERROR network.BoundedByteBufferReceive: OOME with size 4194330
      java.lang.OutOfMemoryError: Java heap space
      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
      at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
      at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
      at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
      at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
      at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
      at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
      13/03/12 22:47:14 ERROR consumer.FetcherRunnable: error in FetcherRunnable
      java.lang.OutOfMemoryError: Java heap space
      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
      at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
      at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
      at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
      at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
      at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
      at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
      13/03/12 22:47:14 INFO consumer.FetcherRunnable: stopping fetcher FetchRunnable-1 to host oagg01.staging.dmz
      got: 163000
      13/03/12 22:47:16 ERROR network.BoundedByteBufferReceive: OOME with size 4194330
      java.lang.OutOfMemoryError: Java heap space
      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
      at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
      at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
      at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
      at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
      at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
      at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
      13/03/12 22:47:16 ERROR consumer.FetcherRunnable: error in FetcherRunnable
      java.lang.OutOfMemoryError: Java heap space
      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
      at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
      at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
      at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
      at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
      at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
      at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
      at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
      13/03/12 22:47:18 INFO consumer.FetcherRunnable: stopping fetcher FetchRunnable-0 to host oagg02.staging.dmz

      Attachments

        Activity

          People

            seanm SeanM
            seanm SeanM
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: