Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Fix Version: 3.4.0
- Flags: Reviewed
Description
HADOOP-15327 introduced some regressions in the ShuffleHandler.
1. a memory leak
ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
The Shuffle's channelRead did not release the message properly; the fix would be:
try {
  // ...
} finally {
  ReferenceCountUtil.release(msg);
}
Or even simpler:
extends SimpleChannelInboundHandler<FullHttpRequest>
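A minimal sketch of that approach (the class body here is illustrative, not the actual patch): SimpleChannelInboundHandler releases the inbound message automatically after channelRead0 returns, so no manual release is needed:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;

public class Shuffle extends SimpleChannelInboundHandler<FullHttpRequest> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
    // handle the shuffle request; the FullHttpRequest is released by
    // SimpleChannelInboundHandler once this method returns, so no manual
    // ReferenceCountUtil.release(msg) is required
  }
}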
2. a bug in SSL mode with more than one reducer
It manifested in multiple errors:
ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.io.IOException: Broken pipe
ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.nio.channels.ClosedChannelException
// if the reducer did not have enough memory, even this:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
Configuration - mapred-site.xml
mapreduce.shuffle.ssl.enabled=true
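For reference, the property in mapred-site.xml (standard Hadoop configuration XML):

<!-- mapred-site.xml: enable SSL for the shuffle transfer -->
<property>
  <name>mapreduce.shuffle.ssl.enabled</name>
  <value>true</value>
</property>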
An alternative is to build a custom jar where FadvisedFileRegion is replaced with FadvisedChunkedFile in sendMapOutput.
Reproduction
hdfs dfs -rm -r -skipTrash /tmp/sort_input
hdfs dfs -rm -r -skipTrash /tmp/sort_output
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" /tmp/sort_input
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input /tmp/sort_output | tee sort_app_output.txt
ShuffleHandler's protocol
// HTTP Request
GET /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 HTTP/1.1
+ keep-alive headers

// HTTP Response Headers
content-length = sum(serialised ShuffleHeader in bytes + MapOutput size)
+ keep-alive headers

// Response Data (transfer-encoding=chunked)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
...
LastHttpContent
// close socket if no keep-alive
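To make the content-length line concrete, a minimal sketch of the per-map contribution (partLength is an assumed input; in practice it comes from the spill index), relying only on ShuffleHeader being a Writable:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;

// content-length is the sum of this value over all requested map outputs
static long mapOutputContribution(ShuffleHeader header, long partLength) throws IOException {
  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  header.write(new DataOutputStream(buffer)); // serialised ShuffleHeader bytes
  return buffer.size() + partLength;          // plus the map-output slice itself
}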
Issues
- setResponseHeaders: did not always set the content-length, and the transfer-encoding=chunked header was missing (see the sketch after this list).
- ReduceMapFileCount.operationComplete: messed up the futures on the LastHttpContent
- ChannelGroup accepted: is only used to close the channels; there is no need for that magic 5.
- bossGroup: should have only 1 thread for accepting connections.
- Shuffle: is unnecessarily Sharable; the 3 async sendMap calls per channel (see below) caused future errors when using FadvisedChunkedFile
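A minimal sketch of the setResponseHeaders fix referenced above (the method shape and the contentLength parameter are assumptions; only Netty 4's HttpUtil helpers are real). Content-length is set when the total size is known, otherwise the response is marked chunked, since Netty's helper removes Content-Length when marking a message chunked:

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;

static void setResponseHeaders(HttpResponse response, boolean keepAlive, long contentLength) {
  if (contentLength >= 0) {
    HttpUtil.setContentLength(response, contentLength);   // Content-Length: <n>
  } else {
    HttpUtil.setTransferEncodingChunked(response, true);  // Transfer-Encoding: chunked
  }
  if (keepAlive) {
    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
  }
}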
Max session open files is not an optimisation; it actually wastes resources
// by default maxSessionOpenFiles = 3
for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
At the end of the day, we create an HTTP chunked stream, so there is no need to run 3 sendMap calls asynchronously; the futures will finish one by one, sequentially. The OS-cache magic from the Fadvised classes won't happen either, because the first readChunk will only be called later.
So this can be simplified a lot:
sendMap(reduceContext);
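A minimal sketch of the sequential chaining this implies (the listener name and the sendNextMap hook are illustrative; ReduceMapFileCount plays this role in the actual handler): each completed write starts the next sendMap, so nothing needs to run in parallel:

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

abstract class SequentialSendListener implements ChannelFutureListener {
  @Override
  public void operationComplete(ChannelFuture future) {
    if (!future.isSuccess()) {
      future.channel().close(); // give up on the connection on failure
      return;
    }
    sendNextMap(); // hypothetical hook: start the next map output, if any remain
  }

  abstract void sendNextMap(); // wired to sendMap(reduceContext) in practice
}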
My proposal
Some refactoring: ShuffleHandler is split into multiple classes to make it possible to remove the @Sharable annotation.
- ShuffleChannel
- ShuffleChannelInitializer
- ShuffleChannelHandlerContext
- ShuffleChannelHandler
TODO:
- fix/drop/refactor the existing unit tests
- add proper unit test that tests SSL/non-SSL mode where the response data is properly verified
- documentation about the protocol
WIP: github.com/tomicooler/hadoop
Useful Netty docs
- User guide for 4.x
- New and noteworthy in 4.0
- Reference counted objects (it will be changed in Netty 5)
- HttpStaticFileServer example
Attachments
Issue Links
- causes
  - MAPREDUCE-7434 Fix ShuffleHandler tests (Resolved)
  - MAPREDUCE-7433 Remove unused mapred/LoggingHttpResponseEncoder.java (Resolved)
- is caused by
  - HADOOP-15327 Upgrade MR ShuffleHandler to use Netty4 (Resolved)
- links to