Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 2.0.0, 2.0.1, 2.0.2
- None
- None
Description
We have a Spark Streaming application that processes data from Kinesis.
In our application we observe a memory leak on the executors: Netty buffers are not released properly when the Spark BlockManager replicates the input blocks received from the Kinesis stream. The leak occurs when we set the storage level to MEMORY_AND_DISK_2 for the Kinesis input blocks. If we change the storage level to MEMORY_AND_DISK, which avoids creating a replica, the leak no longer appears. We detected the leak and obtained the stack trace by running the executors with an additional JVM option: -Dio.netty.leakDetectionLevel=advanced.
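For anyone trying to reproduce this, one way to pass that JVM option to the executors is through the `spark.executor.extraJavaOptions` setting on `spark-submit`. The sketch below only assembles such a command line. The jar name `kinesis-app.jar`, the class `com.example.KinesisApp`, and the helper `build_submit_command` are hypothetical placeholders, not part of the original report.

```python
import shlex

# JVM flag quoted in the report; it turns on Netty's detailed leak tracking.
LEAK_DETECTION_OPT = "-Dio.netty.leakDetectionLevel=advanced"


def build_submit_command(app_jar, main_class, extra_executor_opts=()):
    """Return a spark-submit argv that enables Netty leak detection on executors.

    app_jar and main_class are placeholders supplied by the caller.
    """
    executor_opts = " ".join([LEAK_DETECTION_OPT, *extra_executor_opts])
    return [
        "spark-submit",
        "--class", main_class,
        "--conf", f"spark.executor.extraJavaOptions={executor_opts}",
        app_jar,
    ]


if __name__ == "__main__":
    # Hypothetical application jar and main class, for illustration only.
    cmd = build_submit_command("kinesis-app.jar", "com.example.KinesisApp")
    print(" ".join(shlex.quote(part) for part in cmd))
```

Advanced leak detection adds overhead, so it is probably best limited to a debugging run rather than left on in production.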
Here is the stack trace of the leak:
16/12/06 22:30:12 ERROR ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 0
Created at:
io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:103)
io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:335)
io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:247)
org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:69)
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:1182)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:997)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:702)
org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:80)
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:282)
org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:352)
org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
We also observe a continuous increase in off-heap memory usage on the executors. Any help would be appreciated.
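For context on the LEAK message in the trace: Netty buffers are reference-counted, start with a count of 1, and are only freed once `release()` brings the count to zero. The toy model below illustrates that contract in plain Python. It is not Netty's code and not Spark's replication path; `RefCountedBuffer`, `send_leaky`, and `send_safe` are hypothetical names used only for illustration.

```python
class RefCountedBuffer:
    """Toy stand-in for a reference-counted buffer (not Netty's API)."""

    def __init__(self, data: bytes):
        self.data = data
        self.ref_count = 1  # a new buffer starts with one reference

    def retain(self) -> "RefCountedBuffer":
        if self.ref_count <= 0:
            raise ValueError("buffer already released")
        self.ref_count += 1
        return self

    def release(self) -> bool:
        """Drop one reference; return True when the count reaches zero."""
        if self.ref_count <= 0:
            raise ValueError("buffer already released")
        self.ref_count -= 1
        return self.ref_count == 0


def send_leaky(buf: RefCountedBuffer) -> int:
    # Uses the buffer but never releases it, so the count stays above zero.
    return len(buf.data)


def send_safe(buf: RefCountedBuffer) -> int:
    # Releases the buffer even if the body raises.
    try:
        return len(buf.data)
    finally:
        buf.release()


if __name__ == "__main__":
    leaked = RefCountedBuffer(b"block")
    send_leaky(leaked)
    print("leaky ref_count:", leaked.ref_count)

    freed = RefCountedBuffer(b"block")
    send_safe(freed)
    print("safe ref_count:", freed.ref_count)
```

In the trace above, the buffer in question is the one created by `ChunkedByteBuffer.toNetty` during replication; the leak detector reports that it was garbage-collected without a matching release.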
Issue Links
- duplicates SPARK-18375 Upgrade netty to 4.0.42.Final (Resolved)