Description
Using KafkaInputDStream causes OutOfMemoryErrors after running for a period of time (1-2 minutes in my case). The bug is not within Spark Streaming, but rather has something to do with how the provided jar under /streaming/lib/ was packaged.
If it is helpful, we have packaged Kafka 0.7.2 with Scala 2.9.2, which we run in many production environments, and we would be happy to provide it.
The issue is easily reproducible. Here is the code I ran using the Kafka jar that is bundled with Spark Streaming:
import java.util.concurrent.Executors
import java.util.Properties
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}

/**
 * Runnable that iterates over a single Kafka stream and prints a progress
 * line ("got: N") once every 1000 messages.
 *
 * @param stream the Kafka stream of String-decoded messages to consume
 */
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
  // Number of messages this handler has seen so far.
  var index = 0

  /** Consumes the stream, counting messages and logging every 1000th one. */
  def run() {
    // The predicate always returns true, so takeWhile itself never ends the
    // iteration; it is used here purely to walk the stream.
    // NOTE(review): if KafkaStream is a strict Scala Iterable, takeWhile
    // builds a result collection holding every element the predicate
    // accepted, i.e. every message ever consumed. That would grow the heap
    // without bound and may be the actual source of the OutOfMemoryError,
    // rather than jar packaging. TODO confirm against the Kafka/Scala
    // versions in use; a plain foreach would not retain the messages.
    stream.takeWhile { msgAndMetadata =>
      if (index%1000 == 0) {
        println("got: " + index)
      }
      index += 1
      true
    }
  }
}

/** Minimal stand-alone reproduction: consumes one Kafka topic on one worker thread. */
object KafkaTest {
  /**
   * Builds the consumer configuration, opens the message streams for the
   * topic and hands each stream to a MessageHandler on the executor pool.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]) {
    // Consumer settings: ZooKeeper hosts, consumer group id and ZooKeeper
    // connection timeout (in milliseconds, going by the key name).
    val props = new Properties()
    props.put("zk.connect", "ozoo01.staging.dmz,ozoo02.staging.dmz,ozoo03.staging.dmz")
    props.put("groupid", "my-cool-consumer-group")
    props.put("zk.connectiontimeout.ms", "100000")

    // Fixed pool with a single worker thread; submitted handlers run on it.
    val executorPool = Executors.newFixedThreadPool(1)

    val consumerConfig = new ConsumerConfig(props)
    // The result of Consumer.create is downcast to ZookeeperConsumerConnector.
    val consumerConnector: ZookeeperConsumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]

    // Topic name mapped to 1 -- presumably the number of streams requested
    // for that topic (verify against the Kafka consumer API). Messages are
    // decoded with StringDecoder.
    val topicMessageStreams = consumerConnector.createMessageStreams(Map("las_01_scsRawHits" -> 1), new StringDecoder())

    // Submit one MessageHandler per returned stream. No shutdown of the pool
    // or the connector is requested anywhere in this method.
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        executorPool.submit(new MessageHandler(stream))
      }
    }
  }
}
Here is the output:
got: 0
got: 1000
got: 2000
got: 3000
got: 4000
...
got: 158000
got: 159000
got: 160000
got: 161000
got: 162000
13/03/12 22:47:14 ERROR network.BoundedByteBufferReceive: OOME with size 4194330
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
13/03/12 22:47:14 ERROR consumer.FetcherRunnable: error in FetcherRunnable
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
13/03/12 22:47:14 INFO consumer.FetcherRunnable: stopping fetcher FetchRunnable-1 to host oagg01.staging.dmz
got: 163000
13/03/12 22:47:16 ERROR network.BoundedByteBufferReceive: OOME with size 4194330
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
13/03/12 22:47:16 ERROR consumer.FetcherRunnable: error in FetcherRunnable
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:177)
at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:117)
at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:115)
at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:60)
13/03/12 22:47:18 INFO consumer.FetcherRunnable: stopping fetcher FetchRunnable-0 to host oagg02.staging.dmz