I've thought about this one, and I think that after
For each rack and job: when we need to assign a non-rack-local task, we should find the rack that has the highest ratio of runnableSplits / mapSlots and take a task from that rack. As a special case, any input split on a rack with no task trackers will be preferred (its ratio is effectively infinite), but the rule will also take input splits from the most overloaded racks.
Of course, since blocks are on multiple racks, pulling blocks from overloaded racks may be a bad strategy... Why not execute them locally on another rack?
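A minimal sketch of the ratio rule, assuming the scheduler has per-rack, per-job counts of runnable splits and map slots available. `RackState`, `overload_ratio` and `pick_rack_for_non_local_task` are hypothetical names for illustration, not JobTracker code. It shows how a rack with splits but no slots naturally wins as the "special case".

```python
import math
from dataclasses import dataclass


@dataclass
class RackState:
    # Hypothetical per-rack, per-job bookkeeping; not an actual Hadoop class.
    name: str
    runnable_splits: int  # runnable map splits of this job stored on the rack
    map_slots: int        # total map slots on the rack's task trackers


def overload_ratio(rack: RackState) -> float:
    """runnableSplits / mapSlots; a rack with splits but no slots is infinitely overloaded."""
    if rack.runnable_splits == 0:
        return 0.0
    if rack.map_slots == 0:
        return math.inf
    return rack.runnable_splits / rack.map_slots


def pick_rack_for_non_local_task(racks):
    """Return the rack to take a non-rack-local task from, or None if no work is left."""
    candidates = [r for r in racks if r.runnable_splits > 0]
    if not candidates:
        return None
    return max(candidates, key=overload_ratio)
```

With 200 splits over 40 slots (ratio 5) and 100 splits over 8 slots (ratio 12.5), the second rack is chosen. Note the objection raised above still applies: the chosen split may also have replicas elsewhere, which this sketch ignores.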
Also, in the interest of IO interleaving, you want to do some amount of off-rack reading early and not wait until the job is guaranteed to be the long pole. My bet is that some amount of pre-planning is going to prove necessary to get a semi-optimal plan. Maybe generate a sorted list of blocks to choose per spindle on a node, then one per rack, and then a final remote list. That plus some sort of off-rack percentage target might be good. You could store this very efficiently, or on disk...
I assume that this priority is valid across all the tasks, i.e., we should also prefer data-local speculation over remote speculation if there is any cached task that requires speculation, and otherwise fall back to the current strategy.
Scenario:
Hosts : Cached Tasks
H1 : T1, T2
H2 : T2, T3
H3 : T3, T4
H4 : T4
Stages:
1. H1, H2, H3, H4 ask for a task and get T1, T2, T3, T4 respectively.
2. H2 and H4 are slow, so their tasks require speculation.
3. H3 finishes and asks for more; it gets T2.
4. H1 finishes and asks for more; it gets T4.
Ideally H3 should get T4 and H1 should get T2, no? Proposed order:
1. Find a Runnable && ~Running task.
1.1 Scan the cache, but keep track of the runnable local tasks.
1.2 Scan all the tasks to find the task that has the lowest number of data-local trackers (plus some load/rack/IO/map-slot considerations).
2. Find a task that has failed on all machines (fail early).
3. Find a task for speculation.
3.1 Check whether there is a local task that can be speculated.
3.2 Scan all the tasks for the one with the lowest number of data-local trackers (plus some load/rack/IO/map-slot considerations).
Thoughts?
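A small sketch of the speculation step (local candidate first, otherwise the task with the fewest data-local hosts), assuming simple dictionaries stand in for the tracker cache and the running attempts. `pick_speculative_task`, `cache` and `running_on` are hypothetical; the load/rack/IO/map-slot considerations are left out. It reproduces the "ideal" outcome of the scenario.

```python
def pick_speculative_task(host, needs_speculation, cache, running_on):
    """Prefer a data-local speculative task; else the one with the fewest data-local hosts.

    cache: host -> set of task ids whose input split is cached/stored on that host
    running_on: task id -> set of hosts already running an attempt of that task
    (both are hypothetical stand-ins for the scheduler's own bookkeeping).
    """
    # Never speculate a task on a host that is already running it.
    candidates = [t for t in needs_speculation if host not in running_on.get(t, set())]
    if not candidates:
        return None
    local = [t for t in candidates if t in cache.get(host, set())]
    if local:
        return min(local)

    def data_local_hosts(task):
        # Fewer hosts holding the data means fewer future chances to run it locally.
        return sum(1 for tasks in cache.values() if task in tasks)

    return min(candidates, key=lambda t: (data_local_hosts(t), t))
```

For the scenario above, H3 asking with T2 and T4 pending speculation gets T4, and H1 gets T2.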
Can't we just keep, in the TIP itself, a count of the maximum number of trackers that have the split?
Uh, aren't both of these very expensive to do on every heartbeat (i.e., in the inner loop)?! The reasoning behind Owen's proposal of using the ratio runnableSplits / mapSlots is to handle the case where there are very few task trackers in a rack. E.g., let's say there are 200 splits on rack1 with 10 task trackers (4 slots each), and 100 splits on rack2 with 2 task trackers (assume 4 slots each there too). Then rack1's ratio is 200 / 40 = 5 while rack2's is 100 / 8 = 12.5, so the ratio penalizes rack2 rather than rack1 (rack2 is treated as the more overloaded rack), which is the right call to make.
I should have mentioned it earlier. I was trying to link
Just wanted to make sure that we also do the same for speculation. From what I followed, the discussion is mainly about Runnable && ~Running tasks.
I think that Eric's proposal of making a reasonable guess at the start of execution sounds good. How about this: for each rack, we calculate:
rack load(R) = min(1000, # splits local to rack R / # map slots in rack R)
where higher numbers mean more "overloaded" racks. Then each split can be given a load score by something like:
split load = avg(rack load of each replica)
and at job creation, we sort a list of all the map TIPs by split load (highest to lowest). When we need a non-local task, we take it from the front of the list. Thoughts?
Makes sense. I had some similar thoughts, though not on task ordering (initially) but on task scheduling after a cache miss (at the rack level); see HADOOP-2812.
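A sketch of the job-creation step under stated assumptions: each split's replica racks and each rack's map-slot count are known up front, and a split counts once toward each rack it lives on. `plan_non_local_order` is a hypothetical helper. A rack with no map slots gets the cap of 1000, which is what min(1000, x / 0) is meant to express.

```python
from collections import Counter


def plan_non_local_order(split_racks, slots_per_rack, cap=1000.0):
    """Order map TIPs for non-local assignment, highest split load first.

    split_racks: split id -> list of racks holding a replica (one entry per replica).
    slots_per_rack: rack -> number of map slots on that rack.
    """
    # Number of splits local to each rack (a split counts once per rack it lives on).
    splits_on_rack = Counter(r for racks in split_racks.values() for r in set(racks))

    def rack_load(rack):
        slots = slots_per_rack.get(rack, 0)
        return cap if slots == 0 else min(cap, splits_on_rack[rack] / slots)

    def split_load(split):
        replicas = split_racks[split]
        return sum(rack_load(r) for r in replicas) / len(replicas)

    # Highest split load first; ties broken by split id for a deterministic order.
    return sorted(split_racks, key=lambda s: (-split_load(s), s))
```

The scheduler would then pop from the front of this list whenever a tracker has no local work. The list is computed once per job, which keeps the heartbeat path cheap.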
I opened a new issue since this issue explicitly specifies racks. Along similar lines, we could have:
2) split-load = s(split) = min(loads of the machines having this split locally)
3) rack-load = r(rack) = max(load of splits local to the rack) / num-map-slots, useful in inter-rack scheduling
Would this be a better metric?
Sorry Amar, maybe I don't follow. It seems complicated. We can implement and load-test a few strategies using gridmix.
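A literal transcription of the two alternative metrics, as one possible reading. The thread does not say how a machine's load is measured, so `machine_load` is an assumed input, and `split_load` / `rack_load` are hypothetical names. A rack with splits but zero map slots is treated as infinitely loaded here, which is also an assumption.

```python
import math


def split_load(split, split_hosts, machine_load):
    """s(split) = min(load of each machine holding the split locally)."""
    return min(machine_load[h] for h in split_hosts[split])


def rack_load(rack, rack_splits, split_hosts, machine_load, map_slots):
    """r(rack) = max(load of splits local to the rack) / num-map-slots."""
    loads = [split_load(s, split_hosts, machine_load) for s in rack_splits.get(rack, [])]
    if not loads:
        return 0.0
    slots = map_slots.get(rack, 0)
    if slots == 0:
        return math.inf  # assumed: splits with nowhere local to run
    return max(loads) / slots
```

This makes the cost visible: every split load depends on live machine loads, so unlike a per-job ordering it would need recomputing as loads change, which is part of why it seems complicated.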
Owen, we should pick tasks on each node and each rack using the same metric (split load) as well. Should all the lists be sorted by it? Rather than maintaining another list, we could perhaps just keep the rack lists (and, recursively, the node lists) in heaps. Each heap is ordered by split load, which lets you quickly choose the rack holding the task with the highest split load. That way there is less to maintain.
Amar, the reason to focus on racks instead of nodes is that reading rack-local data is only a bit slower than reading node-local data. Since there are a bunch of nodes in each rack, the decisions are more stable and more efficient.
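A sketch of the heap idea, assuming one heap per rack keyed by split load. `RackHeaps` is a hypothetical class. Because a split with replicas on several racks sits in several heaps, entries for already-assigned tasks are dropped lazily when they reach the top.

```python
import heapq


class RackHeaps:
    """Hypothetical per-rack heaps of pending map tasks, ordered by split load (highest first)."""

    def __init__(self):
        self.heaps = {}     # rack -> heap of (-split_load, task); heapq is a min-heap
        self.taken = set()  # tasks already assigned (a split may sit in several racks' heaps)

    def add(self, rack, task, split_load):
        heapq.heappush(self.heaps.setdefault(rack, []), (-split_load, task))

    def pop_highest(self):
        """Remove and return (rack, task) for the pending task with the highest split load."""
        best_rack, best_key = None, None
        for rack, heap in self.heaps.items():
            # Lazily discard entries for tasks already handed out via another rack.
            while heap and heap[0][1] in self.taken:
                heapq.heappop(heap)
            if heap and (best_key is None or heap[0] < best_key):
                best_rack, best_key = rack, heap[0]
        if best_rack is None:
            return None
        _, task = heapq.heappop(self.heaps[best_rack])
        self.taken.add(task)
        return best_rack, task
```

Choosing the rack here is a scan over the heap tops, O(#racks) per call. A heap of racks would make it faster but would have to be updated whenever a rack's top entry changes.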
Eric, it isn't clear to me that split load is more important than split size when we are choosing between local tasks. I guess in the typical case split load is better, because almost all of the splits are the block size. By prioritizing on split load, we should substantially increase the locality of most jobs. When I was talking about the list, I assumed it is really just part of the data structure in from
At some point we'll need to put a lot more work into this! How to trade off rack-local vs node-local vs split size is very interesting.
That said, when you look at our current execution profiles, we have long tails because maps are not running locally. Anything we can do to reduce that should lead to speedups. We should run some experiments. We should not neglect the one-rack case, where size should probably dominate. Maybe we can express rack load as the probability that a block will execute locally, and then weight that by the cost to ship its bytes? This varies, of course, based on your network... We could choose some constants for on-rack and off-rack reads to get started.
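One possible way to turn that suggestion into a number, under loudly stated assumptions: the probability of local execution is already estimated, and the two per-byte constants below are placeholders chosen only for illustration. `expected_ship_cost` is a hypothetical helper. Ranking by this value lets size dominate when locality probabilities are similar, as in the one-rack case.

```python
# Placeholder relative per-byte costs; the thread only suggests "some constants for on and off rack".
ON_RACK_COST_PER_BYTE = 1.0
OFF_RACK_COST_PER_BYTE = 4.0


def expected_ship_cost(split_bytes, p_local, fallback_on_rack):
    """Expected cost of moving a split's bytes if the map does not end up running locally.

    p_local: estimated probability that the split executes on a node holding it (no shipping).
    fallback_on_rack: whether the likely non-local placement is still on a replica's rack.
    """
    if not 0.0 <= p_local <= 1.0:
        raise ValueError("p_local must be a probability")
    per_byte = ON_RACK_COST_PER_BYTE if fallback_on_rack else OFF_RACK_COST_PER_BYTE
    return (1.0 - p_local) * split_bytes * per_byte
```

Splits could then be ordered by descending expected cost, so that those most likely to need an expensive remote read are started early.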
It would also make sense to place some non-local work early (since these tasks run slowly) on nodes that are likely to run out of local work relatively early.
One could also pay attention to IO load on each source node...
At a minimum, we should track maps that have no local option and schedule them first when a node has no local option (as Doug Cutting suggested in HADOOP-2560).
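A minimal sketch of that rule, assuming split locations and the set of hosts running task trackers are known. `find_no_local_maps` and `pick_map` are hypothetical helpers. A map is "no local option" when none of its split's hosts runs a task tracker. Such maps are handed out first whenever the requesting node has no local work.

```python
def find_no_local_maps(split_hosts, tracker_hosts):
    """Maps whose split has no replica on any host running a task tracker."""
    return sorted(t for t, hosts in split_hosts.items() if not set(hosts) & tracker_hosts)


def pick_map(node, pending, split_hosts, no_local):
    """Local map if one exists; otherwise a map that could never run locally; otherwise any."""
    local = [t for t in sorted(pending) if node in split_hosts[t]]
    if local:
        return local[0]
    for t in no_local:
        if t in pending:
            return t
    return min(pending) if pending else None
```

Taking the no-local maps first means the nodes that run out of local work early absorb the slow remote reads, instead of leaving them for the tail of the job.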