Some details.
The limits on initialized jobs apply to waiting jobs only. Because of user quotas, we actually need only one limit: the number of initialized (waiting) jobs per user. That number should probably be 1, 2 or 3; let's assume it's 2. User quotas decide how many users the queue can support concurrently, in terms of running jobs. If the user quota is 25%, for example, the queue can run jobs from up to 4 users. Suppose there are waiting jobs from 4 or more users. Then we need to asynchronously initialize the first 2 waiting jobs from each user, for a total of 8 jobs, because any waiting job that runs next will be one of these 8. If only 2 users have waiting jobs, we just need to asynchronously initialize 2 jobs from each of these 2 users.
So it doesn't make sense to have a per-queue limit on the total number of initialized jobs. Such a limit can actually cause incorrect behavior, as a pre-configured limit may be small enough to prevent initialization of jobs from one or more users.
Note also that because jobs can shift their position in the wait queue because of priorities, and because jobs can complete between runs of this init thread (which handles asynchronous initialization), the total number of initialized jobs at any given time may be higher than the limits specify. As an example, consider a limit of 2 jobs/user. Suppose three users have submitted jobs that are waiting. Our thread will initialize 6 jobs, two from each of the three users. Now suppose that one of the users submits a high-priority job that jumps to the head of the wait queue. The next time our init thread runs, it will have to initialize this high-priority job, even though the user already has two jobs initialized. Ideally, the thread would un-initialize one of the 2 previously initialized jobs. This is a nice optimization, but we probably don't need it right away.
Some more information on the proposal above, based on my discussion with Vivek.
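The per-user policy described above can be sketched roughly as follows. This is a minimal illustration, not the patch's code: `WaitingJob`, `PerUserInitPolicy`, `selectJobsToInit` and `queue` are hypothetical names, the wait queue is assumed to be already sorted by priority, and the 8-job total is read as covering the first 4 users (100 / 25%) in queue order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a waiting job; only the owning user matters here.
final class WaitingJob {
    final String id;
    final String user;

    WaitingJob(String id, String user) {
        this.id = id;
        this.user = user;
    }
}

final class PerUserInitPolicy {
    /**
     * Walks the wait queue in priority order and selects at most
     * maxInitedPerUser jobs from each user, considering only the first
     * (100 / userLimitPercent) distinct users, i.e. the users whose jobs
     * can run next under the user quota.
     */
    static List<WaitingJob> selectJobsToInit(List<WaitingJob> waitQueue,
                                             int userLimitPercent,
                                             int maxInitedPerUser) {
        int maxUsers = 100 / userLimitPercent; // e.g. a 25% quota allows 4 users
        Map<String, Integer> initedPerUser = new LinkedHashMap<>();
        List<WaitingJob> selected = new ArrayList<>();
        for (WaitingJob job : waitQueue) {
            if (!initedPerUser.containsKey(job.user)) {
                if (initedPerUser.size() >= maxUsers) {
                    continue; // user is beyond the number the quota can serve
                }
                initedPerUser.put(job.user, 0);
            }
            int count = initedPerUser.get(job.user);
            if (count < maxInitedPerUser) {
                selected.add(job);
                initedPerUser.put(job.user, count + 1);
            }
        }
        return selected;
    }

    // Builds a wait queue with jobsPerUser jobs for each of `users` users, user by user.
    static List<WaitingJob> queue(int users, int jobsPerUser) {
        List<WaitingJob> q = new ArrayList<>();
        for (int u = 1; u <= users; u++) {
            for (int j = 1; j <= jobsPerUser; j++) {
                q.add(new WaitingJob("J" + j + "U" + u, "U" + u));
            }
        }
        return q;
    }

    public static void main(String[] args) {
        // 25% quota, 2 jobs/user.
        System.out.println(selectJobsToInit(queue(5, 3), 25, 2).size()); // prints 8
        System.out.println(selectJobsToInit(queue(2, 3), 25, 2).size()); // prints 4
    }
}
```

Running jobs are assumed to have been removed from the wait queue already, so they do not count against the per-user limit.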
This means that we do not count jobs that are already running (and therefore init'ed) when applying the limits. In that sense, it is easier for me to think about the limit as analogous to a cache pre-fetch limit, rather than a cap on the number of init'ed jobs. Maybe we should call this something like mapred.capacity-scheduler.queue.queue-name.max-waiting-jobs-inited-per-user.
To illustrate this point, suppose we had a per-queue limit of 5 jobs in the example above. Then we would never end up initializing any job from the 4th user. So even though the user limits would allow him to run, his job does not run, because it is not initialized until one of the other jobs starts running. Even worse, if there are more jobs from the first three users ahead of his in the queue, he would have to wait until all of them start running before his job can run. This could take quite a while.
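The starvation described above can be reproduced with a small sketch. It assumes a hypothetical queue-wide cap applied in wait-queue order on top of the 2-jobs-per-user limit; `QueueCapStarvation` and `usersWithInitedJobs` are illustrative names, not code from the patch, and each wait-queue entry is represented only by its owner's name.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class QueueCapStarvation {
    /**
     * Walks the wait queue (one entry per waiting job, holding the owner's
     * name, in priority order), initializing jobs subject to a per-user limit
     * until a queue-wide cap is reached. Returns the users that got at least
     * one job initialized.
     */
    static Set<String> usersWithInitedJobs(List<String> jobOwners, int perUserLimit, int queueCap) {
        Map<String, Integer> perUser = new HashMap<>();
        Set<String> users = new LinkedHashSet<>();
        int inited = 0;
        for (String user : jobOwners) {
            if (inited >= queueCap) {
                break; // queue-wide cap reached: users further back get nothing
            }
            int count = perUser.getOrDefault(user, 0);
            if (count < perUserLimit) {
                perUser.put(user, count + 1);
                users.add(user);
                inited++;
            }
        }
        return users;
    }

    public static void main(String[] args) {
        // Four users with 2 waiting jobs each; a 25% quota would let all four run.
        List<String> owners = List.of("U1", "U1", "U2", "U2", "U3", "U3", "U4", "U4");
        System.out.println(usersWithInitedJobs(owners, 2, 5)); // prints [U1, U2, U3]
    }
}
```

With a cap of 5, U4 never gets a job initialized, which is exactly the case a per-user limit alone avoids.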
Reversing the initialization of a job looks like a good option to think about. We should have a thread (call it the init thread) within the Capacity Scheduler that initializes jobs (i.e., calls initTasks() on jobs even when the job is not ready to run, as per the scheduler). The following features are desirable:
After an offline discussion with Hemanth and Vivek, the following is the proposal for implementing asynchronous initialization of jobs in the Capacity Scheduler:
Illustration: Consider a job queue q that can support one concurrent user (i.e., user limit = 100%). The maximum number of jobs to be initialized per user is 2. Three users U1, U2 and U3 submit jobs in the following order: J1U1, J2U1, J3U1, J4U1, J1U2, J2U2, J3U2, J4U2, J1U3, J2U3, J3U3, J4U3.
The jobs initialized by the initialization threads would be: J1U1, J2U1, J1U2, J2U2, J1U3, J2U3.
While all these jobs are initialized but not yet scheduled, a user U4 submits a very high priority job and a normal priority job, so at time t+1 our job queue would look like: J1U4, J1U1, J2U1, J3U1, J4U1, J1U2, J2U2, J3U2, J4U2, J1U3, J2U3, J3U3, J4U3, J2U4. In the next iteration, the poller would have initialized the following: J1U4, J1U1, J2U1, J1U2, J2U2, J1U3, J2U3. Please note that U4's second job would not be initialized. If U1 had submitted the very high priority job instead, he would cross the maximum number of jobs allowed to be initialized per user. In the above example, if J1U1 is a job that takes a long time to initialize, the next job to be initialized would be the next highest priority job, or the highest priority job if it was submitted late, as in the above example.
Any thoughts on the above approach?
Attaching output of ant test-patch:
[exec] +1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] +1 tests included. The patch appears to include 3 new or modified tests.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
[exec]
[exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
Attaching a new patch that changes the booleans to volatile, to address the JMM (Java Memory Model) issues mentioned in HADOOP-4671.
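A minimal sketch of why a poller's stop flag should be volatile. The names `InitPollerSketch` and `InitializableJob` are hypothetical, and the actual JobInitializationPoller in the patch may be structured differently. Without `volatile`, the JMM does not guarantee that the poller thread ever sees a `running = false` written by another thread.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical job handle; in Hadoop the real work would be the job's initTasks() call.
interface InitializableJob {
    void initTasks() throws Exception;
}

final class InitPollerSketch extends Thread {
    private final ConcurrentLinkedQueue<InitializableJob> pending = new ConcurrentLinkedQueue<>();
    private final long sleepIntervalMs;
    // volatile: a stop request written by another thread is guaranteed to be
    // visible here; a plain boolean could be read stale forever.
    private volatile boolean running = true;

    InitPollerSketch(long sleepIntervalMs) {
        super("init-poller-sketch");
        this.sleepIntervalMs = sleepIntervalMs;
        setDaemon(true);
    }

    void submit(InitializableJob job) {
        pending.add(job);
    }

    // One polling cycle; kept separate so tests can drive it synchronously.
    int initPendingJobs() {
        int inited = 0;
        InitializableJob job;
        while ((job = pending.poll()) != null) {
            try {
                job.initTasks();
                inited++;
            } catch (Exception e) {
                // A real poller would mark the job as failed here.
            }
        }
        return inited;
    }

    @Override
    public void run() {
        while (running) {
            initPendingJobs();
            try {
                Thread.sleep(sleepIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void terminate() {
        running = false;
        interrupt(); // wake the thread if it is sleeping
    }
}
```

Keeping one cycle in its own method also lets a test call it directly instead of sleeping and hoping the background thread has run.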
Some comments:
JobInitializationPoller:
JobQueueManager:
CapacityTaskScheduler:
CapacitySchedulerConf:
Attaching the latest patch incorporating the comments from Hemanth.
Attaching a new patch that fixes an issue in assigning queues to threads: in case of spill-overs, the queues were not actually added to the hashmap.
This is looking good. Some of my previous comments aren't incorporated, though. If you are OK with the suggestions, can you please incorporate them? If you have any reason not to make them, please mention that here.
I also looked at the test cases in detail. A few comments on them:
Attaching a patch incorporating all the comments mentioned.
Sorry for the confusion regarding the test cases. Attaching the latest patch with changes to the test case, which now waits for a condition to occur; if the condition never occurs, the test case fails by timeout.
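One way such a wait can be written is sketched below, assuming a hypothetical `WaitUtil.waitFor` helper that gives up after a deadline. In the patch itself, the failure may instead come from the test harness's own timeout.

```java
import java.util.function.BooleanSupplier;

final class WaitUtil {
    /**
     * Polls `condition` every pollMs milliseconds until it holds or timeoutMs
     * elapses. Returns true if the condition became true and false on timeout,
     * so a test can fail with a clear message instead of hanging.
     */
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // condition never occurred within the timeout
            }
            Thread.sleep(pollMs);
        }
        return true;
    }
}
```

A JUnit test could then assert `assertTrue("job was never initialized", WaitUtil.waitFor(...))` with whatever condition it is waiting on.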
The test cases in the earlier patch were too complex and too non-deterministic given the asynchronous nature of the patch. So Sreekanth and I refactored the code to enable better testing, and modified the test cases to be run in a synchronous and deterministic fashion. We are also not dependent on any sleep calls. I updated the documentation on the patch quite a bit.
Results of test-patch:
[exec] +1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] +1 tests included. The patch appears to include 6 new or modified tests.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
[exec]
[exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
I also ran contrib tests on my box, and they all passed. As the patch only touches the capacity scheduler and no classes in core, I am not running core tests. Will commit this patch now, as the patch queue is quite long, and there are some important patches waiting on this one for merging (for e.g.
I just committed this. Thanks, Sreekanth !
HADOOP-4428. We can have a limit on the total number of waiting jobs initialized (maybe 10 per queue), as well as a limit on initialized jobs per user per queue (maybe 3 per user per queue). The modified EagerTaskInitializationListener thread enforces these limits and only initializes jobs as necessary.