Oops. Sorry, I messed up the description - I meant
Corrected typo in description on JIRA number.
This issue is related to what is discussed in HADOOP-2776 in the sense that the task tracker should decide how many tasks based on
Here's a summary of the approach that is implemented in the patch, so we can have a discussion around it.
There are 2 requirements we are trying to address:
To handle the first requirement we obviously have to consider what happens when the job in the front of the scheduler's list does not match the tasktracker's memory availability. We considered the following choices:
In order to solve the second requirement, we need to match tasks to slots in the scheduler. One simplifying assumption made to achieve this is mentioned in the comments above. We assume that, cluster-wide, the default memory per slot is a fixed value specified in configuration. Note that this does not preclude heterogeneity of cluster nodes: by tweaking the maximum memory on the tasktracker and the number of slots, it is possible to have the same value for all nodes in a cluster. Using this configuration variable, it is possible to map tasks to slots as follows:
Clearly, this issue requires discussion and consensus. It likely will not make 0.19, but we are hoping we can reach a consensus on the approach.
We need this for 0.19. The existing HOD setup allows users to submit high-RAM jobs, so the capacity scheduler (a HOD replacement) would actually regress this functionality.
I think that leaving the slot empty is probably correct. I'd suggest two additions:
1. If the task requires more memory than any TT, fail it immediately.
2. If the task requires more memory than the current TT, go to the next job.
I don't think that forcing all of the slots across the cluster to have the same amount of memory makes sense. I think the size of the slots on a TT should be the physical RAM of that TT divided by the number of slots on that TT. That would let you calculate the number of slots a task would require on that TT. How do you deal with map slots versus reduce slots? If you have 8GB of RAM, can you run a 6GB map task? Or is 4GB the limit because the other 4GB is for reduces?
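A minimal sketch of the per-TT slot sizing and the two rules above, assuming memory is tracked in MB, that a task's slot count is rounded up, and that "more memory than the current TT" means more than that TT can offer. The class, enum, and method names are hypothetical and are not Hadoop APIs.

```java
// Hypothetical sketch of per-TaskTracker slot sizing and the two rules above.
// Memory is in MB; all names are illustrative, not Hadoop APIs.
final class PerTrackerSlots {
    enum Decision { FAIL_TASK, SKIP_TO_NEXT_JOB, ASSIGN }

    /** Size of one slot on this TT: its physical RAM divided by its number of slots. */
    static long slotSizeMB(long physicalRamMB, int numSlots) {
        return physicalRamMB / numSlots;
    }

    /** Slots a task would need on this TT; rounding up is an assumption of this sketch. */
    static long slotsNeeded(long taskMemMB, long slotSizeMB) {
        return (taskMemMB + slotSizeMB - 1) / slotSizeMB;
    }

    /**
     * Rule 1: if no TT in the cluster has enough memory, fail the task immediately.
     * Rule 2: if this TT does not have enough memory, move on to the next job.
     */
    static Decision decide(long taskMemMB, long largestTrackerMemMB, long thisTrackerMemMB) {
        if (taskMemMB > largestTrackerMemMB) {
            return Decision.FAIL_TASK;
        }
        if (taskMemMB > thisTrackerMemMB) {
            return Decision.SKIP_TO_NEXT_JOB;
        }
        return Decision.ASSIGN;
    }
}
```

On an 8GB TT with 4 slots, slotSizeMB gives 2048 and a 6GB task needs 3 slots. Whether map and reduce slots draw on one shared budget is exactly the open question in the comment; this sketch does not decide it.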
Actually, I think it makes a lot more sense to make this act like
Following an offline discussion with Owen, his proposal was the following:
Thinking about this, I think the only disadvantage of this approach is that a user who submits a job with high memory requirements could essentially block other users, at least until his limit is hit. So I would suggest we change the above proposal to not block, but instead move on to the next job. This way, a user with high-RAM requirements cannot block other users, and cannot game the system in that way. Note that:
Can we agree on this?
I think it is important that we not starve jobs, so I think we should not take the next job's task if the current job's task doesn't fit. I've also filed HADOOP-4306 to change the disk space monitoring the same way.
It probably makes sense in the future to have a special case if the TaskTracker is job's task will never fit on this TaskTracker and let other low priority jobs use it. But it should be a different jira.
The consensus on the implementation is as follows:
We understand point 3 means that the system is less fair than it should be (because users of high-RAM jobs can cause more slots to go free than other jobs), but in the interest of keeping things simple, we will follow this approach for Hadoop 0.19 and take it up in the future. Note that if HADOOP-4306 is addressed, we will be handling scheduling with respect to disk and memory uniformly.
> A reasonable assumption to make while computing used capacity is to assume that for all TTs in a cluster, the amount of memory per slot is configured to be the same value
I am a little confused about the above statement. It is possible to have two different types of machine in the same cluster, the only difference being the amount of memory on these types. Since the CPU capacity is the same, I would ideally configure both types of machines to have the same number of slots. However, the memory capacity per slot on one type of machine would be larger than the memory capacity per slot of the other type. It would be nice if the JT/TT could compute the memory capacity per slot and then schedule tasks accordingly. Also, the JT scheduler could give reduce tasks more affinity for slots with a larger memory capacity per slot, because reduce tasks possibly take more memory than map tasks.
Dhruba, agreed. This was also indicated in Owen's comments above. So, this assumption is no longer valid. We had this assumption to help us map tasks to slots easily. This in turn was to meet the 2nd requirement I'd put up above:
But as I described above, we would like to keep things simple, and not do this mapping for now. It is a little less fair, but we can try out how it works.
We do not currently differentiate between the memory requirements of map and reduce slots. Does this seem vital to have?
Attaching a patch.
TestHighRAMJobs needs a major rewrite because of the separate free memory values for map and reduce tasks. It doesn't even compile now; will work on that. Negative values for memory on the TT or in the job may adversely affect the working of this patch. Will investigate that and see if it needs a fix. This patch can be reviewed irrespective of these two (side) issues; they will add code but won't change what is already done by the patch. The separate treatment of free memory for maps and for reduces makes it easy for us to extend the job configuration to be different for maps and reduces. It is an easy extension from here. But I don't know for sure whether this is a requirement, or, if it is, whether we wish to do it in this patch.
Attaching a new patch. This
To kill an invalid job, the patch uses JobInProgress.kill(), which just initiates the killing but won't finalize jobStatus as killed. It waits for a cleanup task to be scheduled and run to completion (successfully or not) before marking the jobStatus as killed.
So, this issue is blocked by I am also blocking this by
New patch.
Blocking this JIRA by Some comments:
JobConf:
CapacityTaskScheduler:
I still need to review the changes to the testcases.
This patch is a big, risky patch that isn't a bug fix, and thus I don't think it should go into 0.19.
The test cases look good. Just minor nits:
There is code such as in the patch:
+ if (!jip.isKillInProgress()) {
+   jip.kill();
+ }
Here, you need not check killingInProgress for the job, because a kill while killingInProgress is a no-op. Also, if the user is to see the failure as the framework killing the job, it should be jip.fail() (so that the job state is FAILED).
While working on this patch, we found out that we need to report free memory per number of map slots and reduce slots as two separate variables from the task tracker, as part of the ResourceStatus object in TaskTrackerStatus. I think at least this must get into Hadoop 0.19, as otherwise there may be backwards compatibility issues that come up later. Also, some documentation describing the feature should be removed, in order to not confuse users about the availability of the feature. Does this make sense?
After a detailed offline conversation with Owen, we decided to not do this for Hadoop 0.19. However, we also reviewed the current solution, and where we want to go, and filed
Here's a summary of what this patch should be doing.
TTs report the amount of free memory available on their node (as described in In order to make sure that no job asks for memory that is more than what a TT has available, we should have a cluster-wide limit on the amount of VM a job can ask for its tasks. If this limit is set, and a job asks for too much, the job should not be accepted by the JT in submitJob(). If the limit is not set, jobs cannot be rejected based on their memory requirements.
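The acceptance rule above can be sketched as a small predicate. This is a hypothetical illustration, assuming MB units and a negative value meaning "limit not set"; where the check should live (the JT's submitJob() or the scheduler) is debated in the thread, but the predicate itself would be the same.

```java
// Hypothetical sketch: accept or reject a job against an optional cluster-wide VM limit.
// Names and the "negative means unset" convention are assumptions, not Hadoop config semantics.
final class JobMemoryLimit {
    /** Marker for "no cluster-wide limit configured" in this sketch. */
    static final long UNSET = -1L;

    static boolean accept(long requestedVmPerTaskMB, long clusterLimitMB) {
        if (clusterLimitMB < 0) {
            // No limit set: jobs cannot be rejected based on their memory requirements.
            return true;
        }
        // Limit set: reject any job asking for more VM per task than the limit allows.
        return requestedVmPerTaskMB <= clusterLimitMB;
    }
}
```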
Irrespective of how the maximum limits on job requirements are specified to prevent gaming of the system (either via a configuration or by calculating them dynamically), the job rejection should be done not by the JT but by our scheduler itself. Clearly, this maximum limit is owned by our scheduler.
Now that we are doing this for 0.20, the memory configuration should be changed to be a fraction of physical RAM, i.e. mapred.tasktracker.memory.max-fraction, which takes a float and computes the maximum. Operations teams don't want to specify the memory on each machine. They want to have a global configuration for all of the slaves.
It is important to report this total to the JT, so that it can be displayed and/or used by the scheduler. If you want to include the redundant information about what has been allocated, that is ok, but the max available should be sent too. Defining a cluster-configurable limit for the max memory for any task is fine.
Hi Owen, if the cluster has machines with different memory sizes (e.g. some TTs have 8GB memory while other TTs have 16GB), then specifying a memory percentage might not work well.
That is exactly the case that I'm worried about. It will be a pain to have some configs that say 9G and some that say 17G. It would be easier to configure 1.1 and let the TT scale it to the appropriate amount of ram.
Maybe we can specify an amount of memory to leave free (e.g. 512 MB), rather than a percentage?
I guess that would work, but in general it works better if we have ratios instead since they automatically scale as hardware improves. It would also need to support negative values, if you want to allocate more than ram (obviously into swap or assuming that the tasks won't actually use it). I suspect that in most cases, it would be negative, which seems less user-friendly.
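To make the trade-off concrete, here is a sketch comparing the two proposals for how much memory a TT may hand out: a ratio of physical RAM (the 1.1 example above) and a fixed amount left free (the 512 MB example above), applied to the 8GB/16GB mix mentioned earlier. The helper names are hypothetical, and MB units are an assumption.

```java
// Hypothetical comparison of the two proposals for a TT's schedulable memory (MB).
final class MemoryBudget {
    /** Ratio proposal: a fraction of physical RAM; values above 1.0 spill into swap. */
    static long byFraction(long physicalRamMB, double fraction) {
        return (long) (physicalRamMB * fraction);
    }

    /** Offset proposal: leave a fixed amount free; a negative offset allocates beyond RAM. */
    static long byOffset(long physicalRamMB, long reservedMB) {
        return physicalRamMB - reservedMB;
    }
}
```

With a 512 MB offset, the 8GB and 16GB nodes yield 7680 and 15872 MB, so both keep the same amount free. With a ratio of 1.1 they yield 9011 and 18022 MB, so the headroom scales with the node, which is the property argued for above. Going beyond RAM with an offset needs a negative value such as -1024, as noted.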
In
Note that this value is suggested to be linked to total VM on the machine, not RAM.
> I guess that would work, but in general it works better if we have ratios instead since they automatically scale as hardware improves.
I disagree. Matei is right on. This value needs to be an offset from the total amount of memory on the machine ("hadoop may use all but 4g"). Percentages don't really work well here because any ops team worth its salt knows exactly how much they need to reserve for its own stuff: the OS, monitoring probes, etc., all the processes that sort of run in the background as noise. That size is almost guaranteed to be a constant on similar gear with the same OS. [.. and if one is doing a radically heterogeneous cluster, they've got other problems besides this one!] Setting this to a percentage is actually going to leave memory on the table. In our real-world grids, every rack has a node with 16g phys ram in it, with the rest of the nodes having 8g phys ram. All nodes have 24g of swap. So our numbers are 32g and 40g. Setting this to 87% (I think?--the fact that I'm not sure I have this value right should be another hint!) to reserve 4g of VM means that we lose 2g of mem that could be available to Hadoop on the 16g RAM nodes!
> dhruba borthakur - 27/Oct/08 10:02 AM Hi Owen, If the cluster has machines with different memory sizes (e.g. some TTs have 8GB memory while other TTs have 16GB), then specifying a memory percentage might not work well.
I too agree with Allen. If machines have different memory sizes, then specifying a percentage of memory might not work very well. I guess I'm ok with it as a delta from total virtual memory, although how to detect the virtual memory in a generic manner is an interesting question. Maybe as I proposed over in
Note that if we are using virtual memory, then we absolutely need a different configuration for the amount of virtual memory that we'd like to schedule to. We do not want the scheduler to put 4 10G tasks on a machine with 8G RAM and 32G swap. That number should be based on RAM. So, I'd propose that we extend the plugin interface as:
public abstract class MemoryPlugin {
  public abstract long getVirtualMemorySize(Configuration conf);
  public abstract long getRamSize(Configuration conf);
}
I'd propose that these values be the real values and that we have a configured offset for both values.
mapred.tasktracker.virtualmemory.reserved (subtracted off of virtual memory)
Jobs should then define a soft and hard limit for their memory usage. If a task goes over the hard limit, it should be killed immediately. The scheduler should only allocate tasks if
Thoughts?
Since the issue of dealing with memory-intensive and badly behaved jobs has spanned more than one Jira, here's the latest summary on the overall proposal (following some offline discussions).
The problem, as stated originally in
There are two independent parts to solving this problem: monitoring and scheduling. Let's look at monitoring first.
Monitoring
We want to ensure that the sum total of virtual memory (VM) usage by all tasks does not go over a limit (call this the max-VM-per-node limit). That's really what brings down a machine. To detect badly behaved jobs, we want to associate a limit with each task (call this the max-VM-per-task limit) such that a task is considered badly behaved if its VM usage goes over this limit. Think of the max-VM-per-task limit as a kill limit. A TT monitors each task for its memory usage (this includes the memory used by the task's descendants). If a task's memory usage goes over its max-VM-per-task limit, that task is killed. This monitoring has been implemented in
How do we specify these limits?
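The kill rule in the Monitoring paragraph can be sketched as follows. This is a hypothetical illustration, not the TaskTracker's monitoring code: it assumes the VM sizes of a task's process tree (the task plus its descendants) have already been collected as a list of byte counts.

```java
import java.util.List;

// Hypothetical sketch of the TT-side kill rule; how VM sizes are collected is out of scope.
final class VmMonitor {
    /** Sum of VM sizes, e.g. over a task's process tree or over all tasks on a node. */
    static long sum(List<Long> vmBytes) {
        long total = 0;
        for (long vm : vmBytes) {
            total += vm;
        }
        return total;
    }

    /** A task is killed once its process tree (task plus descendants) exceeds max-VM-per-task. */
    static boolean shouldKill(List<Long> processTreeVmBytes, long maxVmPerTaskBytes) {
        return sum(processTreeVmBytes) > maxVmPerTaskBytes;
    }

    /** The node-level condition the per-task limits protect: total VM within max-VM-per-node. */
    static boolean nodeWithinLimit(List<Long> perTaskVmBytes, long maxVmPerNodeBytes) {
        return sum(perTaskVmBytes) <= maxVmPerNodeBytes;
    }
}
```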
Note that the monitoring process can be disabled if mapred.tasktracker.virtualmemory.reserved is not present, or has some default negative value.
Scheduling
In order to prevent tasks from using too much memory, a scheduler can limit the number of tasks running on a node based on how much free memory is available and how much a task needs. The Capacity Scheduler will do this, though we cannot force all schedulers to support this feature. As per
Just as with the handling of VM, we may want to use a part of the RAM for scheduling TT tasks, and not all of it. If so, we can introduce a config value, mapred.tasktracker.ram.reserved, which indicates an offset (in MB?). The amount of RAM available to the TT tasks is then the total RAM on the machine, minus this offset. How do we get the total RAM on the machine? By the same plugin interface through which we obtain total VM.
How do we specify a task's max-RAM-per-task limit? There is a system-wide default value, mapred.capacity-scheduler.default.ramlimit, expressed as a percentage. A task's default max-RAM-per-task limit is equal to the task's max-VM-per-task limit times this value. We may start by setting mapred.capacity-scheduler.default.ramlimit to 50 or 33%. In order to let individual jobs override this default, a job can set a config value, mapred.task.maxram, expressed in MB, which then becomes the task's max-RAM-per-task limit. Furthermore, as with VM settings, we want to prevent users from setting mapred.task.maxram to an arbitrarily high number and thus gaming the system. To do this, there should be a cluster-wide setting, mapred.task.limit.maxram, that limits the value of mapred.task.maxram. If mapred.task.maxram is set to a value higher than mapred.task.limit.maxram, the job should not run. Either this check can be done in the JT when a job is submitted, or a scheduler can fail the job if it detects this situation.
The Capacity Scheduler, when it picks a task to run, will check if both the task's RAM limit and VM limit can be satisfied. If so, the task is given to the TT. If not, nothing is given to the TT (i.e., the cluster blocks till at least one TT has enough memory). We will not block forever because we limit what the task can ask for, and these limits should be set lower than the RAM and VM on each TT.
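A minimal sketch of the RAM side of this proposal and of the Capacity Scheduler's fit check, assuming MB units, that a job's mapred.task.maxram is treated as unset when it is not positive, and that the caller has already subtracted memory committed to running tasks to get the free amounts. The class and method names are hypothetical.

```java
// Hypothetical sketch of the proposed RAM/VM fit check (MB units); names are illustrative.
final class MemoryFit {
    /** Memory a TT may hand to tasks: the machine total minus the configured reserved offset. */
    static long usable(long totalMB, long reservedMB) {
        return totalMB - reservedMB;
    }

    /**
     * A task's RAM limit: the job's mapred.task.maxram if set (> 0 in this sketch), otherwise
     * max-VM-per-task times mapred.capacity-scheduler.default.ramlimit (a percentage).
     */
    static long ramLimitMB(long maxVmPerTaskMB, int defaultRamLimitPercent, long jobMaxRamMB) {
        if (jobMaxRamMB > 0) {
            return jobMaxRamMB;
        }
        return maxVmPerTaskMB * defaultRamLimitPercent / 100;
    }

    /** The task is given to the TT only if both its VM limit and its RAM limit fit. */
    static boolean fits(long taskVmMB, long taskRamMB, long freeVmMB, long freeRamMB) {
        return taskVmMB <= freeVmMB && taskRamMB <= freeRamMB;
    }
}
```

When fits returns false, the proposal is that the Capacity Scheduler returns nothing to the TT rather than looking at another job.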
In order to tax users on their jobs' requirements, we may charge them for the value they set per task, but for now there is no penalty associated with the value set for mapred.task.maxmemory by a user for a job.
Open issues
Based on the writeup above, I'm summarizing a few of the open issues (mostly minor):
On the scheduling, wouldn't it be nice if all schedulers could use this feature? One option there is to implement the policy in all the schedulers. But given this issue is targeted for 19.1, the other option is to do the check within the JobInProgress, just like we do the check for blacklisted TTs (where we don't give a task to a blacklisted TT). Specifically, couldn't the JobInProgress.shouldRunOnTaskTracker method do this check and not assign the TT a task, taking into account the memory parameters (all the information related to memory parameters is available at this point via TaskTrackerStatus and the jobconf of the job)?
One more line of argument could be that we are actually just doing greedy scheduling with respect to the memory-related parameters. So this base-level greedy scheduling should be in a place that is in the code path of all schedulers, i.e., in JobInProgress.shouldRunOnTaskTracker. If some scheduler wants to do something better than that, it can always do so, since control is given to the scheduler code first (assignTasks). Thoughts?
+1 on putting it in JobInProgress.shouldRunOnTaskTracker; this sounds like the right place for it if there aren't any technical obstacles.
While I agree that you want to make it easier for various schedulers to share common functionality, keep in mind that different schedulers may choose to behave differently if a TT does not have enough memory for a task. The Capacity Scheduler, for example, chooses to block, i.e., it prefers returning nothing to the TT, so that it doesn't starve a job with high-mem requirements. However, another Scheduler might choose to look at the next job, or find some other task to give the TT.
For this reason, you don't want to put the memory checks in shouldRunOnTaskTracker(). Schedulers will behave differently if a TT is blacklisted versus if the TT doesn't have enough free memory. Now, you could argue that we can add a new method to JobInProgress, something like isMemoryAvailable(), that decides if the TT has enough free memory. You could call it from obtainNewMapTask() or obtainNewReduceTask(), but again, you'd have to modify what these methods return to the schedulers. If obtainNewMapTask, for example, returns no task, the Scheduler needs to know why. It can behave differently if there was no task to run, if the TT was blacklisted, or if the TT didn't have any free memory. This will make things messy.
I think there is a lot of scheduling code in the JobInProgress object that needs to be moved into the schedulers. IMO, JobInProgress and other objects should expose their data structures, and maybe simpler methods that decide whether a TT is blacklisted or whether a TT has enough free memory (methods that return the same response irrespective of schedulers). The various Schedulers should compose these methods as they see fit. One may check for user quotas before it checks for memory fit (maybe the former is a faster check) while another may do something else. This does imply a fair bit of refactoring, and could be a longer-term effort. This will also help share common code across Schedulers.
Short term, I think it's better that each scheduler decide whether it wants to support memory checks when scheduling and what it does if the TT does not have enough free memory, and implement that individually. To ease this, maybe you can put the logic of deciding whether a TT has enough free memory for a task in a separate method in JobInProgress, but call that from each Scheduler, not from another JobInProgress method.
That makes sense, Vivek.
I'd be very glad to see more scheduling moved out of JobInProgress (and hopefully into some utility class that different people can use). Right now it's hard to implement some scheduling strategies, such as changing the speculative execution policy or the locality policy, without rewriting a lot of the code in JobInProgress. Also, as more scheduling functionality gets added, JobInProgress just grows larger. It would be nice if it was more of a data object than logic + data.
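The split suggested above (a scheduler-neutral "does it fit?" method, with each scheduler choosing what to do when the answer is no) can be sketched like this. Everything here is hypothetical: Job stands in for JobInProgress, FITS for a method like isMemoryAvailable(), and the two policies mirror the blocking behaviour of the Capacity Scheduler and the "look at the next job" alternative.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

// Hypothetical sketch: one shared memory predicate, two schedulers composing it differently.
final class MemoryPolicies {
    static final class Job {
        final String name;
        final long memPerTaskMB;

        Job(String name, long memPerTaskMB) {
            this.name = name;
            this.memPerTaskMB = memPerTaskMB;
        }
    }

    /** Same answer for every scheduler: does a task of this job fit in the TT's free memory? */
    static final BiPredicate<Job, Long> FITS = (job, freeMB) -> job.memPerTaskMB <= freeMB;

    /** Blocking policy: if the head job does not fit, return nothing so it is not starved. */
    static Optional<Job> blocking(List<Job> queue, long freeMB) {
        if (queue.isEmpty() || !FITS.test(queue.get(0), freeMB)) {
            return Optional.empty();
        }
        return Optional.of(queue.get(0));
    }

    /** Alternative policy: skip jobs that do not fit and take the first one that does. */
    static Optional<Job> skipping(List<Job> queue, long freeMB) {
        return queue.stream().filter(j -> FITS.test(j, freeMB)).findFirst();
    }
}
```

With a 6GB job at the head of the queue and 2GB free, the blocking policy assigns nothing while the skipping policy picks the next job that fits; the predicate answers the same in both cases.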
The other parameter we have in hadoop related to memory is mapred.child.ulimit which is specified in KB. I think expressing these values in KB would keep things consistent.
I think scheduler failing the job is more consistent if the scheduling decisions are being made in the scheduler.
I am not really sure either way. Given earlier discussions we've had that virtual memory is what really matters, I am guessing we don't need it.
Regarding the config variable names, a few concerns/suggestions:
Does this make sense?
Having the offset value also for RAM will make configuration as well as scheduling logic symmetric w.r.t both vmem and ram, IMO.
Or, as Owen/Sameer proposed, can we call them mapred.task.memory.hardlimit.default and mapred.task.memory.hardlimit? And substitute softlimit for RAM?
> I think expressing these values in KB would keep things consistent.
I think the system should be smart enough that if I specify "4g" or "4m" or "4k" it knows what I'm talking about. Are we really going to make ops people write massively long integers in 2008? It annoys me when apps aren't "smart" about values. [He says, having just had to set some memory related values in another application that ended up looking like 1073741824 . Nice and readable.... ]
> [He says, having just had to set some memory related values in another application that ended up looking like 1073741824 . Nice and readable.... ]
There's a lot to be said for configuration languages with basic math operators, though then you end up worrying about order of precedence, so you end up bracketing everything just to be sure: dfs.namenode.decommission.interval (5 * 60); [He says, editing a .spec file where the ${} ant properties get expanded during build time but the RPM may be rebuilt on a separate machine with the %{} properties expanded either at rpmbuild time or on the target machine, and where trial and error seems the only way to be sure what happens]
Allen> I think the system should be smart enough that if I specify "4g" or "4m" or "4k" it knows what I'm talking about.
Steve> There's a lot to be said for configuration languages with basic math operators [ ... ]
These sound like great enhancements to the Configuration API. The getInt() and getLong() methods could implement these. But that should probably be in a separate issue. And, if it's to be implemented, the parameter in this issue should then be raw bytes, no?
Some details about configuration:
Job Configuration:
Already addressed cases:
Cases not addressed:
TT Configuration:
Already addressed cases:
Cases not addressed:
I prefer the first solutions to the latter in both the above cases, as we are making our best efforts to prevent thrashing and task overflowing in those solutions. Thoughts?
As Vinod has brought up, there are some edge cases and details missing in the summary
We want monitoring to work independent of scheduler support, i.e., even if the scheduler you're using does not support memory-based scheduling, you may still want to make sure the TTs monitor memory usage on their machines and kill tasks if too much memory is used. Based on what we've described in the summary, the following three configuration settings are required for the TT to do monitoring: mapred.tasktracker.virtualmemory.reserved (the offset of total VM on the machine), mapred.task.default.maxvm (the default for maximum VM per task), and mapred.task.limit.maxvm (the upper limit on the max VM per task). It is proposed that:
The TT also needs to report memory information to the schedulers. As per
I propose the latter. TTs always report how much VM and RAM they have on their system, and what offset settings they have. They're the only ones who have this information, and this approach gives a lot of flexibility to the schedulers in terms of how to use that information.
What about schedulers? The Capacity Scheduler should do the following:
It's possible that other schedulers may choose a different algorithm. What's important is that they have all the available information, which they should as per this proposal.
Attaching a new patch.
if (qi.runningJobs.remove(oldInfo) != null) {
  qi.waitingJobs.remove(oldInfo);
}
Should be
if (qi.runningJobs.remove(oldInfo) == null) {
  qi.waitingJobs.remove(oldInfo);
}
This definitely needs to be fixed in previous versions too.
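The corrected condition relies on Map.remove returning the previous value, or null when the key was absent: a job is removed from the waiting collection only if it was not found among the running jobs. A self-contained sketch, with a hypothetical QueueInfo using String keys in place of the scheduler's real job-info objects:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the queue bookkeeping in the snippet above.
final class QueueInfo {
    final Map<String, String> runningJobs = new HashMap<>();
    final Map<String, String> waitingJobs = new HashMap<>();

    /** Remove a job from the running map; only if it was not running, remove it from waiting. */
    void removeJob(String jobId) {
        // Map.remove returns the previous value, or null if the key was absent.
        if (runningJobs.remove(jobId) == null) {
            waitingJobs.remove(jobId);
        }
    }
}
```

With the original != null test, a job that was only waiting would never be removed from waitingJobs.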
Vinod, good catch. Thanks for finding this out. I have filed
I've started looking at this patch. Here are a few initial comments. I have yet to look at scheduling and test cases:
Configuration:
Monitoring:
Comments on the scheduling portion:
Test cases seem fine. Only one enhancement request is that in testClusterBlockingForLackOfMemory(), you can submit a normal job after submitting the high ram jobs, and then verify that no task assignment is done. This way we can be sure that even if there are jobs that fit in memory that can run, we don't schedule anything.
Attaching a new patch.
ant test-patch gave all +1 except a -1 for findBugs. This is regarding unsynchronized access to the TaskTrackerStatus object. This patch does the right thing, but there are other earlier unsynchronized accesses. The changes in this patch triggered an old findBugs warning. I've run core and contrib tests. They both built successfully.
Things to be done later in other jiras
Looking good. Few comments:
Attaching new patch.
I had made changes to capacity-scheduler.xml.template earlier; somehow they got missed in the patch, so I'm adding them now. The testcase TestCapacityScheduler.testHighMemoryJobWithInvalidRequirements was failing intermittently because of timing issues with JobInitializationPoller. Fixed that now by using the ControlledJobIntializationPoller. Incorporated the rest of the review comments too.
New patch which updates documentation a bit. Also moved DummyMemoryCalculatorPlugin to test code.
Otherwise the patch is looking good. +1
I am running some sanity tests to make sure things are fine.
There was a mistake in the patch related to the configuration in hadoop-defaults. Cancelling. Will upload a new patch after fixing that.
This patch fixes the following:
test-patch output with this patch is:
[exec] +1 @author. The patch does not contain any @author tags.
[exec] +1 tests included. The patch appears to include 24 new or modified tests.
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
[exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
Running core and contrib tests. I will commit this patch if they pass.
All core and contrib tests passed on my machine.
So, I committed this. Thanks, Vinod!
Integrated in Hadoop-trunk #677 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/677/)
. Support memory based scheduling in capacity scheduler. Contributed by Vinod Kumar Vavilapalli.
The list of tests that I ran on a real cluster:
Enable both TT memory management and scheduling based on memory for the remaining tests.
The following currently work only on Linux. mapred.task.limit.maxvmem and mapred.task.limit.maxpmem should be set considerably high so as not to interfere with jobs.
Edit release note for publication.
Based on this assumption, the total number of slots each job is virtually taking is (number of running tasks for the job * the number of slots each task of the job is taking). The latter is something like (amount of per-task memory the job has requested / the memory per slot for the cluster).
Makes sense?
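The slot accounting described above can be sketched as follows, assuming a single cluster-wide memory-per-slot value in MB and rounding a partial slot up to a whole one (the comment only says "something like" a division, so the rounding is an assumption). Names are hypothetical.

```java
// Hypothetical sketch of virtual slot accounting under a cluster-wide memory-per-slot value.
final class VirtualSlots {
    /** Slots one task occupies: requested memory over memory per slot, rounded up (assumption). */
    static long slotsPerTask(long taskMemMB, long memPerSlotMB) {
        return (taskMemMB + memPerSlotMB - 1) / memPerSlotMB;
    }

    /** Slots a job is virtually occupying: running tasks times slots per task. */
    static long slotsInUse(int runningTasks, long taskMemMB, long memPerSlotMB) {
        return runningTasks * slotsPerTask(taskMemMB, memPerSlotMB);
    }
}
```

For example, with 1024 MB per slot, a job running 5 tasks of 3072 MB each is charged for 15 slots rather than 5.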