By the way, sorry, Doug, about the long description in this Jira. I'd already created the Jira when you suggested having shorter description fields in HADOOP-3421. Next time.
I'm attaching the first patch (3445.1.patch). This is a partial patch that has support for:
The purpose of this patch is to get the basic code reviewed, and there is a non-trivial amount of it. This is not a complete patch. The following remains to be done:
This patch assumes that the patch for Here's some explanation of what this patch incorporates:
Please note that the algorithms in the code are a bit different than what was detailed in the description of this Jira as they've evolved over time. Please also note that these are not the best algorithms, and that it is assumed that over time, we will get a lot better at refining them. But they enable us to get started. Vivek, this is looking good. Some early comments, I'll post more once I take a detailed look at the core scheduling algorithms:
1. I'd suggest we move the queue-related infrastructure to its own files/packages. E.g. QueueSchedulingInfo (should we just call it Queue?). This will help ensure that the already bloated JobTracker...
There will be some non-trivial refactoring of the code to support
That's a valid suggestion. But state machines are, in general, hard to code and understand, and I personally avoid them if I can. I don't see a need for a state machine here: we have 5 distinct sets of queues that we compute when we redistribute capacity, and we only compute these sets periodically and only for the purpose of redistribution, so state machines seem like overkill. We also don't care what the state of a queue was during the last redistribution. You simply drop a queue into one or more of the 5 buckets, then you transfer/reclaim capacities between queues in various buckets. State machines don't seem like a natural fit. If you disagree, what would help is if you can point out sections of the code which are hard to understand and perhaps provide examples of how DFAs would simplify either writing or reading the code, or both.
I spent time on the core scheduling algorithms...
1. The core algorithms seem to be coming along fairly well. Overall we have 2 kinds of scheduling algorithms: one for balancing needs/capacities of the queues and other for picking a queue and then a task from a job of that queue. We just need to test them both, especially the former, to make sure they get baked in. Thanks Arun.
What exactly are you worried about, in terms of maintainability? Will the code be hard to maintain? How so? Regarding refactoring, can you be more specific? The algorithm for redistributing capacity has a few steps. I've placed some code in helper methods. How differently do you recommend this code be refactored?
The code to maintain the QueueSchedulingInfo structures actively is a bit complex. You need to catch every instance where tasks are updated. Building the structures only when needed is a nice compromise, IMO. They're pretty quick to build, and the code's a lot smaller and focused in one place. Plus, you build them when you need them, so your algorithms are still correct. Furthermore, you don't need to track all events; you only need to keep track of some cumulative numbers, so you might actually do less computation if you build them lazily. Is there a specific reason why you'd like to see information being maintained accurately rather than being rebuilt? Having said this, things might change once we look at porting this to 3412.
Fair point. These really are indexes into an array, and it's extremely unlikely we'll add more enum values. I suspect they will become private to the scheduler, so this may be a moot issue. I'm hoping we can use the 'taskUpdated' interface offered by
Attaching a new patch (3445.2.patch). I've refactored the scheduler code to support
This patch also implements functionality to reclaim resources within a fixed period of time by preempting tasks. This is done through the TaskScheduler3421.ReclaimedResource class. What's left:
ACLs are handled in another Jira. I still update queue scheduling info lazily, during heartbeat processing. These structures can be kept up-to-date during any task change, by providing and implementing a TaskChangeListener, but I haven't done that for two reasons:
I just started going through the patch, but a few preliminary comments:
1. Remove the 3445 from the class name of the scheduler. The name should be descriptive.
2. The JavaDoc of the class should describe the scheduler directly rather than referencing the jira.
3. killTasks needs to be in the scheduler, rather than the generic JobInProgress. Picking which tasks to kill, is clearly in the domain of the scheduler...
4. The data structures and code seem very complex. I think you need to introduce more abstractions to make the code easier to understand.
The current scheduler patch in 3445 is very hard to maintain and would not be straightforward to fix. The current design involves:
quota = guaranteed capacity for queue
The current patch defines linked lists of queues:
Every 5 seconds, the lists are cleared and rebuilt on the current information and slots allocated to different queues. (There are linked lists both for maps and reduces and a lot of code that switches between working on maps and reduces based on values of 0 or 1.) The code to implement this is very complex and would be very difficult to maintain. I would propose a much simpler design that has a single priority queue where the key is running / quota for each queue. When a new slot is available, it is given to the queue with the lowest running / quota among those with pending > 0. Note that there is no difference between allocated and running in this proposal, which makes it much easier. If you need to reclaim slots, you start from the queue with the largest allocated / quota and take slots until it is equal to the second highest, and then take slots from both of them. This guarantees that you will have a stable schedule and that the queues with the most excess capacity are reclaimed first. Having a stable scheduler that doesn't change the allocations every 5 seconds is very important for managing throughput. Other changes: As with
Attaching a patch (3445.3.patch).
The scheduler code has been simplified a fair bit, based on a couple of assumptions: I have also moved the scheduler code to contrib (under contrib/capacity-scheduler). Oh yeah, this scheduler is called 'Capacity Scheduler'. I've used the same format in the README file that is used in I need to add test cases. That will happen soon.
A single queue, as you describe it, is too simplistic. We need to keep track of queues that have to reclaim capacity, as these queues should be given first preference when assigning a task. Patch 3445.3.patch keeps track of such queues, as well as queues based on #running/capacity. It does keep them in a single queue, though they're really two separate queues. >>>>2. when killing tasks, we can simplify things by killing as many as we can from the queue that is the most over capacity, then the next one, and so on. My earlier code used a fairer algorithm where it decided how many tasks we would kill from each queue, based on a ratio of their guaranteed capacities (GCs). There are better ways to kill tasks, as noted in the documentation, but for now, this should be OK.
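The ordering and kill policy described in this comment can be illustrated with a minimal sketch. It is not the code from 3445.3.patch: the class names QueueSketch and QueueOrderSketch, their fields, and the use of absolute excess (running minus capacity) to decide which queue is "most over capacity" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a queue's scheduling state; not the patch's class.
final class QueueSketch {
    final String name;
    final int capacity;       // guaranteed capacity, in slots
    final int running;        // tasks currently running
    final boolean reclaiming; // true while the queue is waiting to get capacity back

    QueueSketch(String name, int capacity, int running, boolean reclaiming) {
        this.name = name;
        this.capacity = capacity;
        this.running = running;
        this.reclaiming = reclaiming;
    }

    double runningOverCapacity() { return (double) running / capacity; }

    // Slots held beyond the guaranteed capacity (assumed measure of "over capacity").
    int excess() { return Math.max(0, running - capacity); }
}

final class QueueOrderSketch {
    // Reclaiming queues sort first (Boolean false < true), then lowest running/capacity.
    static final Comparator<QueueSketch> ASSIGNMENT_ORDER =
        Comparator.comparing((QueueSketch q) -> !q.reclaiming)
                  .thenComparingDouble(QueueSketch::runningOverCapacity);

    // Names of the queues in the order they would be offered a free slot.
    static List<String> assignmentOrder(List<QueueSketch> queues) {
        List<QueueSketch> sorted = new ArrayList<>(queues);
        sorted.sort(ASSIGNMENT_ORDER);
        List<String> names = new ArrayList<>();
        for (QueueSketch q : sorted) {
            names.add(q.name);
        }
        return names;
    }

    // Kill as many tasks as possible from the most over-capacity queue, then the next.
    static Map<String, Integer> planKills(List<QueueSketch> queues, int slotsNeeded) {
        List<QueueSketch> sorted = new ArrayList<>(queues);
        sorted.sort((a, b) -> Integer.compare(b.excess(), a.excess()));
        Map<String, Integer> kills = new LinkedHashMap<>();
        int remaining = slotsNeeded;
        for (QueueSketch q : sorted) {
            if (remaining <= 0) {
                break;
            }
            int take = Math.min(q.excess(), remaining);
            if (take > 0) {
                kills.put(q.name, take);
                remaining -= take;
            }
        }
        return kills;
    }
}
```

With queues A (15 running of 10), B (12 of 10), C (2 of 10, reclaiming) and D (1 of 10), the assignment order is C, D, B, A, and reclaiming 6 slots takes 5 from A and 1 from B.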
<<<<<< The preemption and allocation of tasks need to be consistent to avoid thrashing. Hence, in a sense, I am suggesting you need to bring the "fairer" feature of your earlier algorithm to your new algorithm; my primary motivation is consistency and hence stability (i.e. no thrashing) and my secondary motivation is fairness.
Sanjay, your comments are valid. Killing all tasks from one queue makes that queue more likely to receive a free slot faster than if we killed from more than one queue. As I've written earlier, there are better ways. However, this is not a big concern. For one, killing tasks will be a rare event, not something commonplace. In most cases, queues will be running at full capacity. Most, if not all, queues will have jobs waiting. Given a fair division of capacity among queues, it's very rare that we'll need to preempt. Furthermore, preemption happens only if a queue does not get enough resources within a period of time (usually, minutes). It seems very rare that the entire cluster is filled with tasks that take minutes to complete, and hence need to be killed.
What I'm really arguing here is that this solution, though far from perfect, is OK for now. It's highly unlikely to cause the system to thrash (there are just too many restrictions that need to happen before the same queue is subjected to a cycle of its tasks being run and killed, then run again, killed again, and so on). But your point is taken. We do need to make this better, and we'll get to it soon, once we finish with the more critical stuff. Submitting patch (3445.4.patch) with unit test cases, better documentation, and a few small bug fixes. I've used part of the unit test framework Matei developed in
This is not a justification. Killing too many tasks from the wrong queue is disruptive to the system and will lose progress, which is the antithesis of the goal.
I don't believe this is true. The usage graph on the current cluster is straight at 100% utilization. Furthermore, it is almost certain that organizations' workloads will not be constant. In any case, it is a very dangerous assumption to base your design on.
Note that I said a priority queue. You are currently using a list that is re-sorted on every use, which is much less efficient.
Vivek - some of the unit test code I took was from the test for the default scheduler in
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12389145/3445.4.patch against trunk revision 690142.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 4 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3144/testReport/
This message is automatically generated.
You can use the new method TaskInProgress.getActiveTasks instead of
numRunningAttempts. TaskInProgress.killAnyTask isn't appropriate to add to the The default configuration for this scheduler should not be in the I don't understand the meaning of "getFirstQueue in JobInProgress.killTasks doesn't belong in the JobInProgress. It is That said, I think the approach is wrong. The queues should kill from The myDaddy field should be renamed to parent. We don't need to be Lots of the comments are block comments with "/////////////" that ReclaimedResource.countdown would be better expressed as the time of Chris and I were talking and he came up with a really simple approach that seems to have all of the properties of your preemption design.
You keep the priority queue keyed by: long deadlineForPreemption; float runningOverQuota; If a queue is either at or over its quota or has no pending tasks, the deadline is set to Long.MAX_VALUE. Therefore, you are guaranteed to satisfy all underserved queues in the order that will violate their SLA. If the deadline for the first item in the queue is < 2 * heartbeat, then you start shooting min(pending, quota - running) tasks from the most over quota queues. (Keeping the running/quota ratio balanced for the overextended queues.) This way you give priority to the most urgent underserved queue and you know exactly when to start killing tasks. With the current patch, each queue can end up with a new ReclaimedResource object added to the list every 5 seconds for the entire SLA period. The one downside to this proposal is that if Q1 is under and gets a new job, and the timer starts at T1 and then gets another job at T2, we will make sure that they get up to quota (or pending = 0) before the timer from T1 expires rather than the T2 timer. This is a little more aggressive than yours, but the implementation is much simpler and easier to reason about. It isn't clear which of the semantics is actually better/fairer, but if we discover that it upsets lots of users, we could extend this model relatively easily.
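A minimal sketch of the deadline-keyed priority queue proposed above. The two key fields come from the comment; everything else (the class names QueueStateSketch and PreemptionSketch, the slaMillis parameter, and reading "deadline < 2 * heartbeat" as "time remaining until the deadline is less than two heartbeat intervals") is an assumption for illustration, not code from any patch.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical per-queue state; only the two key fields are named in the proposal.
final class QueueStateSketch {
    final String name;
    final int quota;   // guaranteed capacity, in slots
    final int running; // tasks currently running
    final int pending; // tasks waiting for a slot
    long deadlineForPreemption = Long.MAX_VALUE;

    QueueStateSketch(String name, int quota, int running, int pending) {
        this.name = name;
        this.quota = quota;
        this.running = running;
        this.pending = pending;
    }

    float runningOverQuota() { return (float) running / quota; }

    // At or over quota, or nothing pending: no deadline. Otherwise start the
    // timer once and keep the earliest deadline (the T1 timer, not T2).
    void refreshDeadline(long now, long slaMillis) {
        if (running >= quota || pending == 0) {
            deadlineForPreemption = Long.MAX_VALUE;
        } else if (deadlineForPreemption == Long.MAX_VALUE) {
            deadlineForPreemption = now + slaMillis;
        }
    }
}

final class PreemptionSketch {
    // Most urgent deadline first; ties broken by lowest running/quota.
    static final Comparator<QueueStateSketch> BY_URGENCY =
        Comparator.comparingLong((QueueStateSketch q) -> q.deadlineForPreemption)
                  .thenComparingDouble(QueueStateSketch::runningOverQuota);

    // Number of tasks to kill on behalf of the most urgent queue, or 0 if it is
    // not yet time. Note: java.util.PriorityQueue does not re-order an element
    // whose key changes; remove and re-add it after calling refreshDeadline.
    static int tasksToReclaim(PriorityQueue<QueueStateSketch> queues,
                              long now, long heartbeatMillis) {
        QueueStateSketch head = queues.peek();
        if (head == null || head.deadlineForPreemption == Long.MAX_VALUE) {
            return 0; // nobody is underserved
        }
        if (head.deadlineForPreemption - now >= 2 * heartbeatMillis) {
            return 0; // still enough time before the SLA is violated
        }
        return Math.min(head.pending, head.quota - head.running);
    }
}
```

Choosing which over-quota queues the tasks are taken from (keeping their running/quota ratios balanced) is left out of the sketch.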
You're mixing up issues here. A single queue as you suggest (it being a priority queue makes no difference) will not work. I have explained why, in an earlier comment. You need two conceptually separate collections - one for queues that need to reclaim capacity (which should be satisfied first), and one for the others (which can be sorted similar to what you've suggested). As for re-sorting the list that I have:
This was to be done by
This is a remnant of a change that was introduced earlier. Initially, the first queue was considered the default queue, but now we expect a queue called 'default' to be present. I have removed this method.
The code does do that. It picks the job that ran the last.
It is expressed as a deadline. It represents how many cycles of the thread need to happen. I only decrement once in a while to keep the counter low, and as indicated in my comments, that may also not be necessary. The latest patch (3445.5.patch) incorporates some of the feedback suggestions. I have also made a couple of bigger changes, as these seem to be causing some grief among folks.
I don't know why some core tests are failing, but it is not because of this patch. A couple of core tests for TestResourceManagerConf are failing. Will fix that shortly.
Attaching patch (3445.6.patch) which fixes some test cases that were failing earlier.
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12389326/3445.6.patch against trunk revision 691099.
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 10 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
+1 release audit. The applied patch does not increase the total number of release audit warnings.
-1 core tests. The patch failed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.
Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3159/testReport/
This message is automatically generated.
The test that is failing is an hdfs test, that should not have anything to do with this patch.
As the comment in the patch suggests, queueName should move from
JobInProgress to JobProfile. Actually, this has already been done in HADOOP-3698, which we should probably go ahead and commit first. The waitingMaps and waitingReduces should be renamed to getRunningMapCache should return the map instead of the values. getRunningTaskWithLeastProgress should be moved to the capacity The ReclaimedResource list still has countdowns instead of Thanks for staying on top of this, Owen and Vivek.
I think you need to keep track of speculative tasks. JobInProgress keeps track of running tasks (runningMapTasks and runningReduceTasks). Running tasks include speculative tasks as well. JobInProgress also keeps track of total number of tasks (which is set initially), finished tasks, and failed tasks. So, if I want to compute the number of pending tasks in a job, it has to be #total-#running-#finished-#failed. Except that #running contains speculative tasks as well. So #pending has to be (#total-#running-#finished-#failed)+#speculative, because #total does not include speculative tasks. Optionally, I could add up the number of tasks in nonRunningMapCache and nonLocalMaps, perhaps.
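The arithmetic above can be written out as a small sketch. The class and method names are hypothetical; only the formula, #pending = (#total - #running - #finished - #failed) + #speculative, is taken from the comment.

```java
// Hypothetical helper; JobInProgress itself is not used here.
final class PendingTasksSketch {
    // running includes speculative attempts, total does not, so the
    // speculative count is added back to avoid under-counting pending tasks.
    static int pending(int total, int running, int finished, int failed, int speculative) {
        return (total - running - finished - failed) + speculative;
    }
}
```

For example, with 100 total tasks, 20 running (5 of them speculative), 30 finished and 2 failed, the naive count is 100 - 20 - 30 - 2 = 48, and adding back the 5 speculative attempts gives 53 pending tasks.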
If JobInProgress exposes APIs to return number of running and finished tasks, shouldn't the API for returning the number of pending tasks also be in JobInProgress? pendingMaps() shouldn't be any different than runningMaps() or finishedMaps(), right? New patch (3445.7.patch) submitted.
New patch (3445.8.patch) which fixes a bad log statement (which I had forgotten to correct earlier).
Is there a use case where the guarantee to a queue is different for map and reduce?
I think there should be a configured default for the queues that don't have an attribute set.
mapred.capacity-scheduler.default.foo = 12
and any queue including the default will pick up that value, but the default queue can be changed separately.
mapred.capacity-scheduler.queue.default.foo = 23
<property>
<name>mapred.capacity-scheduler.queue.default.minimum-user-limit-percent</name>
<value>100</value>
<description>The minimum percentage of the cluster which can
be utilized by a single user.
</description>
</property>
This is a confusing variable name. We really need a better one or at the very least a better documentation string. It also seems like WHEN_TO_KILL_TO_RECLAIM should be set to a multiple of JobTracker heartbeats, rather than a percentage of the expiry time. And it should be set to 3. A fraction of the expiry time is not necessarily enough time for the tasks to be killed.
I think the configuration file should be changed to reflect that it is specific to this scheduler and it should be a template like hadoop-site.xml is. So the resource-manager-conf.xml would become capacity-scheduler.xml.template and the build.xml would copy the capacity-scheduler.xml.template over to .xml, if it doesn't exist.
Since the code to reclaim capacity runs in a separate thread, I think what you mean is that it should start reclaiming tasks when there is just enough time for 3 heartbeats before the queue's SLA kicks in. That's fine, but it requires the heartbeat interval to be exposed to the scheduler, which means that the TaskTrackerManager interface needs to expose getNextHeartbeatInterval(), which currently is a private method in JobTracker. Changing the TaskTrackerManager interface will cause some other projects to fail. The test classes for the Fair Share Scheduler, for example, would fail. Do we really want to do that at this stage? And if it's OK to do that, am I supposed to make the changes to the code in the Fairshare Scheduler in this patch as well?
Vivek, I did change the test classes that were committed as part of the Fairshare scheduler when I introduced the getQueueManager API. I think it is OK to modify test case classes of other projects to keep trunk compiling and tests running. OK. Then I'll go ahead and change the TaskTrackerManager interface and update the affected files.
New patch (3445.10.patch) attached
What size cluster has the scheduler been tested on?
We need another run through findbugs. I think we should have a single configuration parameter that defines the queue capacity, rather than one for maps and one for reduces. You have a "new Integer(1)" that will likely cause a findbugs warning. It should either just use auto-boxing or use Integer.valueOf(1). Things that we can address in follow-up bugs:
I didn't get any findbugs warning because of this line. There have been no findbugs warnings introduced by this patch.
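For reference, a small sketch of the two boxing forms suggested as replacements for "new Integer(1)". The class name is hypothetical. Integer.valueOf is documented to cache values in the range -128 to 127, and auto-boxing goes through the same method, so both forms yield the same cached object for 1 instead of allocating a new one.

```java
// Hypothetical example class, unrelated to the scheduler code.
final class BoxingSketch {
    static boolean sameCachedInstance() {
        Integer viaValueOf = Integer.valueOf(1); // explicit, uses the Integer cache
        Integer viaAutoBoxing = 1;               // compiles to Integer.valueOf(1)
        // Reference comparison is used on purpose, to show both are the cached object.
        return viaValueOf == viaAutoBoxing;
    }
}
```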
You can configure a different number of slots for Maps and Reduces per TT. It then stands to reason that the cluster capacity, and hence a queue capacity, can be different for maps and reduces. The ClusterStatus class keeps different values for max map and reduce tasks.
Which values are you referring to? They're all configurable, as far as I know, unless I've missed something.
Queue capacities are expressed as percentages of the grid capacities. Why would they be floats? Sanity checks are something we need to add more of.
A single queue capacity expressed as a percentage will convert to different absolute numbers of map and reduce slots, no?
Yes, but if you're configuring TTs to have different slots for maps and reduces, and hence allowing the cluster to have different numbers of map and reduce slots, then why take away that flexibility from queues? It makes no difference in the code, and I don't think it markedly simplifies configuration. We don't have enough use cases yet to see whether this flexibility will make a difference, but I don't see why we need to place that constraint.
The purpose of allowing different numbers of map and reduce slots on TTs is to efficiently use resources on a node. What is the purpose of allowing different numbers in queues? A queue gets a fraction of the cluster's resources. Having it get different fractions of the map and reduce slots is just odd.
Having two different variables does not seem like flexibility to me. It feels like interface complexity whose merit is far from clear.
I'm referring to the default values:
public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
public static final int DEFAULT_GUARANTEED_CAPACITY_MAPS = 100;
public static final int DEFAULT_GUARANTEED_CAPACITY_REDUCES = 100;
public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300;
public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
The capacities for some queues may be measured in 1/1000 of a cluster. Don't forget that 0.1% of 3000 machines is still 3 machines. If we can only assign capacity at 1% (30 machines) at a time, that is larger than many jobs need. The sanity checks are particularly important because the default guaranteed capacity is 100%. If you define two queues, and don't assign a capacity, the total will be over 100%.
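The granularity argument can be checked with a small sketch that converts a fractional percentage into a whole number of machines or slots. The class and method names are hypothetical, and rounding down is an assumption; the thread does not say how fractional results should be rounded.

```java
// Hypothetical helper for turning a capacity percentage into whole units.
final class CapacityMathSketch {
    // capacityPercent is in the range 0..100 and may be fractional, e.g. 0.1.
    static int unitsFor(double capacityPercent, int clusterUnits) {
        return (int) Math.floor(capacityPercent * clusterUnits / 100.0);
    }
}
```

With 3000 machines, 1% is 30 machines while 0.1% is 3 machines, which is the granularity an integer percentage cannot express. The same single percentage also converts to different absolute numbers for maps and reduces: 25% of 400 map slots is 100, and 25% of 200 reduce slots is 50.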
Do you mean introducing another batch of config variables? Something like: mapred.capacity-scheduler.queue.{guaranteed-capacity, reclaim-time-limit, supports-priority, minimum-user-limit-percent} which would apply to any queue for which the values weren't specified.
Yes, it is for the queues that don't specify values. The sanity checks are pretty important. The defaults for queues are nice to have but not essential immediately. So yes they can be introduced in a subsequent JIRA, possibly even after 0.19, since it appears to be a back compatible change.
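A minimal sketch of the fallback lookup being discussed, using java.util.Properties as a stand-in for the Hadoop configuration object. The key layout follows the earlier example in this thread (mapred.capacity-scheduler.queue.<name>.foo for a queue, mapred.capacity-scheduler.default.foo for the shared default); the class name, method name, and the built-in fallback argument are assumptions.

```java
import java.util.Properties;

// Hypothetical lookup helper; not part of any patch on this issue.
final class QueueConfSketch {
    static final String PREFIX = "mapred.capacity-scheduler.";

    // Per-queue value first, then the configured default, then a built-in value.
    static String get(Properties conf, String queue, String attribute, String builtIn) {
        String perQueue = conf.getProperty(PREFIX + "queue." + queue + "." + attribute);
        if (perQueue != null) {
            return perQueue;
        }
        return conf.getProperty(PREFIX + "default." + attribute, builtIn);
    }
}
```

With default.foo = 12 and queue.default.foo = 23 set, the queue named "default" resolves foo to 23, while any other queue that does not set foo resolves it to 12.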
I have a modification of the patch that adds:
1. makes a single capacity parameter
2. makes the capacity a float
3. requires each capacity be between 0 and 100
4. requires the sum to be under 100
5. removes some unneeded imports
It is working, but one of the unit tests was misconfigured. I'm trying to work through it. Otherwise, I'll get it tomorrow.
Here is the updated patch. I figured out how to fix the test case that was configuring the cluster with 110% of the available capacity.
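The sanity checks listed above (each capacity between 0 and 100, and a bounded sum) can be sketched as follows. This is not the code from the updated patch: the class name, the exception type, and treating a sum of exactly 100 as valid are assumptions.

```java
import java.util.Map;

// Hypothetical validator for per-queue capacities expressed as float percentages.
final class CapacityValidatorSketch {
    static void validate(Map<String, Float> capacities) {
        float sum = 0f;
        for (Map.Entry<String, Float> entry : capacities.entrySet()) {
            float capacity = entry.getValue();
            if (Float.isNaN(capacity) || capacity < 0f || capacity > 100f) {
                throw new IllegalArgumentException(
                    "Capacity for queue " + entry.getKey() + " must be between 0 and 100");
            }
            sum += capacity;
        }
        // Assumption: a total of exactly 100% is allowed, anything above is rejected.
        if (sum > 100f) {
            throw new IllegalArgumentException(
                "Queue capacities add up to " + sum + "%, which exceeds 100%");
        }
    }
}
```

A configuration like the misconfigured test case, with queues at 60% and 50%, is rejected because the total is 110%.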
I forgot to fix the config and README to reflect the changed configuration parameters.
Added a new patch (3445.13.patch).
These multiple lists, plus the hashmap that maps queue names to each list, are really what the JobQueueManager component is, in the arch diagram in HADOOP-3444. One question is, should we have a separate class called JobQueueManager that manages the multiple lists and the hashmap, or should we just leave the hashmap as is in the JT code? While there are benefits to having a separate class, I personally prefer the latter approach: code changes will be minimal and we don't really need a level of encapsulation yet. Any preferences?