
Key: MAPREDUCE-93
Type: Bug
Status: Open
Priority: Major
Assignee: Devaraj Das
Reporter: Runping Qi
Votes: 0
Watchers: 8
Hadoop Map/Reduce

Job Tracker should prefer input-splits from overloaded racks

Created: 09/Oct/07 02:08 PM   Updated: 20/Jun/09 07:50 AM
Component/s: None
Affects Version/s: None
Fix Version/s: None

Time Tracking:
Not Specified

Issue Links:
Reference
 


 Description
Currently, when the Job Tracker assigns a mapper task to a task tracker and no split is local to that task tracker, the
job tracker finds the first runnable task in the master task list and assigns it to the task tracker.
The split for that task is not local to the task tracker, of course. However, the split may be local to other task trackers.
Assigning that task to that task tracker may decrease the potential number of mapper attempts with data locality.
The desired behavior in this situation is to choose a task whose split is not local to any task tracker,
and to resort to the current behavior only if no such task is found.

In general, it will be useful to know the number of task trackers to which each split is local.
To assign a task to a task tracker, the job tracker should first try to pick a task that is local to the task tracker and that has the minimal number of task trackers to which it is local. If no task is local to the task tracker, the job tracker should pick, from all runnable tasks, one that has the minimal number of task trackers to which it is local.
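
A minimal sketch of the selection rule described above, not the actual Job Tracker code. The function name pick_map_task and the split_hosts mapping (task id to the set of task trackers holding its split) are hypothetical, introduced only for illustration.

```python
from typing import Dict, Optional, Set


def pick_map_task(tracker: str, split_hosts: Dict[str, Set[str]]) -> Optional[str]:
    """Pick a runnable task for `tracker`.

    split_hosts (hypothetical structure) maps each runnable task id to the
    set of task trackers that hold its split locally.
    """
    if not split_hosts:
        return None
    # Prefer tasks whose split is local to the requesting tracker.
    local = [t for t, hosts in split_hosts.items() if tracker in hosts]
    candidates = local if local else list(split_hosts)
    # Among the candidates, take the one local to the fewest trackers;
    # the task id breaks ties so the choice is deterministic.
    return min(candidates, key=lambda t: (len(split_hosts[t]), t))
```

With splits T1 on {H1, H2}, T2 on {H1}, T3 on {H2, H3} and T4 on no tracker, H1 is given T2 (local, and local to the fewest trackers), while a tracker with no local split is given T4.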

It is worthwhile to instrument the job tracker code to report the number of splits that are local to at least one task tracker.
That number is the maximum number of tasks that can run with data locality. By comparing it with the actual number of
data-local mappers launched, we can measure the effectiveness of the job tracker scheduling.
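
The comparison could be computed roughly as follows. This is only a sketch: locality_effectiveness, split_hosts and trackers are hypothetical names, and returning 1.0 when no split can run locally is an assumption made here, not something stated in this issue.

```python
from typing import Dict, Set


def locality_effectiveness(split_hosts: Dict[str, Set[str]],
                           trackers: Set[str],
                           data_local_launched: int) -> float:
    """Ratio of data-local maps actually launched to the upper bound."""
    # Upper bound: splits that are local to at least one live task tracker.
    max_local = sum(1 for hosts in split_hosts.values() if hosts & trackers)
    # Assumption: with no locally runnable split the scheduler lost nothing.
    return data_local_launched / max_local if max_local else 1.0
```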

When we introduce rack locality, we should apply the same principle.



 Comments
eric baldeschwieler added a comment - 11/Jan/08 10:41 PM - edited
An ideal solution would maintain some sort of prioritized list of maps / node / rack so that we execute work first that is unlikely to find another efficient location to execute.

It would also make sense to schedule some non-local work early, on nodes that are likely to run out of local work relatively early, since these tasks run slowly.

One could also pay attention to IO load on each source node...

At a minimum we should track maps that have no local option and schedule them first when a node has no local option. (As Doug Cutting suggested in HADOOP-2560.)


Owen O'Malley added a comment - 07/Feb/08 09:58 PM
I've thought about this one, and I think that after HADOOP-1985, we should do the following:

For each rack and job:
Keep the number of runnable (but not running) input splits that are local to that rack
Keep the total number of map slots in each rack

When we need to assign a non-rack-local task, we should find the rack that has the highest ratio of runnableSplits / mapSlots and take a task from that rack. Clearly as a special case, any input split on a rack with no task trackers will be preferred, but it will also take input splits from the most overloaded racks.
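
A rough sketch of that rack choice, assuming two per-job dictionaries keyed by rack name. The names most_overloaded_rack, runnable_splits and map_slots are hypothetical. A rack with no map slots (no task trackers) is given an infinite ratio so that it is always preferred.

```python
import math
from typing import Dict, Optional


def most_overloaded_rack(runnable_splits: Dict[str, int],
                         map_slots: Dict[str, int]) -> Optional[str]:
    """Return the rack with the highest runnableSplits / mapSlots ratio."""

    def ratio(rack: str) -> float:
        slots = map_slots.get(rack, 0)
        # Special case: splits on a rack with no task trackers come first.
        return math.inf if slots == 0 else runnable_splits[rack] / slots

    # Only racks that still have runnable (not running) splits qualify.
    candidates = [r for r, n in runnable_splits.items() if n > 0]
    if not candidates:
        return None
    # The rack name breaks ties so the result is deterministic.
    return max(candidates, key=lambda r: (ratio(r), r))
```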


eric baldeschwieler added a comment - 08/Feb/08 04:36 AM
Of course, since blocks are on multiple racks, pulling blocks from overloaded racks may be a bad strategy... Why not execute them locally on another rack...

Also, in the interest of IO interleaving, you want to do some amount of off-rack reading early and not wait until the job is guaranteed to be the long pole.

My bet is that some amount of pre-planning is going to prove necessary to get a semi-optimal plan. Maybe generate a sorted list of blocks to choose per spindle on a node, then one per rack, and then a final remote list. That plus some sort of % off-rack target might be good. You could store this very efficiently or on disk...


Amar Kamat added a comment - 08/Feb/08 12:22 PM
I assume that this priority applies across all tasks, i.e., we should also prefer data-local speculation over remote speculation whenever a cached task requires speculation, and otherwise fall back to the current strategy.
Scenario
Hosts : Cached Tasks
H1 : T1, T2
H2 : T2, T3
H3 : T3, T4
H4 : T4

Stages

1. H1,H2,H3,H4 ask for task and get T1,T2,T3,T4 respectively.
2. H2, H4 are slow and require speculation
3. H3 finished and asks for more, gets T2
4. H1 finishes and asks for more, gets T4

Ideally H3 should get T4 and H1 should get T2, no?
So, the algorithm would be

1. Find a Runnable && ~Running task
    1.1 Scan the cache but maintain the runnable local tasks.
    1.2 Scan all the tasks to find out a task that has the lowest number of data-local trackers (and also some load/rack/io/map-slots considerations).
2. Find a task that has failed on all machines // fail early
3. Find a task for speculation
    1.1 Check if there is a local task that can be speculated
    1.2 Scan all the tasks with lowest number of data-local trackers (and also some load/rack/io/map-slots considerations).

Thoughts?
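
To make the scenario concrete, here is a small sketch of the speculation step only (3.1 in the list above). The function pick_speculative_task and the cached_on mapping are hypothetical, and "first slow task in the list" is used as a stand-in for the current strategy.

```python
from typing import Dict, List, Optional, Set


def pick_speculative_task(host: str,
                          slow_tasks: List[str],
                          cached_on: Dict[str, Set[str]]) -> Optional[str]:
    """Prefer a slow task whose split is cached on `host`."""
    for task in slow_tasks:
        if host in cached_on.get(task, set()):
            return task
    # Fallback (stand-in for the current strategy): first slow task, if any.
    return slow_tasks[0] if slow_tasks else None


# The scenario above, inverted to task -> hosts that cache its split.
cached_on = {"T1": {"H1"}, "T2": {"H1", "H2"},
             "T3": {"H2", "H3"}, "T4": {"H3", "H4"}}
slow = ["T2", "T4"]  # running on the slow hosts H2 and H4

for_h3 = pick_speculative_task("H3", slow, cached_on)
slow.remove(for_h3)
for_h1 = pick_speculative_task("H1", slow, cached_on)
```

Here H3 is given T4 and H1 is given T2, which is the assignment called ideal above.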


Amar Kamat added a comment - 08/Feb/08 12:37 PM

"In general, it will be useful to know the number of task trackers to which each split is local."

Can't we just keep, in the TIP itself, a count of the maximum number of trackers that have the split?


Arun C Murthy added a comment - 10/Feb/08 05:37 PM

"1.2 Scan all the tasks to find out a task that has the lowest number of data-local trackers (and also some load/rack/io/map-slots considerations)."

"1.2 Scan all the tasks with lowest number of data-local trackers (and also some load/rack/io/map-slots considerations)."

Uh, both are very expensive to do on every heartbeat (i.e. the inner loop) isn't it?!

The reasoning behind Owen's proposal of considering the 'ratio' of runnableSplits / mapSlots is to get around the case where there are very few task-trackers in a rack. E.g. let's say there are 200 splits on rack1 with 10 task-trackers (4 slots each) on it, and 100 splits on rack2 with 2 task-trackers... then the ratio penalizes rack2 rather than rack1, which is the right call to make.
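
Working the numbers through, under the assumption (not stated in the comment) that the task-trackers on rack2 also have 4 slots each:

```python
SLOTS_PER_TRACKER = 4  # given for rack1; assumed here for rack2 as well

rack1_ratio = 200 / (10 * SLOTS_PER_TRACKER)  # 200 splits over 40 slots = 5.0
rack2_ratio = 100 / (2 * SLOTS_PER_TRACKER)   # 100 splits over 8 slots = 12.5
```

rack2 has fewer splits in absolute terms but the higher ratio, so non-rack-local tasks would be taken from it first.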


Amar Kamat added a comment - 11/Feb/08 08:34 AM

"Uh, both are very expensive to do on every heartbeat (i.e. the inner loop) isn't it?!"

I should have mentioned it earlier. I was trying to link HADOOP-2119 and HADOOP-2014, since HADOOP-2119 deals with JobInProgress.findNewTask(). The algorithm mentioned above is for JobInProgress.findNewTask(). I got Owen's point, that's why I mentioned

"(and also some load/rack/io/map-slots considerations)"

Just wanted to make sure that we also do the same for speculation. From what I followed, the discussion is mainly for Runnable && ~Running.


Owen O'Malley added a comment - 14/Feb/08 12:32 AM
I think that Eric's proposal of making a reasonable guess at the start of execution sounds good. How about, for each rack we calculate:

rack load(R) = min(1000, # splits local to rack R / # map slots in rack R)

where higher numbers are more "overloaded" racks. Then each split can be given a load score by something like:

split load = avg(rack load of each replica)

and at job creation, we sort a list on the split load (highest to lowest) for all of the map tips. When we need a non-local task, we take from the front of the list.
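
A sketch of how that ordering could be computed at job creation. The function names and the split_racks / map_slots dictionaries are hypothetical. Treating a rack with zero map slots as having the capped load of 1000 is an assumption made here to avoid dividing by zero.

```python
from statistics import mean
from typing import Dict, List

LOAD_CAP = 1000.0  # the cap in the rack load formula above


def rack_loads(split_racks: Dict[str, List[str]],
               map_slots: Dict[str, int]) -> Dict[str, float]:
    """rack load(R) = min(1000, # splits local to R / # map slots in R)."""
    local_counts: Dict[str, int] = {}
    for racks in split_racks.values():
        for rack in set(racks):
            local_counts[rack] = local_counts.get(rack, 0) + 1
    loads = {}
    for rack, count in local_counts.items():
        slots = map_slots.get(rack, 0)
        # Assumption: a rack without map slots gets the capped load.
        loads[rack] = LOAD_CAP if slots == 0 else min(LOAD_CAP, count / slots)
    return loads


def non_local_order(split_racks: Dict[str, List[str]],
                    map_slots: Dict[str, int]) -> List[str]:
    """Splits sorted by split load, highest first."""
    loads = rack_loads(split_racks, map_slots)

    def split_load(split: str) -> float:
        racks = set(split_racks[split])
        # split load = avg(rack load of each replica); 0.0 if no replicas.
        return mean(loads[r] for r in racks) if racks else 0.0

    return sorted(split_racks, key=lambda s: (-split_load(s), s))
```

When a non-local task is needed, the scheduler would take the first split in this list that is still runnable.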

Thoughts?


Amar Kamat added a comment - 14/Feb/08 05:54 AM
Makes sense. I had some similar thoughts but not on task ordering (initially) but on task scheduling after cache miss (at rack level), see HADOOP-2812.
I opened a new issue since this issue explicitly specifies rack.

On similar lines we can have
1) machine load = f(machine) = some_function_of(num-splits-local) ... useful in intra-rack scheduling.
f should give some indication of the expected time to process all the maps, i.e.
f(machine) = (num-local-splits - num-processed) * avg-processing-time / MAX
initial value of avg-processing-time = MAX

2) split-load = s(split) = min(loads of the machines having this split locally)

3) rack-load = r(rack) = max(load of splits local to the rack) / num-map-slots ... useful in inter-rack scheduling.
This gives priority to the rack with the highest loaded split.
avg can be used instead of max in (3), which will give priority to the rack with the highest avg load.

Would this be a better metric?
Thoughts?
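
For reference, the three metrics written out as code. This is only a sketch: the value of MAX is not given above, so MAX_TIME is a placeholder constant, and the function names are hypothetical.

```python
from statistics import mean
from typing import Callable, Dict, Iterable, List

MAX_TIME = 100.0  # placeholder for MAX; the real value is not specified


def machine_load(num_local_splits: int, num_processed: int,
                 avg_processing_time: float = MAX_TIME) -> float:
    """f(machine): expected remaining work, normalised by MAX."""
    return (num_local_splits - num_processed) * avg_processing_time / MAX_TIME


def split_load(hosts: Iterable[str], machine_loads: Dict[str, float]) -> float:
    """s(split): load of the least loaded machine holding the split."""
    return min(machine_loads[h] for h in hosts)


def rack_load(split_loads: List[float], num_map_slots: int,
              combine: Callable[[List[float]], float] = max) -> float:
    """r(rack): max (or avg, via combine=mean) split load per map slot."""
    return combine(split_loads) / num_map_slots
```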


eric baldeschwieler added a comment - 14/Feb/08 07:07 AM
Sorry Amar, maybe I don't follow. Seems complicated. We can implement and load test a few strategies using gridmix.

Owen, we should pick tasks on each node and each rack using the same metric (split load) as well. All lists should be sorted by it?

Rather than maintaining another list, we could just keep the rack lists (and, recursively, the node lists) in heaps. Each heap is ordered by split load, which lets you quickly choose the rack holding the task with the highest split load. That way there is less to maintain.
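
A small sketch of the per-rack heap idea, using Python's heapq (a min-heap, so loads are stored negated). The class RackHeaps and its methods are hypothetical. For brevity the racks themselves are scanned linearly rather than kept in an outer heap, and a task that appears on several racks is dropped lazily once it has been handed out.

```python
import heapq
from typing import Dict, List, Optional, Set, Tuple


class RackHeaps:
    """One heap of (negated split load, task) per rack."""

    def __init__(self) -> None:
        self._heaps: Dict[str, List[Tuple[float, str]]] = {}
        self._done: Set[str] = set()

    def add(self, rack: str, task: str, split_load: float) -> None:
        heapq.heappush(self._heaps.setdefault(rack, []), (-split_load, task))

    def _peek(self, rack: str) -> Optional[Tuple[float, str]]:
        heap = self._heaps[rack]
        # Lazily discard tasks already handed out through another rack.
        while heap and heap[0][1] in self._done:
            heapq.heappop(heap)
        return heap[0] if heap else None

    def pop_most_loaded(self) -> Optional[str]:
        """Return the task with the highest split load across all racks."""
        best: Optional[Tuple[float, str]] = None
        for rack in self._heaps:
            top = self._peek(rack)
            if top is not None and (best is None or top < best):
                best = top
        if best is None:
            return None
        self._done.add(best[1])
        return best[1]
```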


Owen O'Malley added a comment - 14/Feb/08 08:43 AM
Amar, the reason to focus on racks instead of nodes is that the read time for rack local is just a bit slower than node local. Since there are a bunch of nodes in each rack, the decisions are more stable and more efficient.

Eric, it isn't clear to me that split load is more important than split size when we are choosing between local tasks. I guess in the typical case, split load is better, because almost all of the splits are the block size. By prioritizing on split load, we should substantially increase the locality of most jobs.

When I was talking about the list, I assumed it is really just part of the data structure from HADOOP-2119.


eric baldeschwieler added a comment - 14/Feb/08 07:10 PM
At some point we'll need to put a lot more work into this! How to trade off rack vs local vs size is very interesting.

That said, when you look at our current execution profiles, we have long tails because maps are not running locally. Anything we can do to reduce that should lead to speedups.

We should run some experiments. We should not neglect the one rack case, where size should probably dominate. Maybe we can express rack load as the probability that a block will execute locally and then weight that by the cost to ship bytes? This varies of course based on your network... We could choose some constants for on and off rack to get started.