Details
-
Improvement
-
Status: Patch Available
-
Minor
-
Resolution: Unresolved
-
2.7.0, 3.2.1
-
None
-
Patch
Description
With the cluster size become more and more big.The cost time on Reduce fetch Map's result from NodeManager become more and more long.We often see the WARN logs in the reduce's logs as follow.
2020-06-19 15:43:15,522 WARN fetcher#8 org.apache.hadoop.mapreduce.task.reduce.Fetcher: Failed to connect to TX-196-168-211.com:13562 with 5 map outputs
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.verifyConnection(Fetcher.java:434)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.setupConnectionsWithRetry(Fetcher.java:400)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.openShuffleUrl(Fetcher.java:271)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:330)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:198)
We check the NodeManager server find that the disk IO util and connections became very high when the read timeout happened.We analyze that if we have 20,000 maps and 1,000 reduces which will make NodeManager generate 20 million times IO stream operate in the shuffle phase.If the reduce fetch data size is very small from map output files.Which make the disk IO util become very high in big cluster.Then read timeout happened frequently.The application finished time become longer.
We find ShuffleHandler have IndexCache for cache file.out.index file.Then we want to change the small IO to big IO which can reduce the small disk IO times. So we try to cache all the small file data(file.out) in memory when the first fetch request come.Then the others fetch request only need read data from memory avoid disk IO operation.After we cache data to memory we find the read timeout disappeared.