OK, my interpretation about backoff was wrong. The reducer did succeed eventually. The build was off hadoop-0.17 trunk on Thursday. How are the map completion events delivered? Can you check the JT and TT logs to find out which map TIP the reducer was waiting for and what exactly happened to that TIP (from the JT logs)? There could have been a task failure or a lost TT, and the TIP might have been delayed or re-executed.
The events are stored in the JobTracker and fetched by the tasktrackers. Map completion events are polled at the same frequency as the heartbeat interval, which depends on the cluster size; for example, on a 500-node cluster it is 10 seconds. A delay on the order of minutes in getting map completion events could therefore mean either that the map is not complete yet (it is still COMMIT_PENDING or RUNNING) or that the JobTracker is busy and discarding RPCs. To check for the latter, look at the logs of the tasktracker hosting the reducer.
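To put the polling frequency in perspective, here is a minimal sketch of an interval that grows with cluster size. The scaling (one second per 50 nodes, with a 3-second floor) is a hypothetical formula chosen only to reproduce the "10 seconds for 500 nodes" example; it is not the JobTracker's actual code or constants.

```java
public class HeartbeatIntervalSketch {
    // Hypothetical scaling: one second of interval per 50 nodes, with a
    // 3-second floor. Chosen only to match "10 seconds for 500 nodes";
    // these are not the JobTracker's real constants.
    static final int NODES_PER_SECOND = 50;
    static final int MIN_INTERVAL_MS = 3_000;

    /** Hypothetical interval at which a tasktracker polls for map completion events. */
    static int pollIntervalMs(int clusterSize) {
        int scaledMs = 1000 * (clusterSize / NODES_PER_SECOND);  // integer division
        return Math.max(scaledMs, MIN_INTERVAL_MS);
    }

    public static void main(String[] args) {
        for (int nodes : new int[] {20, 100, 500, 2000}) {
            System.out.println(nodes + " nodes -> poll every " + pollIntervalMs(nodes) + " ms");
        }
    }
}
```

Under any formula of this shape, a reducer that just missed a poll waits at most about one interval for the next batch of events, so a wait of several minutes on a 500-node cluster points to the other two causes: an unfinished map or a JobTracker dropping RPCs.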
In this particular case, all the maps had definitely finished, since many other reducers had already finished. What is the configuration (number of nodes, number of maps/reducers, number of jobs running simultaneously, etc.)?
It seems that the log info is the main cause of confusion. This is what we think happened, based on the logs:
1) The reducer gets the task completion events for a bunch of maps and schedules them.
2) All the map outputs are copied successfully except one.
3) Assume that the Jetty server that is supposed to serve the remaining map's output is busy.
4) After 3 minutes the attempt fails, is retried, and succeeds.
3 minutes is the timeout for a fetch attempt. This also explains the 2-minute wait mentioned above: in the first minute the other map outputs are fetched (i.e., overlapped), and for the remaining 2 minutes before the timeout the reducer is just waiting for the last map's output. The 'need 1 map output' message in the reducer's logs should also mention how many of the needed outputs are in progress. Also note that many jobs were running simultaneously.
Attaching a patch that makes the log information (regarding remaining maps) clearer.
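The log change suggested here, reporting how many of the still-needed outputs already have a copy in flight, could look roughly like the sketch below. The method name and message wording are hypothetical; the thread does not show the patch's actual log format.

```java
import java.util.Locale;

public class ShuffleStatusLine {
    /**
     * Hypothetical progress message for the copy phase. Reporting the
     * in-progress count alongside the remaining count distinguishes
     * "waiting on a slow copy" from "no copy scheduled yet".
     */
    static String status(String reduceTaskId, int needed, int inProgress) {
        if (inProgress < 0 || inProgress > needed) {
            throw new IllegalArgumentException("inProgress must be in [0, needed]");
        }
        // Locale.ROOT keeps the digits plain regardless of the JVM's default locale.
        return String.format(Locale.ROOT, "%s Need %d map output(s): %d in progress, %d not in progress",
                reduceTaskId, needed, inProgress, needed - inProgress);
    }

    public static void main(String[] args) {
        System.out.println(status("task_200804041304_0005_r_000000_2", 1, 1));
    }
}
```

With a line like this, the situation described above (one output left, its copy stuck behind a busy Jetty) shows up as "1 in progress" instead of looking like a missing completion event.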
Amar, I think it is better to have more connection attempts with a smaller timeout (say 30 seconds) than fewer attempts with a large timeout (e.g. 3 minutes). Does 60 seconds look like a good compromise? That value is used in many places in the code. It would also be nice to set the backlog argument of Jetty's listener to 128 (if not higher).
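The backlog is the length of the queue of connections the OS has accepted at the TCP level but the server has not yet picked up. Jetty's listener API is not shown in this thread, so the sketch below uses plain `java.net.ServerSocket`, whose two-argument constructor takes the same kind of backlog value; it is an analogy, not the tasktracker's HTTP server code.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class BacklogSketch {
    // Backlog value suggested in the thread. The OS may silently cap it
    // (e.g. at net.core.somaxconn on Linux).
    static final int BACKLOG = 128;

    /** Opens a listener on an ephemeral port with an explicit accept-queue length. */
    static ServerSocket openListener() throws IOException {
        // Port 0 lets the OS pick a free port; the second argument is the
        // requested maximum number of pending, not-yet-accepted connections.
        return new ServerSocket(0, BACKLOG);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = openListener()) {
            System.out.println("Listening on port " + server.getLocalPort()
                    + " with requested backlog " + BACKLOG);
        }
    }
}
```

A larger backlog lets a burst of reducers queue their connections while all server threads are busy, instead of having connection attempts refused or left to time out.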
I am not sure whether it succeeded because of the 3 min timeout or whether some smaller value would do; this needs testing. From a utilization point of view, I think a smaller timeout makes sense: we free up a thread sooner, and it can potentially fetch successfully from some other host. This needs to be benchmarked. But it also means we need to keep an eye on the self-healing aspect: we kill reducers after they fail to fetch a certain number of times (and failing to establish a connection currently counts as a fetch failure), so we might end up killing reducers sooner than we do today.
[For killing reducers, we should probably move to a model where we look at the global picture and use all available information before killing a reducer (i.e., move this logic entirely to the JobTracker). In the case of map output fetch failures, the JT could then decide whether to kill a reducer based on which map outputs the reducer is failing to fetch, whether those map nodes are healthy, etc.]

From a timing point of view, the following two are equivalent:

    connect( , N_units_timeout);

and

    lastException = null;
    lastTime = -1;
    for (int i = 0; i < N; i++) {
      try {
        connect( , N_units_timeout);
        break;
      } catch (IOException e) {
        lastException = e;
        lastTime = i;
      }
    }
    if (lastTime == N-1) throw lastException;

But the second one has much stronger liveness. (Correction: the connect statement in the second one should use unit_timeout, not N_units_timeout.)

Speaking of failing a reducer because it fails to fetch map outputs, we have to do some careful analysis here. Also, we should differentiate between the early stage of shuffling (where the reducer may have thousands of map outputs to fetch)
fetch a lot of different map outputs. In the first case, it is better to re-execute the map. That's how the model works for maps: if too many reducers complain about fetch failures for a particular map, the map is killed. The change here should be to also consider the host the map ran on; otherwise we run into issues like HADOOP-2175 (https://issues.apache.org/jira/browse/HADOOP-2175?focusedCommentId=12582425#action_12582425).

Runping, could you please try things out with this patch? It implements the pseudocode you proposed.
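The retry pseudocode proposed above, with the per-attempt timeout corrected to the unit timeout, can be sketched in Java as follows. `Connector` is a hypothetical stand-in for the real connect call so the loop can be exercised without a network; it is not a Hadoop interface.

```java
import java.io.IOException;

public class UnitTimeoutRetry {
    /** Hypothetical stand-in for connect(..., timeoutMs); not a Hadoop API. */
    interface Connector {
        void connect(int timeoutMs) throws IOException;
    }

    /**
     * Makes up to n attempts, each bounded by unitMs, instead of a single
     * attempt bounded by n * unitMs. The worst-case wait is about the same,
     * but one stalled attempt no longer uses up the whole budget.
     * Returns the number of attempts that were needed.
     */
    static int connectWithRetries(Connector c, int n, int unitMs) throws IOException {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        IOException lastException = null;
        for (int i = 0; i < n; i++) {
            try {
                c.connect(unitMs);        // per-attempt timeout is the unit, not n * unit
                return i + 1;
            } catch (IOException e) {
                lastException = e;        // remember the failure and try again
            }
        }
        throw lastException;              // all n attempts failed
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // Fake connector that times out twice, then succeeds.
        Connector flaky = timeoutMs -> {
            if (++calls[0] < 3) {
                throw new IOException("connect timed out after " + timeoutMs + " ms");
            }
        };
        System.out.println("Connected after " + connectWithRetries(flaky, 6, 30_000) + " attempts");
    }
}
```

Returning on success replaces the `lastTime` bookkeeping of the pseudocode: reaching the final `throw` already means every attempt failed.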
A lot of reducers failed with the following message:

    Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.

I see a lot of the following exceptions in the log:

    2008-04-04 13:50:03,796 WARN org.apache.hadoop.mapred.ReduceTask: task_200804041304_0005_r_000000_2 copy failed: task_200804041304_0005_m_000181_0 from xxxx.com

Did you also change the read timeout? What is the value of MAX_FAILED_UNIQUE_FETCHES? Anyhow, we need to revisit the policy for failing a reducer during shuffling.
No.
5 (as in trunk)
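The bail-out rule behind that error message counts distinct maps whose outputs could not be fetched. Below is a minimal sketch with the limit of 5 mentioned above; the class name is hypothetical, and whether ReduceTask compares with `>=` or `>` is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;

public class UniqueFetchFailures {
    // Limit reported for trunk in this thread. Whether ReduceTask compares
    // with >= or > is an assumption of this sketch.
    static final int MAX_FAILED_UNIQUE_FETCHES = 5;

    private final Set<String> failedMaps = new HashSet<>();

    /** Records a failed copy of mapTaskId; returns true once the reducer should bail out. */
    boolean recordFailure(String mapTaskId) {
        failedMaps.add(mapTaskId);   // repeated failures for the same map count once
        return failedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES;
    }

    public static void main(String[] args) {
        UniqueFetchFailures counter = new UniqueFetchFailures();
        for (int m = 0; m < 5; m++) {
            String map = "task_200804041304_0005_m_00018" + m + "_0";
            counter.recordFailure(map);
            boolean bail = counter.recordFailure(map);  // same map again: still counts once
            System.out.println(map + " -> bail out: " + bail);
        }
    }
}
```

Because each failed attempt feeds this counter, shorter per-attempt timeouts make a reducer reach the limit sooner when several map hosts are merely busy, which is the self-healing concern raised earlier in the thread.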
I think the 3 min read timeout and exponential backoff work well. But yes, the policy needs to be reworked (moving some logic to the JT, etc.). Amar, in getInputStream you set the read timeout to 30, which is not what we want now. BTW, what is the unit of the timeout value: seconds or milliseconds?
+1
It's milliseconds. I will change it and upload a patch soon.
Thanks. Then a 30 millisecond timeout is too short for connection setup.
Runping, could you please try the patch now? I incorporated the changes. The latest patch is here.
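Since `java.net.URLConnection.setConnectTimeout` and `setReadTimeout` both take milliseconds, converting explicitly from seconds avoids the "30 meant 30 ms" slip. The URL below is a hypothetical map-output address used only for illustration; `openConnection()` does not open a network connection, so the sketch runs offline.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLConnection;
import java.util.concurrent.TimeUnit;

public class TimeoutUnits {
    /**
     * Sets connect and read timeouts given in seconds. URLConnection's
     * setters take milliseconds, so a bare "30" would mean 30 ms.
     */
    static URLConnection configure(URLConnection conn, long connectSeconds, long readSeconds) {
        // toIntExact fails loudly instead of silently overflowing the int setters.
        conn.setConnectTimeout(Math.toIntExact(TimeUnit.SECONDS.toMillis(connectSeconds)));
        conn.setReadTimeout(Math.toIntExact(TimeUnit.SECONDS.toMillis(readSeconds)));
        return conn;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical map-output URL; openConnection() does not connect yet.
        URLConnection conn = URI.create("http://localhost:50060/mapOutput").toURL().openConnection();
        configure(conn, 30, 180);
        System.out.println("connect=" + conn.getConnectTimeout() + " ms, read=" + conn.getReadTimeout() + " ms");
    }
}
```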
You have to handle the case when the timeout value becomes negative.
Actually, I think the getInputStream method has a logic error. We need to guard against two conditions:
1) unit-timeout > total-timeout: might lead to negative values
2) unit-timeout = 0: infinite loop
The private variable unitReadTimeout seems never to be used. Otherwise, looks good.
unitReadTimeout is used in the current API for MapOutputLocation.getFile(). I have overloaded MapOutputLocation.getFile() to accept read timeouts too; in the default case, unitReadTimeout is used.
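The two guards can be combined in a loop that spends a total connect budget in slices of at most one unit. This is a sketch, not the committed getInputStream: `Connector` is again a hypothetical stand-in for the real connect call, and the error text only mirrors the style of the patch's message.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BudgetedConnect {
    /** Hypothetical stand-in for connect(..., timeoutMs); not a Hadoop API. */
    interface Connector {
        void connect(int timeoutMs) throws IOException;
    }

    /**
     * Spends totalMs in slices of at most unitMs. Guards both review points:
     * unitMs <= 0 would never consume the budget (infinite loop), and a
     * slice larger than what remains would drive the budget negative.
     */
    static int connect(Connector c, int totalMs, int unitMs) throws IOException {
        if (unitMs <= 0) {
            throw new IOException("Invalid unit-timeout [unit-timeout = " + unitMs + " ms]");
        }
        int remaining = totalMs;
        int attempts = 0;
        IOException last = new IOException("No connect attempt made [total-timeout = " + totalMs + " ms]");
        while (remaining > 0) {
            int slice = Math.min(unitMs, remaining);  // never exceed what is left
            attempts++;
            try {
                c.connect(slice);
                return attempts;
            } catch (IOException e) {
                last = e;
                remaining -= slice;                   // stays >= 0 because slice <= remaining
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        List<Integer> slices = new ArrayList<>();
        Connector alwaysTimesOut = timeoutMs -> {
            slices.add(timeoutMs);
            throw new IOException("connect timed out");
        };
        try {
            connect(alwaysTimesOut, 70_000, 30_000);
        } catch (IOException e) {
            System.out.println("Gave up after slices " + slices + ": " + e.getMessage());
        }
    }
}
```

Clamping the last slice to the remaining budget keeps the total wait at exactly the configured connect timeout.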
The "private final static" fields should be in all caps.
Attaching a patch that incorporates Devaraj's comments.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12380077/HADOOP-3130-v3.1.patch
against trunk revision 645773.

    @author +1. The patch does not contain any @author tags.
    tests included -1. The patch doesn't appear to include any new or modified tests.
    javadoc +1. The javadoc tool did not generate any warning messages.
    javac +1. The applied patch does not generate any new javac compiler warnings.
    release audit +1. The applied patch does not generate any new release audit warnings.
    findbugs +1. The patch does not introduce any new Findbugs warnings.
    core tests +1. The patch passed core unit tests.
    contrib tests +1. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2226/testReport/
This message is automatically generated.

Overall looks good. A minor point: since UNIT_CONNECT_TIMEOUT is private final, the following code segment seems redundant:

    + if (UNIT_CONNECT_TIMEOUT <= 0) {
    +   throw new IOException("Invalid unit-timeout "
    +       + "[unit-timeout = " + UNIT_CONNECT_TIMEOUT
    +       + " ms]");
    + } else {

Also, you need to test whether the ioe is due to a connection timeout:

    catch (IOException ioe) {
    +   // update the total remaining connect-timeout
    +   connectionTimeout -= unit;
The reason for doing the check is that unit-connect-timeout = 0 and total-timeout > 0 will result in an infinite loop. Since users can change unit-connect-timeout (and recompile), I think it's safe to guard against such cases and fail early.
What should the right behaviour be for exceptions other than connection timeouts? Retrying (without any penalty) is surely not a good option, since that would lead to longer (possibly infinite) waits.
> Since users can change unit-connect-timeout (and recompile),
How can you possibly prevent problems caused by users changing the code and recompiling? For an exception that is not a connection timeout, I think the right behavior is to re-throw it.
+1, removed.
I think there is no good way of knowing whether it's a connection-timeout exception or not, so I'm keeping it as it is.
I just committed this. Thanks Amar and Runping!
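For comparison, Runping's suggestion (retry only on timeouts, re-throw everything else) could be sketched as below. It relies on the `URLConnection.setConnectTimeout` Javadoc, which states that an expired connect timeout raises `java.net.SocketTimeoutException`. That type is also used for read timeouts, though, so it only singles out the connect phase when the `try` wraps nothing but the connect call, which is part of why the committed patch kept the catch-all. `Connector` is a hypothetical stand-in, as before.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;

public class RetryOnTimeoutOnly {
    /** Hypothetical stand-in for connect(..., timeoutMs); not a Hadoop API. */
    interface Connector {
        void connect(int timeoutMs) throws IOException;
    }

    /** Retries only timeouts; any other IOException propagates immediately. */
    static int connect(Connector c, int tries, int unitMs) throws IOException {
        if (tries <= 0) {
            throw new IllegalArgumentException("tries must be positive");
        }
        SocketTimeoutException last = null;
        for (int i = 0; i < tries; i++) {
            try {
                c.connect(unitMs);
                return i + 1;
            } catch (SocketTimeoutException e) {
                last = e;   // timed out: worth another try
            }
            // Other IOExceptions (refused, unknown host, ...) are not caught here.
        }
        throw last;         // every try timed out
    }

    public static void main(String[] args) {
        Connector refused = timeoutMs -> { throw new ConnectException("Connection refused"); };
        try {
            connect(refused, 5, 30_000);
        } catch (IOException e) {
            System.out.println("Not retried: " + e);
        }
    }
}
```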
Integrated in Hadoop-trunk #463 (see http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/463/)
This looks like the reducer isn't getting the event for one map from its host tasktracker. If it had backed off, you would have seen non-zero "slow hosts".
Did the reducer finally succeed in getting the map output? Which version of hadoop are you on?