
Key: HADOOP-3130
Type: Bug
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Amar Kamat
Reporter: Runping Qi
Votes: 0
Watchers: 0
Hadoop Common

Shuffling takes too long to get the last map output.

Created: 29/Mar/08 12:37 AM   Updated: 22/Aug/08 07:50 PM
Component/s: None
Affects Version/s: None
Fix Version/s: 0.18.0

Time Tracking:
Not Specified

File Attachments (all licensed for inclusion in ASF works):
File                      Date                 Attached by  Size
HADOOP-3130-v2.patch      2008-04-05 05:28 PM  Amar Kamat   3 kB
HADOOP-3130-v2.patch      2008-04-03 07:22 PM  Amar Kamat   3 kB
HADOOP-3130-v3.1.patch    2008-04-14 05:05 PM  Amar Kamat   6 kB
HADOOP-3130-v3.2.patch    2008-04-15 09:14 AM  Amar Kamat   6 kB
HADOOP-3130-v3.patch      2008-04-08 06:19 AM  Amar Kamat   6 kB
HADOOP-3130.patch         2008-03-31 10:15 AM  Amar Kamat   0.8 kB
shuffling.log             2008-03-29 01:29 AM  Runping Qi   53 kB

Hadoop Flags: Reviewed
Resolution Date: 16/Apr/08 01:43 PM


Description

I noticed that towards the end of shuffling, the map output fetcher of the reducer backs off too aggressively.
I attach a fraction of one reduce log of my job.
Note that the last map output was not fetched for 2 minutes.



Comments
Devaraj Das added a comment - 29/Mar/08 12:22 PM - edited
Runping, from the logs it is clear that the backoff strategy hasn't kicked in. I see the following lines repeating over and over again:
2008-03-29 00:24:16,243 INFO org.apache.hadoop.mapred.ReduceTask: task_200803282211_0482_r_000143_0 Need 1 map output(s)
2008-03-29 00:24:16,245 INFO org.apache.hadoop.mapred.ReduceTask: task_200803282211_0482_r_000143_0: Got 0 new map-outputs & 0 obsolete map-outputs from tasktracker and 0 map-outputs from previous failures
2008-03-29 00:24:16,245 INFO org.apache.hadoop.mapred.ReduceTask: task_200803282211_0482_r_000143_0 Got 0 known map output location(s); scheduling...
2008-03-29 00:24:16,245 INFO org.apache.hadoop.mapred.ReduceTask: task_200803282211_0482_r_000143_0 Scheduled 0 of 0 known outputs (0 slow hosts and 0 dup hosts)

This looks like the reducer isn't getting the event for one map from its host tasktracker. If it had backed off, you would have seen non-zero "slow hosts".

Did the reducer finally succeed in getting the map output? Which version of hadoop are you on?


Runping Qi added a comment - 29/Mar/08 01:32 PM

OK, I misinterpreted the backoff.

The reducer succeeded eventually. The build was off the hadoop-0.17 trunk on Thursday.

How are the map events delivered?
If the reducer did not get the event for one map quickly,
could it be due to some problem with the job tracker or the task tracker or both?


Amar Kamat added a comment - 30/Mar/08 08:28 AM
Can you check the JT and TT logs to find out which map TIP the reducer was waiting for and what exactly happened to that TIP (from the JT logs)? There could have been a task failure or a lost TT, and the TIP might have been delayed or re-executed.

Devaraj Das added a comment - 30/Mar/08 08:36 AM
The events are stored in the jobtracker and fetched by the tasktrackers. The frequency of polling for map completion events is the same as the heartbeat interval (which depends on the cluster size). For example, for a cluster of 500 nodes it is 10 seconds. Now, the reason for a delay on the order of minutes in getting map completion events could be that the map is not complete yet (it's still in COMMIT_PENDING or RUNNING), or that the JobTracker is busy and is discarding RPCs. To ascertain the latter, you should take a look at the logs of the reducer's host tasktracker.

Runping Qi added a comment - 30/Mar/08 04:02 PM

In this particular case, all the maps had finished for sure since lots of other reducers had finished.
There was no map task failure.


Amar Kamat added a comment - 31/Mar/08 04:02 AM
What is the configuration (number of nodes, number of maps/reducers, number of jobs running simultaneously, etc.)?

Amar Kamat added a comment - 31/Mar/08 06:39 AM
It seems that the log message is the main cause of confusion. This is what we think happened, as per the logs:
1) The reducer gets the task completion events for a bunch of maps and schedules them.
2) All the map outputs get successfully copied except one.
3) Assume that the jetty that was supposed to serve the remaining map's output is busy.
4) After 3 mins the attempt fails, gets retried and succeeds. 3 min is the timeout for a fetch attempt.
This also explains the 2 min wait mentioned above. In the first 1 min the other map outputs are fetched (i.e. overlapped). In the remaining 2 mins (before the timeout) the reducer is just waiting for the last map's output. The 'Need 1 map output' message in the reducer's logs should also mention how many of them are in progress.

Amar Kamat added a comment - 31/Mar/08 07:06 AM
Also note that there were a lot of jobs running simultaneously.

Amar Kamat added a comment - 31/Mar/08 10:15 AM
Attaching a patch that makes the log information (regarding remaining maps) clearer.

Runping Qi added a comment - 01/Apr/08 02:52 AM

Amar,

I think it is better to have more connection attempts with a smaller timeout (say 30 secs) than fewer attempts with a large timeout (e.g. 3 minutes).
I saw cases where the fetcher connected successfully right after a connection timeout.


Devaraj Das added a comment - 01/Apr/08 05:45 AM
Does 60 seconds look like a good compromise (that's used in many places in the code)? Also, it would be nice if we could tweak the backlog argument of jetty's listener to have a value of 128 (if not higher).

Amar Kamat added a comment - 01/Apr/08 06:17 AM

"I saw cases where the fetcher connected successfully right after a connection timeout."

I am not sure if it was successful because of the 3 min timeout or if some smaller value would do. This needs testing.


Devaraj Das added a comment - 01/Apr/08 01:35 PM
I think it makes sense from the utilization point of view to have a smaller timeout. We free up a thread sooner and it can potentially successfully fetch from some other host. This needs to be benchmarked. But it also means that we need to keep an eye on the self-healing aspect - we kill reducers after they fail to fetch for a certain number of times (and connection establishment failure is a sign of failure currently). We might end up killing reducers sooner than we do it today.
[For killing reducers, we probably should move to a model where we look at the global picture and use all information before killing a reducer (move this logic entirely to the JobTracker). So in the case of map output fetch failures the JT can decide whether to kill a reducer or not based on which map outputs the reducer is failing to fetch, and, whether those map nodes are healthy, etc.]

Runping Qi added a comment - 01/Apr/08 02:34 PM

From a timing point of view, the following two are equivalent:

(1)
connect( , N_units_timeout);

(2)
lastException = null;
lastTime = -1;
for (int i = 0; i < N; i++) {
    try {
        connect( , N_units_timeout);
        break;
    } catch (IOException e) {
        lastException = e;
        lastTime = i;
    }
}
if (lastTime == N-1) throw lastException;

But the second one has a much stronger liveness.


Runping Qi added a comment - 01/Apr/08 02:36 PM

The connect statement in the second one should use unit_timeout, not N_units_timeout.
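
The corrected loop can be sketched in Java as follows. This is only an illustration of the idea discussed above, not the code from the attached patches: the `Connector` interface and the `RetryingConnect` class are hypothetical names, and the real fetcher connects over HTTP rather than through such an interface.

```java
import java.io.IOException;

// Hypothetical stand-in for "something that can be connected to with a timeout".
interface Connector {
  void connect(int timeoutMillis) throws IOException;
}

final class RetryingConnect {
  /**
   * Makes up to n attempts, each with the short unit timeout, instead of
   * one attempt with a timeout of n units. The worst-case wait is about
   * the same, but a server that becomes reachable in between is picked
   * up by the next attempt.
   *
   * @return the number of attempts that were needed
   */
  static int connectWithRetries(Connector c, int n, int unitTimeoutMillis)
      throws IOException {
    if (n <= 0) {
      throw new IllegalArgumentException("n must be positive");
    }
    IOException last = null;
    for (int i = 0; i < n; i++) {
      try {
        c.connect(unitTimeoutMillis);
        return i + 1;
      } catch (IOException e) {
        last = e; // remember the failure and try again
      }
    }
    throw last; // all n attempts failed
  }
}
```

Returning the attempt count is a choice made here for illustration; it makes it easy to log how often a retry was what actually succeeded.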


Runping Qi added a comment - 01/Apr/08 04:12 PM

Speaking of failing a reducer because it fails to fetch map output, we have to do some careful analysis here.
At least, we have to differentiate between the case of failing to fetch one map output numerous times and the case of failing to fetch a lot of different map outputs. In the first case, it is better to re-execute the map.
In the second case, maybe it makes sense to consider failing the reducer.

Also, we should differentiate between the early stage of shuffling (where the reducer may have thousands of map outputs to fetch)
and the late stage where only a few map outputs are left to fetch. In the early stage, it does not matter if we fail to connect to a
few mappers, since the reducer has plenty to do. In the late stage, failing the reducer is much costlier than re-executing the maps.


Devaraj Das added a comment - 01/Apr/08 04:55 PM

"At least, we have to differentiate between the case of failing to fetch one map output numerous times and the case of failing to fetch a lot of different map outputs. In the first case, it is better to re-execute the map. In the second case, maybe it makes sense to consider to fail the reducer."

That's how the model works in the case of maps: if too many reducers complain about fetch failures for a particular map, the map is killed. The change here should be to also consider the host the map ran on; otherwise we run into issues like HADOOP-2175 (https://issues.apache.org/jira/browse/HADOOP-2175?focusedCommentId=12582425#action_12582425). The problem in the reducer's case is that the numbers are hard-coded and the decision to kill is totally local. So if a reducer fails to fetch 5 unique map outputs it kills itself. This should be augmented with your suggestion on accounting for the shuffle progress.
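
To make the two policies concrete, here is a rough Java sketch. It is an assumption-laden illustration, not Hadoop code: `FetchFailurePolicy`, its methods and the `LATE_STAGE_FRACTION` cut-off are hypothetical, and only the value 5 for MAX_FAILED_UNIQUE_FETCHES comes from this thread. The progress-aware variant is one possible reading of the suggestion above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; names and the late-stage threshold are illustrative.
final class FetchFailurePolicy {
  // Value quoted in this thread for trunk at the time.
  static final int MAX_FAILED_UNIQUE_FETCHES = 5;
  // Assumed cut-off: beyond this fraction of copied outputs the shuffle
  // is treated as being in its late stage.
  static final double LATE_STAGE_FRACTION = 0.9;

  // Maps for which at least one fetch has failed. Repeated failures of
  // the same map count once.
  private final Set<String> failedMaps = new HashSet<>();

  void recordFailure(String mapId) {
    failedMaps.add(mapId);
  }

  /** Fixed, purely local rule: too many distinct maps failed. */
  boolean shouldFailReducerFixed() {
    return failedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES;
  }

  /**
   * Progress-aware variant: late in the shuffle the reducer is never
   * failed, on the assumption that re-executing a few maps is cheaper.
   */
  boolean shouldFailReducerProgressAware(int copiedOutputs, int totalMaps) {
    if (totalMaps <= 0) {
      return false;
    }
    double progress = (double) copiedOutputs / totalMaps;
    if (progress >= LATE_STAGE_FRACTION) {
      return false;
    }
    return failedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES;
  }
}
```

Because failures are keyed by map, one map failing many times never trips either rule, which matches the point that such a map should be re-executed instead.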


Amar Kamat added a comment - 04/Apr/08 12:07 PM
Runping, could you please try things out with this patch? It implements the pseudo code you proposed.

Runping Qi added a comment - 04/Apr/08 02:24 PM

A lot of reducers failed with the following message:

Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.

I see a lot of the following exceptions in the log:

2008-04-04 13:50:03,796 WARN org.apache.hadoop.mapred.ReduceTask: task_200804041304_0005_r_000000_2 copy failed: task_200804041304_0005_m_000181_0 from xxxx.com
2008-04-04 13:50:03,823 WARN org.apache.hadoop.mapred.ReduceTask: java.net.SocketTimeoutException: Read timed out
    at sun.reflect.GeneratedConstructorAccessor3.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1298)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:1292)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:948)
    at org.apache.hadoop.mapred.MapOutputLocation.getInputStream(MapOutputLocation.java:125)
    at org.apache.hadoop.mapred.MapOutputLocation.getFile(MapOutputLocation.java:165)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:815)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:764)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:129)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:632)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:577)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1004)
    ... 4 more

Did you also change the timeout for read?

What is the value of MAX_FAILED_UNIQUE_FETCHES?
Should that be some percentage of the total number of maps?

Anyhow, we need to revisit the policy for failing a reducer during shuffling.


Amar Kamat added a comment - 04/Apr/08 03:05 PM

"Did you also change the timeout for read?"

No.

"What is the value of MAX_FAILED_UNIQUE_FETCHES?"

5 (as in trunk)

"Should that be some percentage of the total number of maps?"

I think 3min read timeout and exponential backoff work well. But yes it needs to be reworked (moving some logic to JT etc).


Runping Qi added a comment - 04/Apr/08 04:20 PM

Amar,

In getInputStream, you set the read timeout to 30, which is not what we want now.
Instead, you should do:
connection.setConnectTimeout(unit);
connection.setReadTimeout(timeout);

BTW, what is the unit of the timeout value? Seconds or milliseconds?


Amar Kamat added a comment - 04/Apr/08 04:48 PM

"connection.setReadTimeout(timeout);"

+1

"BTW, what is the unit of the timeout value? Seconds or milliseconds?"

It's milliseconds. Will change it and upload a patch soon. Thanks.
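
For reference, both `java.net.URLConnection.setConnectTimeout` and `setReadTimeout` take milliseconds. A minimal sketch of setting the two timeouts separately follows; the `ShuffleTimeouts` class and its constant values (30 seconds per connect attempt, 3 minutes for reads, as discussed in this thread) are assumptions for illustration and are not taken from the patch.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;

// Hypothetical helper; the class name and constant values are illustrative.
final class ShuffleTimeouts {
  // Both values are in milliseconds, as URLConnection expects.
  static final int UNIT_CONNECT_TIMEOUT_MS = 30 * 1000;
  static final int READ_TIMEOUT_MS = 3 * 60 * 1000;

  /** Creates a connection object with the timeouts set; does not connect yet. */
  static URLConnection open(URL url) throws IOException {
    URLConnection connection = url.openConnection();
    connection.setConnectTimeout(UNIT_CONNECT_TIMEOUT_MS); // time to establish the connection
    connection.setReadTimeout(READ_TIMEOUT_MS);            // time to wait for data once connected
    return connection;
  }
}
```

Passing a bare `30` to either setter would mean 30 milliseconds, and a value of `0` means an infinite timeout.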


Runping Qi added a comment - 04/Apr/08 05:43 PM

Then a 30 millisecond timeout is too short for connection setup.

Amar Kamat added a comment - 05/Apr/08 05:31 PM
Runping, could you please try the patch now? I have incorporated the changes. The latest patch is attached.

Runping Qi added a comment - 05/Apr/08 07:34 PM
You have to handle the case when timeout value becomes negative.

Runping Qi added a comment - 05/Apr/08 08:01 PM

Actually, I think the getInputStream method has a logic error.
You should update the timeout when catching the exception, not the other way around.
The easiest way to implement the logic is to measure the elapsed time when you catch the exception.
If the elapsed time is bigger than the given timeout, then throw the exception.


Amar Kamat added a comment - 07/Apr/08 05:16 AM
I think we need to guard against 2 conditions:
1) unit-timeout > total-timeout : might lead to negative values
2) unit-timeout = 0 : infinite loop
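
A sketch of a connect loop with both guards in place is shown below. It is written under assumptions: `TimedConnector` and `BoundedConnect` are hypothetical names, and the remaining budget is reduced by the timeout that was used for the failed attempt, not by measured elapsed time.

```java
import java.io.IOException;

// Hypothetical stand-in for a connection attempt with a timeout.
interface TimedConnector {
  void connect(int timeoutMillis) throws IOException;
}

final class BoundedConnect {
  /**
   * Retries with unitMillis per attempt until totalMillis is used up.
   *
   * @return the number of attempts that were needed
   */
  static int connect(TimedConnector c, int totalMillis, int unitMillis)
      throws IOException {
    // Guard 2: a unit of zero (or less) would never shrink the budget,
    // so the loop below would not terminate.
    if (totalMillis <= 0 || unitMillis <= 0) {
      throw new IOException("Invalid timeout [total = " + totalMillis
          + " ms, unit = " + unitMillis + " ms]");
    }
    int remaining = totalMillis;
    int attempts = 0;
    while (true) {
      // Guard 1: never wait longer than what is left, so the remaining
      // budget cannot become negative.
      int thisTry = Math.min(unitMillis, remaining);
      attempts++;
      try {
        c.connect(thisTry);
        return attempts;
      } catch (IOException ioe) {
        remaining -= thisTry;
        if (remaining <= 0) {
          throw ioe; // budget exhausted, report the last failure
        }
      }
    }
  }
}
```

With a total of 100 ms and a unit of 30 ms, a server that never answers is tried with timeouts of 30, 30, 30 and 10 ms before the last exception is rethrown.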

Runping Qi added a comment - 08/Apr/08 06:28 AM

The private variable unitReadTimeout seems never to be used.

Otherwise, looks good.


Amar Kamat added a comment - 08/Apr/08 07:56 AM
unitReadTimeout is used in the current api for MapOutputLocation.getFile(). I have overloaded MapOutputLocation.getFile() to accept read-timeouts too. In the default case unitReadTimeout is used.

Devaraj Das added a comment - 11/Apr/08 01:44 PM
The "private final static" fields should be in all caps.

Amar Kamat added a comment - 14/Apr/08 05:05 PM
Attaching a patch that incorporates Devaraj's comments.

Hadoop QA added a comment - 14/Apr/08 07:40 PM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12380077/HADOOP-3130-v3.1.patch
against trunk revision 645773.

@author +1. The patch does not contain any @author tags.

tests included -1. The patch doesn't appear to include any new or modified tests.
Please justify why no tests are needed for this patch.

javadoc +1. The javadoc tool did not generate any warning messages.

javac +1. The applied patch does not generate any new javac compiler warnings.

release audit +1. The applied patch does not generate any new release audit warnings.

findbugs +1. The patch does not introduce any new Findbugs warnings.

core tests +1. The patch passed core unit tests.

contrib tests +1. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2226/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2226/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2226/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2226/console

This message is automatically generated.


Runping Qi added a comment - 14/Apr/08 07:58 PM

Overall looks good.

A minor point. Since UNIT_CONNECT_TIMEOUT is private final, the following code segment seems redundant:

+      if (UNIT_CONNECT_TIMEOUT <= 0) {
+        throw new IOException("Invalid unit-timeout "
+                              + "[unit-timeout = " + UNIT_CONNECT_TIMEOUT 
+                              + " ms]");
+      } else {

Runping Qi added a comment - 14/Apr/08 10:51 PM

Also, you need to test whether the ioe is due to connection timeout.


       catch (IOException ioe) {
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;


Amar Kamat added a comment - 15/Apr/08 04:46 AM

"A minor point. Since UNIT_CONNECT_TIMEOUT is private final, the following code segment seems redundant: ..."

The reason for doing the check is that unit-connect-timeout = 0 and total-timeout > 0 will result in an infinite loop. Since users can change unit-connect-timeout (and recompile), I think it's safe to guard against such cases and fail early.

"Also, you need to test whether the ioe is due to connection timeout. ..."

What should be the right behaviour in case of non connection-timeout exceptions? Surely retrying (w/o any penalty) is not a good option, since that will lead to longer waits (maybe infinite).

  • One way would be to decrement the total time left (so that loop termination is guaranteed) and LOG the type of exception encountered. That is, treat it like a connection-timeout exception.
  • A slightly more complex way would be to vary the penalty incurred in each case. For example, decrement by unit-connect-timeout/2 in case of non connect-timeout exceptions and by unit-connect-timeout otherwise.
  • Another more complex way would be to tolerate some failures (w/o penalty) for the non-connect-timeout exceptions.

For now I think it's okay to keep it simple. Note that the reducer will not get killed if one meta-connect attempt fails; it requires a bunch of them.


Runping Qi added a comment - 15/Apr/08 05:27 AM
"Since users can change unit-connect-timeout (and recompile),"

How can you possibly prevent the problem caused by a user changing the code and recompiling?

In case of exception that is not connection timeout, I think the right behavior is to re-throw the exception.


Amar Kamat added a comment - 15/Apr/08 09:14 AM
Removed the check.

Amar Kamat added a comment - 15/Apr/08 09:18 AM

"A minor point. Since UNIT_CONNECT_TIMEOUT is private final, the following code segment seems redundant: ..."

+1, removed.

"In case of exception that is not connection timeout, I think the right behavior is to re-throw the exception."

I think there is no good way of knowing whether it's a connection-timeout exception or not. So keeping it as it is.


Runping Qi added a comment - 15/Apr/08 02:35 PM

+1


Devaraj Das added a comment - 16/Apr/08 01:43 PM
I just committed this. Thanks Amar and Runping!

Hudson added a comment - 17/Apr/08 12:11 PM