Hadoop Common / HADOOP-4513

Capacity scheduler should initialize tasks asynchronously

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.19.0
    • Fix Version/s: 0.20.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      Currently, the capacity scheduler initializes tasks on demand, as opposed to the eager initialization used by the default scheduler. This is done to reduce the JobTracker (JT) memory footprint. However, the initialization happens inside the assignTasks API, which is not a good idea because task initialization can be a time-consuming operation. This JIRA is to move the initialization out of the assignTasks API and do it asynchronously.

      1. HADOOP-4513-1.patch
        42 kB
        Sreekanth Ramakrishnan
      2. HADOOP-4513-2.patch
        43 kB
        Sreekanth Ramakrishnan
      3. HADOOP-4513-3.patch
        49 kB
        Sreekanth Ramakrishnan
      4. HADOOP-4513-4.patch
        49 kB
        Sreekanth Ramakrishnan
      5. HADOOP-4513-5-NOBUSYWAITING.patch
        58 kB
        Sreekanth Ramakrishnan
      6. HADOOP-4513-6.patch
        58 kB
        Sreekanth Ramakrishnan
      7. HADOOP-4513-8.patch
        62 kB
        Hemanth Yamijala

        Activity

        Hemanth Yamijala added a comment -

        I just committed this. Thanks, Sreekanth !

        Hemanth Yamijala added a comment -

        I also ran contrib tests on my box, and they all passed. As the patch only touches the capacity scheduler and no classes in core, I am not running core tests. Will commit this patch now, as the patch queue is quite long, and there are some important patches waiting on this one for merging (e.g. HADOOP-4035).

        Hemanth Yamijala added a comment -

        Results of test-patch:

        [exec] +1 overall.

        [exec] +1 @author. The patch does not contain any @author tags.

        [exec] +1 tests included. The patch appears to include 6 new or modified tests.

        [exec] +1 javadoc. The javadoc tool did not generate any warning messages.

        [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.

        [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

        Hemanth Yamijala added a comment -

        The test cases in the earlier patch were too complex and too non-deterministic given the asynchronous nature of the patch. So Sreekanth and I refactored the code to enable better testing, and modified the test cases to be run in a synchronous and deterministic fashion. We are also not dependent on any sleep calls. I updated the documentation on the patch quite a bit.

        Sreekanth Ramakrishnan added a comment -

        Sorry for the confusion with respect to the test cases. Attaching the latest patch with changes to the test cases, which now wait for a condition to occur; if it never occurs, the test cases fail by timeout.

        Sreekanth Ramakrishnan added a comment -

        Attaching patch with all the changes incorporating the comments mentioned.

        Hemanth Yamijala added a comment -

        This is looking good. Some of my previous comments aren't incorporated though. If you are ok with the suggestions, can you please incorporate them. If you have any reason not to make them, please mention that here.

        • In JobInitializationPoller, the constructor is still redundantly calling super.
        • The purpose of data structures such as jobQueues, initializedJobs and threadsToQueueMap should be documented. This is moderately complex code and documentation will make it easier to maintain.
        • In an offline discussion with Devaraj, we figured that there is no need to join these threads. Hence, I think we should make these daemon threads, so that we do not need to join them. We can have a separate JIRA to address the same issue in the EagerTaskInitializer later.
        • assignThreadsToQueue should be assignThreadsToQueues.
        • The comments on getJobsToInitialize are helpful. I think it would help even more if they are inline with the code, so one can read them while looking at the code itself.
        • In JobInitializationThread.run, the code "if (!startIniting) { break; }" is redundant, as this is already handled above.

        • JobInitializationThread's shutdown is still called after interrupting. It should be before.
        • Documentation is needed for getSleepInterval and getMaxWorkerThreads
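
The thread-lifecycle points in this list (daemon threads, no join, clearing the run flag before interrupting) can be sketched as follows. This is a minimal illustration, not the code from the patch: the class name InitWorker and its members are hypothetical, and the actual job initialization is left as a placeholder comment.

```java
// Hypothetical worker thread; names are illustrative and not taken from the patch.
class InitWorker extends Thread {
    // volatile so that a write from the controlling thread is visible inside run()
    private volatile boolean running = true;
    private final long sleepMillis;

    InitWorker(long sleepMillis) {
        this.sleepMillis = sleepMillis;
        setDaemon(true); // daemon thread: the JVM can exit without joining it
    }

    @Override
    public void run() {
        while (running) {
            try {
                // ... initialize pending jobs here (placeholder) ...
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Woken up early; the loop condition decides whether to exit.
            }
        }
    }

    /** Clears the flag first and interrupts second, so the wake-up always sees running == false. */
    void terminate() {
        running = false;
        interrupt();
    }
}
```

If the interrupt were sent before the flag was cleared, the worker could wake up, still see the flag set, and go back to sleep for a full interval.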

        I also looked at the test cases in detail. A few comments on them:

        • Broadly, we should remove the sleep calls everywhere. Instead, let us test by waiting for the first condition that we want to verify to occur. E.g. in testJobInitializationPoller1(), after the first set of jobs is submitted, we will wait until the size of initialized jobs reaches 6. After this, the test code will verify that the correct six jobs are initialized. This way, we remove all timing issues. If the condition is never reached, the test will time out and fail. Hence our purpose will still be served.
        • There are a few methods which are being used exclusively for testing. Request that there be a comment on these methods that they are meant for testing. This will help others to take an extra look at this code if they want to use it in non-test code.
        • For FakeJobInProgressWithLongInit, use signalling rather than busy waiting to signal the jobs to finish initing.
        • initingJobId is not synchronized consistently.
        • Instead of isIniting, a better helper API would be getInitingJob(queue), with the verification done in the tests. I can think of other reasons this API can be useful, e.g. for logging.
        • Make capacity 100.0 in the tests if there's only one queue to prevent confusion. Currently it is set to 50.
        • In testJobInitializationPoller3, before you set priority of the job for u4 as high, verify it is not in the initialized list already. Also verify that the size of the initialized jobs exceeds the limit, i.e. it is 7 and not 6.
        • I think there should also be steps to verify that completed and running jobs are being removed from the jobQueue in JobQueueManager.
        • Finally, can we rename the tests as testJobInitialization(), testMultiQueueJobInitialization(), testHighPriorityJobInitialization(). I think that conveys more meaning.
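
Two of the testing suggestions above, waiting for a condition with a timeout and using signalling instead of busy waiting, might look roughly like this. It is a sketch under assumptions: TestWaits and SlowInitJob are hypothetical names (SlowInitJob only stands in for something like FakeJobInProgressWithLongInit), and it uses Java 8 lambdas, which were not available when this patch was written.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical test helper; not part of the patch.
class TestWaits {
    /** Polls until the condition holds or the timeout expires; returns whether it held. */
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // the caller fails the test on timeout
            }
            try {
                Thread.sleep(10); // short poll interval; the outcome does not depend on it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}

// Hypothetical stand-in for a job whose initialization blocks until the test releases it.
class SlowInitJob {
    private final CountDownLatch release = new CountDownLatch(1);
    private volatile boolean initialized = false;

    void initTasks() throws InterruptedException {
        release.await(); // parks the thread instead of spinning on a flag
        initialized = true;
    }

    void finishInit() { release.countDown(); }

    boolean isInitialized() { return initialized; }
}
```

A test would start initTasks() on a worker thread, call finishInit() when it wants initialization to complete, and then assert on waitFor(job::isInitialized, timeout) rather than sleeping for a fixed period.
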
        Sreekanth Ramakrishnan added a comment -

        Attaching a new patch that fixes an issue in assigning queues to threads: in case of spill-overs, the queues were not actually added to the hashmap.
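
For illustration, one way to assign queues to worker threads so that spill-over queues (more queues than threads) still end up in the map is a simple wrap-around. This is only a sketch; QueueAssigner and its method are hypothetical and the patch may distribute queues differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not the assignThreadsToQueues implementation from the patch.
class QueueAssigner {
    /** Maps every queue name to a worker index, wrapping around when queues outnumber workers. */
    static Map<String, Integer> assign(List<String> queues, int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        Map<String, Integer> queueToWorker = new LinkedHashMap<>();
        for (int i = 0; i < queues.size(); i++) {
            // Spill-over queues land on an existing worker instead of being dropped.
            queueToWorker.put(queues.get(i), i % workers);
        }
        return queueToWorker;
    }
}
```

With three queues q1, q2, q3 and two workers, q1 and q3 share worker 0 and q2 goes to worker 1.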

        Sreekanth Ramakrishnan added a comment -

        Attaching the latest patch incorporating the comments from Hemanth.

        Hemanth Yamijala added a comment -

        Some comments:

        • Please expose variables like initalizationPoller, that are needed in tests via accessor methods, and make the variable itself private.

        JobInitializationPoller:

        • Typo: Should be JobInitializationPoller and not JobInitalizationPoller. Also, same typo (initaliz..) exists for other variables also, including in capacity scheduler.
        • Need not call super() explicitly.
        • maxUserToInitialize should be maxUsersToInitialize
        • Please document the purpose of the various data structures.
        • terminate should use the logger and not print stack trace while handling InterruptedException. And it should probably break here, and exit.
        • terminate is interrupting and joining threads more than once. While this may not be a problem, it is best not to do so. The code should check if the thread is still alive before calling these APIs.
        • threadsAssignedToQueue seems to indicate more than one thread can be assigned to the same queue. Suggestions for renaming include threadsAssignedToQueues or threadsToQueuesMap. Similarly, assignThreadsToQueue.
        • Initialization of threadsAssignedToQueue can be in the constructor itself.
        • Since threadsAssignedToQueue is a member variable, we don't need to pass it to assignThreadsToQueue
        • commented line "// startIndex = ((startIndex+1)%countOfQueues);" can be removed.
        • Some comments on simplifying getJobsToInitialize
          • need more comments explaining the code. Particularly why the code is checking for total number of jobs per queue is not obvious.
          • maxJobsAllowedToInitalize should be maxJobsPerUserAllowedToInitialize
          • would the (pseudo-code) below do the same thing ? It seems like this is a bit more clear.
                  if (!isUserPresent
                      && userJobsInitalized.size() < maximumUsersAllowedToInitialize) {
                    // new user and we are within limits, so initialize
                    userJobsInitalized.put(user, Integer.valueOf(1));
                    jobsToInitalize.add(job);
                    initalizedJobs.add(job.getJobID());
                    countOfJobsInitialized++;
                  } else if (isUserPresent
                      && numberOfJobs < maxJobsAllowedToInitalize
                      && countOfJobsInitialized < maxJobsPerQueueToInitialize) {
                    // existing user, we are within overall limits and per user limit;
                    // assuming numberOfJobs is this user's current count, increment it
                    // rather than resetting it to 1
                    userJobsInitalized.put(user, Integer.valueOf(numberOfJobs + 1));
                    jobsToInitalize.add(job);
                    initalizedJobs.add(job.getJobID());
                    countOfJobsInitialized++;
                  }
            
        • printJobs should be called only if the Log is enabled (LOG.isDebugEnabled)
        • When initTasks fails, we are calling job.fail. Maybe we should also notify the jobqueuemanager to remove the job from the queue ? This needs some more discussion.
        • In the worker thread's run, the continue seems redundant, as there's no other code to execute.
        • The comment on MAX_ADDITIONAL_USERS_TO_INIT can be improved a bit. Basically, we want to keep a couple of additional users jobs initialized so when their jobs need to run because of user limits, they would be ready.
        • Do we need a ConcurrentHashMap for the queue to threads mapping ? The modification is done in the addQueue method which is only done once, and all subsequent operations are read only.
        • The terminate method should be package private.
        • The worker thread's shutdown should be called before interrupting.
        • We should check if the stop flag (or equivalent) is set before going into Thread.sleep. Alternative is to use Thread.isInterrupted().

        JobQueueManager:

        • I am thinking removal of completed jobs from the 'jobqueue' must also be done in jobCompleted, and not from the poller. This keeps it simple to understand.

        CapacityTaskScheduler:

        • Use terminate to stop the Poller thread, rather than stop

        CapacitySchedulerConf:

        • maximum-initialized-jobs-per-user should be included in the xml file. I am thinking we should define the values for the default queue at least. It is preferable to define the default value for all queues as well, just as for other variables. This will keep the variables consistent.
        • The check for invalid (negative) values is not done for maximum-initialized-jobs-per-user.
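
The missing negative-value check could be sketched as below. This is an assumption-laden illustration: it reads from java.util.Properties rather than the Hadoop configuration classes, and the full key layout (prefix plus queue name) is a guess; only the suffix maximum-initialized-jobs-per-user comes from this review.

```java
import java.util.Properties;

// Hypothetical helper; the real check would live in CapacitySchedulerConf.
class InitLimits {
    /** Returns the per-user limit for a queue, falling back to a default and rejecting negatives. */
    static int maxInitializedJobsPerUser(Properties conf, String queue, int defaultValue) {
        // Assumed key layout, for illustration only.
        String key = "mapred.capacity-scheduler.queue." + queue
            + ".maximum-initialized-jobs-per-user";
        String raw = conf.getProperty(key);
        int value = (raw == null) ? defaultValue : Integer.parseInt(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(key + " must not be negative: " + value);
        }
        return value;
    }
}
```
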
        Sreekanth Ramakrishnan added a comment -

        Attaching new patch by changing booleans to volatile, in order to address JMM issues as mentioned in JIRA HADOOP-4671

        Sreekanth Ramakrishnan added a comment -

        Attaching output of ant test-patch

             [exec] +1 overall.  
             [exec] 
             [exec]     +1 @author.  The patch does not contain any @author tags.
             [exec] 
             [exec]     +1 tests included.  The patch appears to include 3 new or modified tests.
             [exec] 
             [exec]     +1 javadoc.  The javadoc tool did not generate any warning messages.
             [exec] 
             [exec]     +1 javac.  The applied patch does not increase the total number of javac compiler warnings.
             [exec] 
             [exec]     +1 findbugs.  The patch does not introduce any new Findbugs warnings.
             [exec] 
             [exec]     +1 Eclipse classpath. The patch retains Eclipse classpath integrity.
        
        Sreekanth Ramakrishnan added a comment -

        Attaching patch.

        Sreekanth Ramakrishnan added a comment -

        After an off-line discussion with Hemanth and Vivek, the following is the proposal for implementing asynchronous initialization of jobs by the capacity scheduler:

        • Modify CapacityTaskScheduler to look only at the Run-queue maintained by JobQueueManager. This queue contains all initialized jobs.
        • Modify JobQueueManager to change the semantics of the waiting job queue to a list of jobs which are waiting to be scheduled. Please note that, while a job is waiting to be scheduled, it is possible for a job J1 to be in both the running queue and the job queue at the same time. When the first map or reduce of the job is scheduled, the job would be removed from the job queue which JobQueueManager maintains.
        • Introduce a new poller class, which looks at the JobQueueManager.getJobs(queue) and picks up tasks to initialize for that queue.
        • The following parameters would be used for selecting jobs for eager initialization:
          • Maximum jobs which can be initialized per user. This would be a configuration parameter which would be introduced in capacity_scheduler.xml
          • Number of concurrent users supported by the queue, so the initialization poller would initialize ((userlimits/100) + 2 ) user jobs.
        • The selected jobs would be passed on to worker threads, which can be assigned duty of initializing jobs from one or more queues.
        • The worker thread maintains separate lists for jobs from different queues, sorted by priority in the same way as JobQueueManager.
        • The worker thread then initializes the jobs from queues in a round robin fashion amongst the job queues assigned to it, i.e. it initializes first job from q1 and then first job from q2.
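
The round-robin order described in the last bullet can be sketched as follows. The class and method names are hypothetical, jobs are reduced to plain strings, and the per-queue lists are assumed to be sorted by priority already.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical helper; jobs are represented by their names only.
class RoundRobinInit {
    /** Takes one job at a time from each queue in turn until every queue is drained. */
    static List<String> initializationOrder(Map<String, List<String>> jobsByQueue) {
        List<Deque<String>> pending = new ArrayList<>();
        for (List<String> jobs : jobsByQueue.values()) {
            pending.add(new ArrayDeque<>(jobs));
        }
        List<String> order = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Deque<String> queue : pending) {
                String job = queue.pollFirst(); // null when this queue is already drained
                if (job != null) {
                    order.add(job);
                    progressed = true;
                }
            }
        }
        return order;
    }
}
```

Passing a LinkedHashMap keeps the queue order stable. For q1 = [a1, a2, a3] and q2 = [b1] the resulting order is a1, b1, a2, a3, so a long backlog in q1 does not delay the first job of q2.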

        Illustration:

        Consider a job queue q which can support one concurrent user (i.e. userlimits = 100%). Three users U1, U2, U3 are submitting jobs in the following distribution:

        Maximum number of jobs to be initialized per user : 2

        J1U1,J2U1,J3U1,J4U1,J1U2,J2U2,J3U2,J4U2,J1U3,J2U3,J3U3,J4U3.

        Jobs initialized by the initialization threads would be:

        J1U1,J2U1,J1U2,J2U2,J1U3,J2U3.

        Suppose all of these are initialized but not yet scheduled, and a user U4 submits a very high priority job and a normal priority job. Our job queue at instant t+1 would then look like:

        J1U4,J1U1,J2U1,J3U1,J4U1,J1U2,J2U2,J3U2,J4U2,J1U3,J2U3,J3U3,J4U3,J2U4.

        So in the next iteration the poller would have initialized the following:

        J1U4,J1U1,J2U1,J1U2,J2U2,J1U3,J2U3.

        Please note that U4's second job would not be initialized.

        If user U1 had submitted the very high priority job, then he would be crossing the maximum limit of jobs which are allowed to be initialized per user.

        In the above example, if J1U1 is a job which takes a long time to initialize, the next job to be initialized would be the next highest priority job, or the highest priority job if it is submitted late as in the above example.
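
The first pass of the illustration can be reproduced with a small sketch. It assumes a simplified selection rule: walk the queue in order, track at most maxUsers distinct users (one concurrent user plus two extra here), and pick at most maxJobsPerUser jobs for each. The names are hypothetical, and the sketch ignores priorities and jobs initialized in earlier passes, so it does not model the t+1 step with U4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: class, method and field names are not taken from the patch.
class JobSelector {
    static final class Job {
        final String name;
        final String user;
        Job(String name, String user) { this.name = name; this.user = user; }
    }

    /** Walks the queue in order and picks jobs within the user-count and per-user limits. */
    static List<String> select(List<Job> waiting, int maxUsers, int maxJobsPerUser) {
        Map<String, Integer> jobsPerUser = new HashMap<>();
        List<String> selected = new ArrayList<>();
        for (Job job : waiting) {
            Integer count = jobsPerUser.get(job.user);
            if (count == null) {
                if (jobsPerUser.size() >= maxUsers) {
                    continue; // already tracking the maximum number of users
                }
                count = 0;
            }
            if (count < maxJobsPerUser) {
                jobsPerUser.put(job.user, count + 1);
                selected.add(job.name);
            }
        }
        return selected;
    }

    /** Builds the queue J1U1..J4U1, J1U2..J4U2, J1U3..J4U3 (four jobs per user). */
    static List<Job> exampleQueue() {
        List<Job> queue = new ArrayList<>();
        for (int u = 1; u <= 3; u++) {
            for (int j = 1; j <= 4; j++) {
                queue.add(new Job("J" + j + "U" + u, "U" + u));
            }
        }
        return queue;
    }
}
```

Calling JobSelector.select(JobSelector.exampleQueue(), 3, 2) returns [J1U1, J2U1, J1U2, J2U2, J1U3, J2U3], matching the first list of initialized jobs in the illustration.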

        Any thoughts on the above approach?

        Vivek Ratan added a comment -

        We should have a thread (call it the init thread) within the Capacity Scheduler that initializes jobs (i.e., calls initTasks() on jobs even when the job is not ready to run, as per the scheduler). The following features are desirable:

        • We want pre-initialized jobs in all the queues, since the Scheduler can possibly look at all queues between consecutive runs of this thread.
        • Since initTasks() may take a fair bit of time for some jobs, we want to avoid blocking on this call as much as possible.
        • Remember that the init thread is really predicting which jobs the scheduler will run in the next many heartbeats. It's hard to make this prediction as the decision of which job gets a free slot depends on the state of the running jobs at that very instant (to compute user quotas, for example). But there are some constraints that the thread can consider:
          • the waiting queue is sorted, and the scheduler considers jobs in that sort order. So the init thread should pre-initialize jobs in the same order.
          • User quotas are tricky. Suppose a queue supports N concurrent users. The init thread should really pre-initialize jobs from these N users only, as described earlier. But this list of N users can change often, depending on which jobs from which users finish. So, to be safe, the init thread should pre-initialize jobs from, say, N+2 users.
        • Since we're keeping the run-queue sorted (HADOOP-4471), and jobs are moved from the wait queue to the run queue once initTasks is called and the setup task is complete, the Scheduler only needs to look at jobs in its run queue. If it doesn't find any job that can accept a task, it moves on to the next queue.
        • If the init thread runs periodically, there may be a brief startup delay. Suppose the thread runs every 5 seconds. Suppose a job is submitted to an empty queue just after the thread runs. That job will not be initialized till the init thread runs next (a max delay of 5 secs). This is not too bad, given that there will be a delay in running the setup task of that job too. Worst case, the Scheduler can always cause the init thread to run as soon as a job is submitted to an empty queue, but this may be overkill.
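
The optional early wake-up mentioned in the last bullet could be built on a plain monitor wait, roughly as sketched here. The class InitPoller and its methods are hypothetical, and nothing in this discussion commits to the approach.

```java
// Hypothetical sketch of a periodic wait that can be cut short; not from the patch.
class InitPoller {
    private final Object lock = new Object();
    private boolean wakeRequested = false; // guarded by lock

    /** Waits up to intervalMillis; returns true if wakeUp() cut the wait short. */
    boolean awaitNextRun(long intervalMillis) {
        long deadline = System.nanoTime() + intervalMillis * 1_000_000L;
        synchronized (lock) {
            try {
                while (!wakeRequested) {
                    long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                    if (remainingMillis <= 0) {
                        break; // the regular interval elapsed
                    }
                    lock.wait(remainingMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            boolean woken = wakeRequested;
            wakeRequested = false;
            return woken;
        }
    }

    /** Called, for example, when a job is submitted to an empty queue. */
    void wakeUp() {
        synchronized (lock) {
            wakeRequested = true;
            lock.notifyAll();
        }
    }
}
```

The init thread would call awaitNextRun(5000) between passes, so a submission to an empty queue is picked up immediately while the common case still runs on the 5 second cadence.
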
        Hemanth Yamijala added a comment -

        Some more information on the proposal above, based on my discussion with Vivek.

        The limits on the initialized jobs are for waiting jobs only.

        This means that we do not count jobs that are already running (and therefore, init'ed) in applying the limits. In that sense, it is easier for me to think about the limit as analogous to a cache pre-fetch limit, rather than a cap on the number of init'ed jobs. Maybe we should call this something like mapred.capacity-scheduler.queue.queue-name.max-waiting-jobs-inited-per-user.

        So it doesn't make sense to have a per-queue limit on the total number of initialized jobs. Having such a limit can actually cause incorrect behavior, as this pre-configured limit may be small enough to prevent initialization of jobs from one or more users.

        To illustrate this point, suppose we set such a limit to 5 jobs in the example above. Then we would never initialize any job from the 4th user. So although user limits would allow that user's job to run, it does not, because it is not initialized until one of the other jobs starts running. Even worse, if the first three users have more jobs ahead of it in the queue, the 4th user would have to wait until all of them start running before this job can run. This could take quite a while.
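
A small sketch of this starvation effect, under simplifying assumptions: jobs are encoded as hypothetical "user:job" strings in wait-queue order, and `QueueCapDemo.pick` is an invented helper, not part of the scheduler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper showing how a per-queue cap interacts with a per-user limit. */
class QueueCapDemo {
    /**
     * Walks the wait queue in order and picks jobs to initialize, taking at most
     * perUser jobs from each user and stopping once queueCap jobs are picked.
     */
    static List<String> pick(List<String> waiting, int perUser, int queueCap) {
        Map<String, Integer> pickedPerUser = new HashMap<>();
        List<String> picked = new ArrayList<>();
        for (String job : waiting) {
            if (picked.size() >= queueCap) {
                break; // the per-queue cap is hit; later users get nothing
            }
            String user = job.substring(0, job.indexOf(':'));
            int count = pickedPerUser.getOrDefault(user, 0);
            if (count < perUser) {
                pickedPerUser.put(user, count + 1);
                picked.add(job);
            }
        }
        return picked;
    }
}
```

With four users who each have two waiting jobs, queued in the order u1, u1, u2, u2, u3, u3, u4, u4, a per-user limit of 2 and a queue cap of 5 picks two jobs each for u1 and u2 and one for u3, and nothing for u4. Passing `Integer.MAX_VALUE` as the cap, which amounts to having no per-queue limit, picks all eight.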

        Ideally, the thread would un-initialize one of the 2 previously initialized jobs. This is a nice optimization, but we probably don't need it right away.

        Reversing the initialization of a job looks like a good option to think about.

        Vivek Ratan added a comment -

        Some details.

        The limits on the initialized jobs are for waiting jobs only. Because of user quotas, we actually need only one limit: the # of initialized (waiting) jobs per user. That number should probably be 1, 2 or 3. Let's assume it's 2. User quotas decide how many concurrent users the queue can support at a given time, in terms of running jobs. If the user quota is 25%, for example, the queue can run jobs from up to 4 users. Suppose there are waiting jobs from 4 or more users. Then, we need to asynchronously initialize the first 2 waiting jobs from each user, for a total of 8 jobs. That's because any waiting job that runs next will come from one of these 8 jobs. If only 2 users have waiting jobs, then we just need to asynchronously initialize 2 jobs from each of these 2 users. So it doesn't make sense to have a per-queue limit on the total number of initialized jobs. Having such a limit can actually cause incorrect behavior, as this pre-configured limit may be small enough to prevent initialization of jobs from one or more users.

        Note also that because jobs can shift position in the wait queue due to priorities, and because jobs can complete in the interval between runs of the init thread (which handles the asynchronous inits), the total number of initialized jobs at any given time may be higher than the limits specify. As an example, consider a limit of 2 jobs/user. Suppose three users have submitted jobs that are waiting. Our thread will initialize 6 jobs, two from each of the three users. Now suppose that one of the users submits a high-priority job, which jumps to the head of the wait queue. The next time our init thread runs, it will have to initialize this high-priority job, even though the user already has two jobs initialized. Ideally, the thread would un-initialize one of the 2 previously initialized jobs. This is a nice optimization, but we probably don't need it right away.
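
The selection rule and the overshoot described above can be sketched as follows. Everything here is illustrative: `WaitingJobInitializer`, `runOnce` and the "user:job" string encoding are made-up names, and deriving the number of concurrent users as `100 / userQuotaPercent` is an assumption based on the 25% example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the init thread's per-user selection across runs. */
class WaitingJobInitializer {
    private final int jobsPerUser;
    private final int maxUsers;
    // Jobs initialized so far; this sketch never un-initializes anything.
    private final Set<String> initialized = new LinkedHashSet<>();

    WaitingJobInitializer(int jobsPerUser, int userQuotaPercent) {
        this.jobsPerUser = jobsPerUser;
        this.maxUsers = 100 / userQuotaPercent; // e.g. a 25% quota gives 4 users (assumed rule)
    }

    /**
     * One pass over the sorted wait queue. Considers the first jobsPerUser jobs
     * of each of the first maxUsers users and returns the jobs newly initialized.
     */
    List<String> runOnce(List<String> waiting) {
        Map<String, Integer> consideredPerUser = new LinkedHashMap<>();
        List<String> newlyInitialized = new ArrayList<>();
        for (String job : waiting) {
            String user = job.substring(0, job.indexOf(':'));
            if (!consideredPerUser.containsKey(user) && consideredPerUser.size() >= maxUsers) {
                continue; // user is beyond the first maxUsers users in queue order
            }
            int count = consideredPerUser.getOrDefault(user, 0);
            if (count >= jobsPerUser) {
                continue; // this user already has enough jobs ahead in the queue
            }
            consideredPerUser.put(user, count + 1);
            if (initialized.add(job)) {
                newlyInitialized.add(job);
            }
        }
        return newlyInitialized;
    }

    int initializedCount() {
        return initialized.size();
    }
}
```

With a limit of 2 jobs/user and three users, the first pass initializes 6 jobs. If one user then submits a job that sorts to the head of the queue, the next pass initializes only that job, leaving 7 jobs initialized and that user with 3. This is the overshoot that un-initialization would remove.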

        Vivek Ratan added a comment -

        Yes, we need to make sure jobs are initialized asynchronously (so that initTasks() is not called synchronously from within a heartbeat) and as early as possible (so that a job is already initialized when we consider it to run). We also want only a small number of waiting jobs initialized at any given time, so that their memory footprint is low. I suggest we use an enhanced version of EagerTaskInitializationListener, so that jobs are initialized asynchronously in a separate thread. The difference is that we use some of the limits described in HADOOP-4428. We can have a limit on the total number of waiting jobs initialized (maybe 10 per queue), as well as a limit on initialized jobs per user per queue (maybe 3). The modified EagerTaskInitializationListener thread enforces these limits and only initializes jobs as necessary.


          People

          • Assignee:
            Sreekanth Ramakrishnan
            Reporter:
            Hemanth Yamijala