Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 1.6.1
Fix Version/s: None
Environment: hdp 2.4.2.0-258, spark 1.6
Description
While learning how streams, chunks, and blocks work in Spark's Netty-based network layer, I found a nasty case.
Processing an OpenBlocks message registers a stream in OneForOneStreamManager:
org.apache.spark.network.server.OneForOneStreamManager#registerStream
fills a StreamState with the appId and the buffers.
Processing a ChunkFetchRequest registers the channel:
org.apache.spark.network.server.OneForOneStreamManager#registerChannel
fills the same StreamState with the channel.
In org.apache.spark.network.shuffle.OneForOneBlockFetcher#start, the client sends OpenBlocks first and, only after it succeeds, the ChunkFetchRequest messages, in that sequence.
spark-1.6.1\network\common\src\main\java\org\apache\spark\network\server\OneForOneStreamManager.java
@Override
public void registerChannel(Channel channel, long streamId) {
  if (streams.containsKey(streamId)) {
    streams.get(streamId).associatedChannel = channel;
  }
}
This is the only place where associatedChannel is set.
@Override
public void connectionTerminated(Channel channel) {
  // Close all streams which have been associated with the channel.
  for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
    StreamState state = entry.getValue();
    if (state.associatedChannel == channel) {
      streams.remove(entry.getKey());
      // Release all remaining buffers.
      while (state.buffers.hasNext()) {
        state.buffers.next().release();
      }
    }
  }
}
This is the only place where the remaining state.buffers are released when a connection closes.
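If the network goes down after the OpenBlocks message has been processed (so the stream is already registered) but before any ChunkFetchRequest arrives, no ChunkFetchRequest message ever reaches the server.
So the channel is never set, and leaked buffers can be observed in OneForOneStreamManager.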
As far as I can tell from the code, if org.apache.spark.network.server.OneForOneStreamManager.StreamState#associatedChannel is never set, the StreamState remains in memory forever, because the only ways to release it are a channel close or a client reading the last chunk of the block.
This may lead to an OOM in the NodeManager.
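To make the failure mode concrete, here is a minimal, self-contained Java model of the behavior described above, assuming the simplified semantics of the snippets quoted earlier. The class LeakModel, the FakeBuffer stand-in for ManagedBuffer, and the use of a plain Object instead of a Netty Channel are all hypothetical, not Spark APIs. The model registers a stream as OpenBlocks handling would, skips registerChannel (as if the connection dropped before any ChunkFetchRequest), and then terminates the connection.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of OneForOneStreamManager as quoted above.
// Object stands in for a Netty Channel and FakeBuffer for a ManagedBuffer.
class LeakModel {

  // Stand-in for a reference-counted ManagedBuffer.
  static final class FakeBuffer {
    boolean released = false;
    void release() { released = true; }
  }

  // Mirrors StreamState: appId, remaining buffers, and a channel that starts as null.
  static final class StreamState {
    final String appId;
    final Iterator<FakeBuffer> buffers;
    Object associatedChannel = null;

    StreamState(String appId, Iterator<FakeBuffer> buffers) {
      this.appId = appId;
      this.buffers = buffers;
    }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
  private final AtomicLong nextStreamId = new AtomicLong(0);

  // Called while handling OpenBlocks: no channel is recorded here.
  long registerStream(String appId, Iterator<FakeBuffer> buffers) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState(appId, buffers));
    return id;
  }

  // Called while handling a ChunkFetchRequest: the only place the channel is set.
  void registerChannel(Object channel, long streamId) {
    StreamState state = streams.get(streamId);
    if (state != null) {
      state.associatedChannel = channel;
    }
  }

  // Only streams whose associatedChannel matches are cleaned up;
  // streams whose channel is still null are skipped.
  void connectionTerminated(Object channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }

  public static void main(String[] args) {
    LeakModel manager = new LeakModel();
    Object channel = new Object();
    List<FakeBuffer> blocks = List.of(new FakeBuffer(), new FakeBuffer());

    // OpenBlocks succeeds, then the connection drops before any ChunkFetchRequest.
    manager.registerStream("app-1", blocks.iterator());
    manager.connectionTerminated(channel);

    System.out.println("streams still registered: " + manager.streams.size()); // 1
    System.out.println("first buffer released: " + blocks.get(0).released);   // false
  }
}
```

In this model the stream and both buffers survive connectionTerminated, because nothing ever linked them to the closed channel.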
We could set the channel in OneForOneStreamManager#registerStream to cover this case.
If the channel is set when the stream is registered, connectionTerminated can find the stream and release its buffers.
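A sketch of the proposed change, under the same simplifying assumptions (hypothetical class FixedStreamManager, FakeBuffer stand-in, Object as the channel). Here registerStream takes the channel up front, so connectionTerminated can release the stream even if no ChunkFetchRequest ever arrives. The three-argument registerStream signature is an assumption for illustration, not the Spark 1.6.1 API; registerChannel is left out of this model because the channel is already known at registration time.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the proposed fix, not the Spark API: the channel is
// recorded when the stream is registered (i.e. while handling OpenBlocks).
// Object stands in for a Netty Channel and FakeBuffer for a ManagedBuffer.
class FixedStreamManager {

  // Stand-in for a reference-counted ManagedBuffer.
  static final class FakeBuffer {
    boolean released = false;
    void release() { released = true; }
  }

  // The channel is known from the start, so it can be final.
  static final class StreamState {
    final String appId;
    final Iterator<FakeBuffer> buffers;
    final Object associatedChannel;

    StreamState(String appId, Iterator<FakeBuffer> buffers, Object channel) {
      this.appId = appId;
      this.buffers = buffers;
      this.associatedChannel = channel;
    }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
  private final AtomicLong nextStreamId = new AtomicLong(0);

  // Assumed three-argument variant: the caller passes the channel
  // the OpenBlocks message arrived on.
  long registerStream(String appId, Iterator<FakeBuffer> buffers, Object channel) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState(appId, buffers, channel));
    return id;
  }

  // Unchanged cleanup logic; it now matches every stream opened on the channel.
  void connectionTerminated(Object channel) {
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      StreamState state = entry.getValue();
      if (state.associatedChannel == channel) {
        streams.remove(entry.getKey());
        // Release all buffers that were never fetched.
        while (state.buffers.hasNext()) {
          state.buffers.next().release();
        }
      }
    }
  }

  public static void main(String[] args) {
    FixedStreamManager manager = new FixedStreamManager();
    Object channel = new Object();
    List<FakeBuffer> blocks = List.of(new FakeBuffer(), new FakeBuffer());

    // OpenBlocks succeeds, then the connection drops before any ChunkFetchRequest.
    manager.registerStream("app-1", blocks.iterator(), channel);
    manager.connectionTerminated(channel);

    System.out.println("streams still registered: " + manager.streams.size()); // 0
    System.out.println("first buffer released: " + blocks.get(0).released);   // true
  }
}
```

Because the channel is captured while the OpenBlocks message is handled, closing the connection releases the stream regardless of how far the chunk fetches got. The cost is that whatever handles OpenBlocks has to pass the channel into the stream manager.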