Since the issue of dealing with memory-intensive and badly behaved jobs has spanned more than one Jira, here's the latest summary on the overall proposal (following some offline discussions).
The problem, as stated originally in HADOOP-3581, is that certain badly-behaved jobs end up using too much memory on a node and can bring down that node. We need to prevent this. A related requirement, as described in HADOOP-3759, is that the system respects the different, and legitimate, memory requirements of different jobs.
There are two independent parts to solving this problem: monitoring and scheduling. Let's look at monitoring first.
We want to ensure that the sum total of virtual memory (VM) usage by all tasks does not go over a limit (call this the max-VM-per-node limit). That's really what brings down a machine. To detect badly behaved jobs, we want to associate a limit with each task (call this the max-VM-per-task limit) such that a task is considered badly behaved if its VM usage goes over this limit. Think of the max-VM-per-task limit as a kill limit. A TT monitors each task for its memory usage (this includes the memory used by the task's descendants). If a task's memory usage goes over its max-VM-per-task limit, that task is killed. This monitoring has been implemented in HADOOP-3581.
In addition, a TT monitors the total memory usage of all tasks spawned by the TT. If this value goes over the max-VM-per-node limit, the TT needs to kill one or more tasks. As a simple solution, the TT can kill the task(s) that started most recently. This approach has been suggested in HADOOP-4523. Tasks that are killed because they went over their memory limit should be treated as failed, since they violated their contract. Tasks that are killed because the sum total of memory usage was over a limit should be treated as killed, since it's not really their fault.
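To make the intended TT-side behavior concrete, here is a minimal sketch of one monitoring pass. This is not the actual TaskTracker code: the class, the TaskInfo fields and the failTask/killTask helpers are hypothetical, and only illustrate the fail-vs-kill distinction described above.
{code}
// Hypothetical sketch of one TT monitoring pass (not the actual TaskTracker code).
// Assumes TaskInfo exposes the task's VM usage (including descendants), its
// max-VM-per-task limit and its start time, plus helpers to fail or kill a task.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class MemoryMonitorSketch {

  static class TaskInfo {
    String taskId;
    long vmUsage;        // current VM usage of the task and its descendants
    long maxVmPerTask;   // the task's max-VM-per-task (kill) limit
    long startTime;      // when the task was started, in ms
  }

  /** One monitoring pass over all tasks currently managed by this TT. */
  void monitor(List<TaskInfo> tasks, long maxVmPerNode) {
    long totalVm = 0;
    List<TaskInfo> alive = new ArrayList<TaskInfo>();

    for (TaskInfo t : tasks) {
      if (t.vmUsage > t.maxVmPerTask) {
        // Task violated its own contract: kill it and count the attempt as FAILED.
        failTask(t.taskId);
      } else {
        totalVm += t.vmUsage;
        alive.add(t);
      }
    }

    // Order the remaining tasks so that the most recently started come first,
    // and kill them until the node is back under the max-VM-per-node limit.
    Collections.sort(alive, new Comparator<TaskInfo>() {
      public int compare(TaskInfo a, TaskInfo b) {
        return a.startTime < b.startTime ? 1 : (a.startTime > b.startTime ? -1 : 0);
      }
    });
    for (TaskInfo t : alive) {
      if (totalVm <= maxVmPerNode) {
        break;
      }
      // Not the task's fault: count the attempt as KILLED, not FAILED.
      killTask(t.taskId);
      totalVm -= t.vmUsage;
    }
  }

  void failTask(String taskId) { /* mark the attempt FAILED and kill the process tree */ }
  void killTask(String taskId) { /* mark the attempt KILLED and kill the process tree */ }
}
{code}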
How do we specify these limits?
- for max-VM-per-node: HADOOP-3581 provides a config option, mapred.tasktracker.tasks.maxmemory, which acts as the max-VM-per-node limit. As per discussions in this Jira, and in HADOOP-4523, this needs to be enhanced: mapred.tasktracker.tasks.maxmemory should be replaced by mapred.tasktracker.virtualmemory.reserved, which indicates an offset (in MB?). max-VM-per-node is then the total VM on the machine, minus this offset. How do we get the total VM on the machine? This can be done by the plugin interface that Owen proposed earlier.
- for max-VM-per-task: HADOOP-3759 and HADOOP-4439 define a cluster-wide configuration, mapred.task.default.maxmemory, that describes the default maximum VM allowed per task; rename it to mapred.task.default.maxvm for consistency. This is the default max-VM-per-task limit associated with a task. To support jobs that need higher or lower limits, this value can be overridden by individual jobs: a job can set a config value, mapred.task.maxvm, which overrides mapred.task.default.maxvm for all tasks of that job.
- Furthermore, as described earlier in this Jira, we want to prevent users from setting mapred.task.maxvm to an arbitrarily high number and thus gaming the system. To do this, there should be a cluster-wide setting, mapred.task.limit.maxvm, that limits the value of mapred.task.maxvm. If mapred.task.maxvm is set to a value higher than mapred.task.limit.maxvm, the job should not run. Either this check can be done in the JT when a job is submitted, or a scheduler can fail the job if it detects this situation.
Note that the monitoring process can be disabled if mapred.tasktracker.virtualmemory.reserved is not present, or has some default negative value.
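As a concrete illustration, here is a minimal sketch of how these settings could combine on the TT/JT side. The configuration names are the ones proposed above and Configuration.getLong() is the real Hadoop API, but the class, the method names and the way total VM is obtained are hypothetical, not an existing implementation.
{code}
// Minimal sketch, not the actual implementation: shows how the proposed settings
// could combine. Configuration.getLong() is the real Hadoop API; everything else
// (method names, how total VM is passed in) is illustrative.
import org.apache.hadoop.conf.Configuration;

class MemoryLimitsSketch {

  static final long DISABLED = -1L;

  /** max-VM-per-node = total VM on the machine minus the reserved offset (MB). */
  static long getMaxVmPerNode(Configuration conf, long totalVmOnMachineMB) {
    long reservedMB = conf.getLong("mapred.tasktracker.virtualmemory.reserved", DISABLED);
    if (reservedMB < 0) {
      return DISABLED;                       // monitoring is disabled
    }
    return totalVmOnMachineMB - reservedMB;
  }

  /** max-VM-per-task for a job: job override if set, else the cluster default. */
  static long getMaxVmPerTask(Configuration clusterConf, Configuration jobConf) {
    long clusterDefaultMB = clusterConf.getLong("mapred.task.default.maxvm", DISABLED);
    return jobConf.getLong("mapred.task.maxvm", clusterDefaultMB);
  }

  /** Reject jobs that ask for more than the cluster-wide cap. */
  static boolean isJobLimitAcceptable(Configuration clusterConf, long jobMaxVmPerTaskMB) {
    long capMB = clusterConf.getLong("mapred.task.limit.maxvm", Long.MAX_VALUE);
    return jobMaxVmPerTaskMB <= capMB;
  }
}
{code}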
In order to prevent tasks using too much memory, a scheduler can limit the number of tasks running on a node based on how much free memory is available and how much each task needs. The Capacity Scheduler will do this, though we cannot require all schedulers to support this feature.
As per HADOOP-3759, TTs report, in each heartbeat, how much free VM they have (which is equal to max-VM-per-node minus the sum of max-VM-per-task for each running task). The Capacity Scheduler needs to ensure that:
- there is enough VM for a new task to run. It does this by comparing the task's requirement (its max-VM-per-task limit) to the free VM available on the TT.
- there is enough RAM available for a task so that there is not a lot of page swapping and thrashing when tasks run. This is much harder to figure out, and it's not even clear what it means to have 'enough RAM available' for a task. A simple proposal, to get us started, is to take a fraction of the max-VM-per-task limit as the 'RAM limit' for a task. Call this the max-RAM-per-task limit, and think of it as a scheduling limit. For a task to be scheduled, its max-RAM-per-task limit should be less than the total RAM on a TT minus the sum of max-RAM-per-task limits of tasks running on the TT. This also implies that a TT should report its free RAM (the total RAM on the node minus the sum of the max-RAM-per-task limits for each running task). A sketch of this check follows the list.
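Concretely, under the assumptions above, the check might look like this; the names (freeVm, freeRam, TaskRequest) are illustrative and not part of any existing scheduler API.
{code}
// Illustrative sketch of the scheduling-side check; not Capacity Scheduler code.
// freeVm / freeRam are what the TT would report in its heartbeat:
//   freeVm  = max-VM-per-node - sum of max-VM-per-task  over running tasks
//   freeRam = available RAM   - sum of max-RAM-per-task over running tasks
class SchedulingCheckSketch {

  static class TaskRequest {
    long maxVmPerTaskMB;   // the task's max-VM-per-task (kill) limit
    long maxRamPerTaskMB;  // the task's max-RAM-per-task (scheduling) limit
  }

  /** A task fits on a TT only if both its VM and RAM requirements can be met. */
  static boolean fits(TaskRequest task, long freeVmMB, long freeRamMB) {
    return task.maxVmPerTaskMB <= freeVmMB
        && task.maxRamPerTaskMB <= freeRamMB;
  }
}
{code}
If fits() returns false for every runnable task, the scheduler would hand nothing to that TT in that heartbeat, which is the blocking behavior described further below.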
Just as with the handling of VM, we may want to use only a part of the RAM for scheduling tasks on a TT, not all of it. If so, we can introduce a config value, mapred.tasktracker.ram.reserved, which indicates an offset (in MB?). The amount of RAM available to the TT tasks is then the total RAM on the machine, minus this offset. How do we get the total RAM on the machine? By the same plugin interface through which we obtain total VM.
How do we specify a task's max-RAM-per-task limit? There is a system-wide default value, mapred.capacity-scheduler.default.ramlimit, expressed as a percentage. A task's default max-RAM-per-task limit is equal to the task's max-VM-per-task limit times this percentage. We may start by setting mapred.capacity-scheduler.default.ramlimit to 50% or 33%. In order to let individual jobs override this default, a job can set a config value, mapred.task.maxram, expressed in MB, which then becomes the task's max-RAM-per-task limit. Furthermore, as with the VM settings, we want to prevent users from setting mapred.task.maxram to an arbitrarily high number and thus gaming the system. To do this, there should be a cluster-wide setting, mapred.task.limit.maxram, that limits the value of mapred.task.maxram. If mapred.task.maxram is set to a value higher than mapred.task.limit.maxram, the job should not run. Either this check can be done in the JT when a job is submitted, or a scheduler can fail the job if it detects this situation.
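Putting the RAM settings together, here is a sketch under the assumptions above. Configuration.getInt()/getLong() are the real Hadoop API; the method names and the way values are passed around are illustrative only.
{code}
// Sketch only: resolves a task's max-RAM-per-task limit and the RAM available
// to tasks on a node, per the proposal above. Not an existing implementation.
import org.apache.hadoop.conf.Configuration;

class RamLimitsSketch {

  static final long NOT_SET = -1L;

  /** RAM available to tasks = total RAM on the machine minus the reserved offset (MB). */
  static long getAvailableRamMB(Configuration conf, long totalRamOnMachineMB) {
    long reservedMB = conf.getLong("mapred.tasktracker.ram.reserved", 0L);
    return totalRamOnMachineMB - reservedMB;
  }

  /** max-RAM-per-task: job override if set, else a percentage of max-VM-per-task. */
  static long getMaxRamPerTask(Configuration clusterConf, Configuration jobConf,
                               long maxVmPerTaskMB) {
    long jobMaxRamMB = jobConf.getLong("mapred.task.maxram", NOT_SET);
    if (jobMaxRamMB != NOT_SET) {
      return jobMaxRamMB;
    }
    int ramLimitPct = clusterConf.getInt("mapred.capacity-scheduler.default.ramlimit", 50);
    return maxVmPerTaskMB * ramLimitPct / 100;
  }

  /** Reject jobs whose RAM request exceeds the cluster-wide cap. */
  static boolean isJobRamAcceptable(Configuration clusterConf, long jobMaxRamMB) {
    long capMB = clusterConf.getLong("mapred.task.limit.maxram", Long.MAX_VALUE);
    return jobMaxRamMB <= capMB;
  }
}
{code}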
The Capacity Scheduler, when it picks a task to run, will check whether both the task's RAM limit and VM limit can be satisfied. If so, the task is given to the TT. If not, nothing is given to the TT (i.e., the cluster blocks till at least one TT has enough memory). We will not block forever because we limit what a task can ask for, and these limits should be set lower than the RAM and VM on each TT. In order to tax users on their jobs' requirements, we may eventually charge them based on the values they set per task, but for now, there is no penalty associated with the value a user sets for mapred.task.maxvm for a job.
Based on the writeup above, I'm summarizing a few of the open issues (mostly minor):
- Should the memory-related config values be expressed in MB or GB or KB or just bytes? MB sounds good to me.
- If a job's specified VM or RAM task limit is higher than the max limit, that job shouldn't be allowed to run. Should the JT reject the job when it is submitted, or should the scheduler do it, by failing the job? The argument for the former is that these limits apply to all schedulers, but then again, they are scheduling-based limits, so maybe they should be handled in each of the schedulers. In the latter case, if a scheduler does not support scheduling based on memory limits, it can just ignore these settings and run the job. So the latter option seems better.
- Should the Capacity Scheduler use the entire RAM of a TT when making a scheduling decision, or subtract an offset? Given that the RAM fractions are not very precise (they're based on fractions of the VM), an offset doesn't make much of a difference (you could tweak mapred.capacity-scheduler.default.ramlimit to achieve what the offset would), and it adds an extra config value. At the same time, part of the RAM is used by non-Hadoop processes, and an offset would keep things symmetrical with the VM handling.