After an offline discussion with Hemanth and Vivek, the following is the proposal for implementing asynchronous initialization of jobs in the Capacity Scheduler:
- Modify CapacityTaskScheduler to look only at the Run-queue maintained by JobQueueManager. This queue contains all initialized jobs.
- Modify JobQueueManager to change the semantics of the waiting job queue to a list of jobs which are waiting to be scheduled. Please note that, because a job can be initialized while it is still waiting to be scheduled, a job J1 could be in both the running queue and the waiting job queue at the same time. When the first map or reduce task of the job is scheduled, the job would be removed from the waiting job queue which JobQueueManager maintains.
- Introduce a new poller class, which looks at JobQueueManager.getJobs(queue) and picks up jobs to initialize for that queue.
- The following parameters would be used for selecting jobs for eager initialization:
  - Maximum number of jobs which can be initialized per user. This would be a new configuration parameter introduced in capacity_scheduler.xml.
  - Number of concurrent users supported by the queue. The initialization poller would initialize the jobs of ((userlimits/100) + 2) users.
- The selected jobs would be passed on to worker threads, each of which can be assigned the duty of initializing jobs from one or more queues.
- The worker thread maintains separate lists for jobs from different queues, sorted by priority in the same way as in JobQueueManager.
- The worker thread then initializes the jobs in a round robin fashion amongst the job queues assigned to it, i.e. it initializes the first job from q1 and then the first job from q2.
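The round robin step can be sketched roughly as below. This is only an illustration, not code from the patch: the class name RoundRobinInitSketch, the method initializationOrder and the use of plain strings as job ids are all hypothetical, and each per-queue list is assumed to be already sorted by priority.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one worker thread serving several job queues.
public class RoundRobinInitSketch {

    /**
     * Returns the order in which a single worker would initialize jobs,
     * taking one job from each assigned queue per pass.
     * Each input list is assumed to be sorted by priority already.
     */
    public static List<String> initializationOrder(Map<String, List<String>> jobsByQueue) {
        // Copy each queue's jobs into its own pending deque.
        Map<String, Deque<String>> pending = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : jobsByQueue.entrySet()) {
            pending.put(e.getKey(), new ArrayDeque<>(e.getValue()));
        }

        List<String> order = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            // One pass: take the head job of every queue that still has one.
            for (Deque<String> queueJobs : pending.values()) {
                String job = queueJobs.poll();
                if (job != null) {
                    order.add(job); // a real worker would initialize the job here
                    progressed = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> jobs = new LinkedHashMap<>();
        jobs.put("q1", Arrays.asList("q1-J1", "q1-J2"));
        jobs.put("q2", Arrays.asList("q2-J1"));
        System.out.println(initializationOrder(jobs)); // [q1-J1, q2-J1, q1-J2]
    }
}
```

Taking one job per queue per pass means a queue with a long backlog cannot starve the other queues assigned to the same worker.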
Consider a job queue q which can support one concurrent user (i.e. userlimits = 100%). Three users U1, U2, U3 are submitting jobs in the following distribution:
Maximum number of jobs to be initialized per user : 2
Jobs initialized by the Initialization threads would be:
Suppose all of these are initialized but not yet scheduled, and a user U4 submits a very high priority job and a normal priority job. Our job queue at instant t+1 would look like:
So in the next iteration the poller would have initialized the following:
Please note that U4's second job would not be initialized.
If U1 had submitted the very high priority job, then U1 would be crossing the maximum limit of jobs which are allowed to be initialized per user.
In the above example, if J1U1 is a job which takes a long time to initialize, the next job to be initialized would be the next highest priority job, or the highest priority job if one was submitted late, as in the above example.
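One possible reading of the per-user limit is sketched below: a user who already has the maximum number of initialized jobs gets no further job initialized, whatever its priority. The class PerUserInitLimitSketch and the method tryInitialize are hypothetical names used only for illustration, and the sketch assumes that every initialized job counts towards the limit until it is removed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-user cap on initialized jobs.
public class PerUserInitLimitSketch {
    private final int maxJobsPerUser;
    // Number of jobs currently initialized for each user.
    private final Map<String, Integer> initializedPerUser = new HashMap<>();

    public PerUserInitLimitSketch(int maxJobsPerUser) {
        this.maxJobsPerUser = maxJobsPerUser;
    }

    /**
     * Returns true and records the job if the user is still below the cap;
     * returns false, leaving the counts unchanged, if the cap is reached.
     */
    public boolean tryInitialize(String user) {
        int count = initializedPerUser.getOrDefault(user, 0);
        if (count >= maxJobsPerUser) {
            return false; // cap reached, priority is not considered here
        }
        initializedPerUser.put(user, count + 1);
        return true;
    }

    public static void main(String[] args) {
        PerUserInitLimitSketch limit = new PerUserInitLimitSketch(2);
        System.out.println(limit.tryInitialize("U1")); // true
        System.out.println(limit.tryInitialize("U1")); // true
        System.out.println(limit.tryInitialize("U1")); // false, U1 is at the cap
        System.out.println(limit.tryInitialize("U4")); // true, U4 has no initialized jobs
    }
}
```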
Any thoughts on the above approach?