Hi Devaraj, thanks for the info. The problem we saw was with 0.17.1. The number of completed jobs in memory has been reduced from the default of 100 to 20. When the problem occurred there were about 400 total jobs in the JT (running + completed + failed). I do not know how many jobs were run by the JobTracker since it was last started. This particular job had 60,000 mappers. About half of these tasks had finished before the problem started being acute.
I have the GC log enabled via the flags -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:/var/hadoop/logs/jobtracker1.gc.log and this log (as well as jconsole) showed that the JT was busy running full GC. I agree that moving to later releases will help mitigate this problem to a certain extent, but as a system administrator, I would like to set an upper limit on the number of tasks for a single job. This is not a cure-all but could serve as a guardpost to prevent non-conforming jobs from running. This should be complementary to all the JIRAs you mentioned, shouldn't it? Do you see a downside in this approach?
Configure the maximum number of splits that a job can have. This helps in eliminating rogue jobs from permanently stalling the JobTracker. This patch is for review purposes only.
dhruba borthakur made changes - 29/Aug/08 01:04 AM
Sorry Dhruba, I somehow missed the jira mail with your earlier comment.
I think we should do this in the server (if at all we want to do it) since this should be an admin-configured param. Also, a proper warning should be given as a response to the user if the JT discards a job because of too many splits. Maybe we should maintain a global count of the tasks across all jobs as well, and if that is exceeded we don't accept any job till some job(s) complete. By the way, configuring mapred.jobtracker.completeuserjobs.maximum to a lower value should help your case.
Hi Devaraj, excellent ideas. I will work on your feedback and will upload a new patch soon. Thanks for giving me quick feedback!
This patch fails a job if the jobtracker detects that the number of splits for the job exceeds a configured upper limit. Please let me know of any gotchas that this patch might have.
I like the idea of having an upper limit of all tasks for the entire jobtracker. But I have not yet implemented it. Do you think I should implement it as part of this patch? BTW, in our cluster, we have set mapred.jobtracker.completeuserjobs.maximum to 20 already.
dhruba borthakur made changes - 29/Aug/08 08:18 AM
I think you should handle the overall #tasks case as well.
There are two new configuration parameters:
1. mapred.max.tasks.per.jobtracker: the maximum number of tasks that a jobtracker can keep in memory.
2. mapred.max.tasks.per.job: the maximum number of mappers/reducers that a job can have.
By default, these limits are switched off.
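As a rough illustration of "switched off by default", the sketch below reads the two parameters named above from a plain java.util.Properties object, which stands in for Hadoop's configuration here. The class name LimitSettings, the helper readLimit, and the -1 "disabled" sentinel are assumptions made for this example and are not taken from the patch; treating a malformed value as "disabled" is likewise just one possible choice.

```java
/** Hypothetical helper; java.util.Properties stands in for the Hadoop configuration. */
final class LimitSettings {
    static final String MAX_TASKS_PER_JOBTRACKER = "mapred.max.tasks.per.jobtracker";
    static final String MAX_TASKS_PER_JOB = "mapred.max.tasks.per.job";

    /** Assumed sentinel meaning "limit switched off". */
    static final int DISABLED = -1;

    private LimitSettings() {}

    /** Returns the configured limit, or DISABLED if the key is unset, blank, or malformed. */
    static int readLimit(java.util.Properties conf, String key) {
        String raw = conf.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return DISABLED;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // Design choice for this sketch: a malformed value leaves the limit off.
            return DISABLED;
        }
    }
}
```

With nothing set, readLimit(conf, LimitSettings.MAX_TASKS_PER_JOB) returns DISABLED, so an admin has to opt in explicitly before any job is rejected.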
dhruba borthakur made changes - 30/Aug/08 01:01 AM
Few comments,
1) If the job fails on init(), JobTracker invokes JobInProgress.kill(). So ideally you should simply throw an exception if the limit is crossed.
2) The api totalNumTasks() is not used anywhere and can be removed.
3) The count returned by totalNumTasks() will be erroneous as it also takes into consideration jobs which exceed the limit. numMapTasks should be changed only when the job passes these limit checks.
4) You could also check the limit in the JobInProgress constructor where the numMapTasks/numReduceTasks is obtained from the jobConf. This might help us catch the violation early and return to the user some useful message about why the job was not accepted.
5) Since reduce tasks also add to numTasks, I think (numMapTasks + numReduceTasks) is a better metric.
Hi Amar, thanks for your comments.
>1. If the job fails on init(), JobTracker invokes JobInProgress.kill(). So ideally you should simply throw an exception if the limit is crossed
Can you please explain which portion of code you are referring to here?
>2. The api totalNumTasks() is not used anywhere and can be removed.
Regarding 3 and 4, I agree with you that it is better if I can check these limits in the constructor of JobInProgress. But the number of splits for the current job is not yet available when the constructor is invoked. That's the reason I do these checks in initTasks. Does it make sense? Regarding point 5, my latest patch has this fix.
dhruba borthakur made changes - 03/Sep/08 01:04 AM
Incorporated all of Amar's suggestions.
dhruba borthakur made changes - 03/Sep/08 06:28 AM
Look at how the job is inited. If the init fails then there is a cleanup process associated with it. So simply throwing an exception would work and there is no need to explicitly set the job state and finish-time.
Oops! I missed that. But it's still flawed as I have mentioned in comment #3. Please check.
I just checked and it seems that the job client never overwrites the number of maps to be spawned. Since the num-maps passed by the user is just a hint to the jobclient while calculating the splits, this information is of no use to the jobtracker and hence the job-client can overwrite the num-maps parameter before uploading the job.xml on the dfs. With this the job that should fail will fail fast (i.e. in the constructor itself) and the user will be informed as to why the job failed. Comment #3 just states that totalNumTasks() will also count tasks from non-running (i.e. killed/completed/failed) jobs. So totalNumTasks() should only take RUNNING jobs into consideration while calculating total tasks.
Thanks Amar for your comments. I purposely kept totalNumTasks() to count all tasks (failed, completed, etc.) because I am interested in limiting memory usage on the JT. All these tasks will occupy some memory in the JT, won't they?
But not the jobs that were failed because they exceeded the limit. Remember that you are setting the numMaps before throwing the exception and hence they will also be counted. The problem with this is that any job submitted after the limit-exceeding job might not pass this test even when the job should. You could simply reset the value to 0 before throwing the exception.
Why can't org.apache.hadoop.mapred.LimitTasksPerJobTaskScheduler along with the mapred.jobtracker.completeuserjobs.maximum limit suffice? I agree that we still need a global limit over all the tasks, but maybe that can be built into a scheduler too.
Thanks to Amar for your comments. I am attaching a new patch that looks at the allocated tasks for each job and matches that with the specified limit. Amar: can you please review this latest patch? Thanks.
@Vinod: the proposed mapred.max.tasks.per.jobtracker is used to limit the memory usage of the jobtracker. We need to count how many tasks (failed, completed, running, etc.) are resident in memory. I believe that mapred.jobtracker.completeuserjobs.maximum does not satisfy that requirement. I do not know much about org.apache.hadoop.mapred.LimitTasksPerJobTaskScheduler, maybe Amar can comment on that.
dhruba borthakur made changes - 04/Sep/08 06:09 AM
Ok, guess I was wrong. LimitTasksPerJobTaskScheduler limits the tasks running per job in the cluster. It doesn't help prevent any jobtracker stall. But still, this looks to me like something that should be handled by a scheduler - perhaps the scheduler should bail out while initializing a job with a huge number of tasks.
I think the goal is to avoid making the JobTracker allocate memory for tasks for a job that causes the limit to exceed a certain quantity. If the limit-check is in the scheduler, all schedulers have to implement this check. Also, by the time the scheduler gets to look at the task, the JT might have already allocated some memory for some task-related status.
I am assuming that it is better to check this in the JT rather than in the scheduler, but please let me know if my assumption seems invalid.
Scheduler should be least concerned about the jobtracker's memory. It's just pluggable code that should decide on what goes next. Whether or not the jobtracker can support some stuff should better be decided by the jobtracker.
Few comments
1) JobInProgress.totalAllocatedTasks() should also consider nonLocalMaps and nonLocalRunningMaps. Applications like random-writer use these structures.
2) There is an extra '-' diff in JobTracker.java
3) You might need to synchronize the totalAllocatedTasks() api in both places. Consider a case where job1 is in init stage while job2 is newly submitted. Assume both cannot run in parallel on the jobtracker. Assume job1 has not yet seen the splits and job2, which is getting constructed, checks totalAllocatedTasks(). In such a case job2 will succeed and will move on to init. job1 will create its cache and continue while job2 will fail in init. Also there is no guarantee for inits to be sequential. They might happen in parallel as it's up to the scheduler. So now all the jobs that call totalTasks might see some stale value and hence might end up expanding themselves.
4) Please add a test case.
I have a question regarding item 1 above. totalAllocatedTasks currently uses runningMapCache, nonRunningMapCache, runningReduces and nonRunningReduces.
Over and above these four, should it also use nonLocalMaps and nonLocalRunningMaps? I thought that nonLocalRunningMaps are already contained in runningMapCache. Please let me know if this is not true. I like the idea of making JobInProgress.totalAllocatedTasks a synchronized method. But making JobTracker.totalAllocatedTasks a synchronized method might be bad because it violates the locking hierarchy, right? I am assuming that one cannot acquire the JobTracker lock if one already holds the JobInProgress lock. Do we really need to make JobTracker.totalAllocatedTasks a synchronized method?
They are mutually exclusive. For data-local tasks runningMapCache, nonRunningMapCache, runningReduces and nonRunningReduces are used. For non-data-local tasks nonLocalMaps and nonLocalRunningMaps are used.
Yes. One thing you could do is keep a global count of all allocated tasks in the JobTracker. Jobs getting constructed will check the value before bailing out. Once the job inits, it updates the value of this count. Any access/update to the count should be guarded. Since the count will be updated only on passing the init tests, we can be sure that limit-exceeding jobs never get inited. So something like
In init:
1. Get the count lock
2. Check if the count + self-tasks > limit
2.1 If yes then throw an exception
2.2 else update the count and release the lock
Since the value of allocated tasks never changes once inited, there is no point in iterating over the jobs every time. Once the job is removed from the jobtracker (see RetireJobs), then update the count to reflect the change.
Incorporated locking changes to protect the global counter of number of allocated tasks in the job tracker. Also wrote a unit test.
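The check-and-update sequence suggested in this comment could look roughly like the sketch below. It is only an illustration of the idea, not the code from the patch: the class TaskBudget and its methods reserve and release are hypothetical names, and treating a non-positive limit as "no limit" is an assumption made here.

```java
/** Hypothetical stand-in for a JobTracker-wide count of allocated tasks. */
final class TaskBudget {
    private final int limit;   // assumption: non-positive means "no limit"
    private int allocated = 0; // guarded by the monitor of this object

    TaskBudget(int limit) {
        this.limit = limit;
    }

    /**
     * Called during job init. The check and the update happen under one lock,
     * so two jobs initializing in parallel cannot both pass on a stale count.
     */
    synchronized void reserve(int jobTasks) throws java.io.IOException {
        if (jobTasks < 0) {
            throw new IllegalArgumentException("negative task count: " + jobTasks);
        }
        if (limit > 0 && (long) allocated + jobTasks > limit) {
            // The count is left untouched, so a rejected job never inflates it.
            throw new java.io.IOException("Allocating " + jobTasks
                + " tasks would exceed the global limit " + limit);
        }
        allocated += jobTasks;
    }

    /** Called when a job is retired from the tracker. */
    synchronized void release(int jobTasks) {
        allocated = Math.max(0, allocated - jobTasks);
    }

    synchronized int allocated() {
        return allocated;
    }
}
```

Because the counter has its own lock and never calls back into job or tracker objects, taking it from inside job init should not create a lock-ordering problem of the kind raised earlier in the thread. The counter is updated only after the check passes, which also avoids counting jobs that were rejected.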
dhruba borthakur made changes - 06/Sep/08 12:18 AM
Submitting the patch to detect hadoopQA test results.
dhruba borthakur made changes - 06/Sep/08 12:59 AM
+1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12389601/maxSplits7.patch against trunk revision 692597. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 3 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3195/testReport/ This message is automatically generated. More comments
1) Computing allocated tasks based on internal datastructures of JobInProgress is incorrect as their count will always be missing. garbageCollect() for the job frees up the memory taken up by the caches. Note that garbageCollect() will be invoked before retireJob(). I think we should use numMapTasks and numReduceTasks instead of the sizes of the caches. Also reset the values of numMapTasks/numReduceTasks if the limit is crossed. If someone introduces partial expansion of tasks then we can think about it later.
2) I am not sure if changing the log level of one class is sufficient. It would be nice if we could set the log level of the complete framework in the testcase. But again, is it required?
3) Did not check the test case completely but it should take care of the following cases
4) I don't see in the test case where mapred.jobtracker.completeuserjobs.maximum is set. By default it will be 100 and the test case might never test the cleanup process.
I think the global limits are too complex to be worthwhile without a better model of the memory. I am ok with max maps and reduces. I would have preferred having an empty/undefined value mean unlimited.
-1 except for the very simple per job limits.
Owen O'Malley made changes - 18/Sep/08 01:53 AM
Owen,
I think Dhruba's concern here is of many small/avg sized jobs collectively overloading the jobtracker, see here
I like Owen's idea of putting a limit on the max number of tasks for a job only. This is the first step in preventing bad things from occurring in a cluster. I would like to defer implementing the global limits to later. Does it sound reasonable?
Hi Amar, do you agree with Owen's view (and mine too) that the global limits are too complex to be worthwhile? Also, it might not even be feasible for an admin to determine what the global limit should be. If you agree, then the attached maxSplits2.patch is the code we need.
+1 on having the simple thing first (i think i had suggested the global limits earlier but hadn't realized that it would be so complicated).
Fail a job if the number of tasks for a single job exceeds a configured maximum.
dhruba borthakur made changes - 23/Sep/08 07:21 PM
dhruba borthakur made changes - 23/Sep/08 08:16 PM
dhruba borthakur made changes - 23/Sep/08 08:23 PM
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12390777/maxSplits8.patch against trunk revision 698385. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 3 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings. -1 core tests. The patch failed core unit tests. -1 contrib tests. The patch failed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3356/testReport/ This message is automatically generated.
I meant fixed limit per job. Few comments :
Main :
1. There is no need to do the cleanup in init tasks. Simply throwing an exception in init tasks should do the cleanup and kill/fail the job. There seems to be a bug in the framework and I have opened
Testcase :
Cancelling patch while Amar's comments are being accommodated...
Minor nit: the variable maxSplits in JobInProgress should probably be renamed to 'maxTasks' - it's misleading.
Arun C Murthy made changes - 30/Sep/08 11:34 PM
We use -1 in
Incorporated all review comments except one.
If the limit check fails, the JT raises an exception after marking the job as failed. Amar had commented that it is not required to mark the job status as "failed" before raising the exception. But I have seen that unless this status is set, a job that is expected to fail does not fail. The unit test will then fail.
dhruba borthakur made changes - 03/Oct/08 06:42 AM
dhruba borthakur made changes - 03/Oct/08 05:14 PM
dhruba borthakur made changes - 03/Oct/08 05:15 PM
dhruba borthakur made changes - 03/Oct/08 05:15 PM
Hi Amar, I marked this jira as being blocked by
Few comments
1) int maxTasks = conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
This will change per job as conf is configured per job. Hence all the jobs should get this value from the jobtracker. Something like
-----JobTracker-------------
    int getMaxTasksPerJob() {
      return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
    }
-----JobInProgress---------
    int maxTasks = jobtracker.getMaxTasksPerJob();
2) if (maxTasks != -1 && numMapTasks + numReduceTasks > maxTasks) {
What if I pass -2? So the check should be
    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
3)
    if (maxTasks != -1 && numMapTasks + numReduceTasks > maxTasks) {
      long now = System.currentTimeMillis();
      this.finishTime = now;
      status.setRunState(JobStatus.FAILED);
      JobHistory.JobInfo.logFailed(profile.getJobID(), now, 0, 0);
      // Special case because the Job is not queued
      JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
      throw new IOException(
          "The number of tasks for this job " + (numMapTasks + numReduceTasks)
          + " exceeds the configured limit " + maxTasks);
    }
can be written as
    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
      throw new IOException(
          "The number of tasks for this job " + (numMapTasks + numReduceTasks)
          + " exceeds the configured limit " + maxTasks);
    }
Note that whatever you have done should be done by the job upon failure i.e
4) One minor nit. mapred.jobtracker.maxtasks.per.job could be mapred.job.tasks.maximum
The test case looks fine. Please check with everyone if passing -1 for UNLIMITED is fine. Wondering what if 0 is passed? Should we bail out immediately or should we consider any non-positive number as UNLIMITED?
Incorporated most of Amar's comments. I left the name of the config parameter as it was earlier.
dhruba borthakur made changes - 07/Oct/08 09:33 PM
dhruba borthakur made changes - 07/Oct/08 09:34 PM
Also, anything less than or equal to zero as value of the config parameter indicates an INVALID value. In this case, the limit checking is not triggered.
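To make the agreed semantics concrete, here is a small self-contained sketch of the per-job check in which any non-positive limit disables checking. The class JobLimits and the method checkTaskLimit are hypothetical names for this example; only the condition and the error message follow the review comments above.

```java
/** Hypothetical helper illustrating the per-job task limit check. */
final class JobLimits {
    private JobLimits() {}

    /**
     * Throws if maps + reduces exceed maxTasks.
     * A maxTasks value of zero or less is treated as "no limit configured".
     */
    static void checkTaskLimit(int maxTasks, int numMapTasks, int numReduceTasks)
            throws java.io.IOException {
        // Sum as long so that two large int counts cannot overflow.
        long total = (long) numMapTasks + numReduceTasks;
        if (maxTasks > 0 && total > maxTasks) {
            throw new java.io.IOException("The number of tasks for this job " + total
                + " exceeds the configured limit " + maxTasks);
        }
    }
}
```

With this shape, values such as -1, -2 and 0 all behave the same way, which sidesteps the "what if I pass -2?" question raised in the review.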
+1.
The reason why I proposed the new name is because it becomes easier to relate. mapred.job.tasks.maximum would mean that there is an attribute maximum for a component tasks of an entity job under the mapred umbrella. But I am ok with the current name too.
Thanks Amar for reviewing it. I am marking it for 0.19 because this limit is very necessary for clusters that have permanent JobTrackers (not using HOD). Otherwise a single erroneous job could swamp the entire cluster. The fix is very low-risk. I am proposing that this fix gets into the 0.19 branch.
dhruba borthakur made changes - 09/Oct/08 06:34 AM
dhruba borthakur made changes - 09/Oct/08 06:34 AM
+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 1 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings.
dhruba borthakur made changes - 09/Oct/08 11:52 PM
Integrated in Hadoop-trunk #630 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/630/
Robert Chansler made changes - 22/Oct/08 12:02 AM
dhruba borthakur made changes - 20/Nov/08 08:16 AM
Nigel Daley made changes - 20/Nov/08 11:38 PM
Owen O'Malley made changes - 08/Jul/09 04:52 PM
HADOOP-3670. There is a discussion there on using the profiler and tuning the GC parameters.
Also, could you please minimize the number of completed jobs kept in memory per user. Specify a very low value for mapred.jobtracker.completeuserjobs.maximum (defaults to 100 jobs per user).
HADOOP-3150 will further help to reduce the amount of memory consumed by the JT since it removes the task promotion queue.
Could you please give some more details - like the number of tasks the job had, how many such jobs you could run before the JT started to exhibit the problem, etc.?