Hadoop Common / HADOOP-3412

Refactor the scheduler out of the JobTracker

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed
    • Release Note:
      Added the ability to choose between multiple schedulers, and to limit the number of running tasks per job.

      Description

      First, I would like to warn you that my proposal may be very naive. I just hope that reading it won't waste your time.

      The aim

      It seems to me that improving Hadoop scheduling could be very beneficial. But it is hard to implement and compare schedulers, because the scheduling logic is mixed in with the rest of the JobTracker.
      This issue is the first step of an attempt to improve the Hadoop scheduler. It re-implements the current scheduling algorithm in a separate class called JobScheduler. This new class is instantiated in the JobTracker.

      Bugs fixed as a side effect

      This patch probably cannot be submitted as it is.
      A first difficulty is that it does not have exactly the same behaviour as the current JobTracker. More precisely, it does not re-implement things like code that seems never to be called, or known concurrency problems.
      I wrote TOCONFIRM wherever my proposal differs from the current implementation, so you can find those places easily.
      I know that fixing bugs silently is bad, so independently of what you decide about this patch, I will open issues for the bugs that you confirm.

      Other side effects

      Another side effect of this patch is to add documentation for each step of the scheduling. I hope that it will help future improvements by lowering the barrier to contributing to the scheduler.
      It also reduces the complexity and the lock granularity of the JobTracker (making it more parallel).

      The future

      If you feel that this is a step in the right direction, I will try to propose a JobSchedulerInterface that many JobSchedulers could implement, and to propose alternatives to the current "FifoJobScheduler". If any of you have ideas about that, please tell ^^ I will also open issues for things marked as FIXME in the patch.
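
      For illustration, here is a minimal sketch of what such a JobSchedulerInterface might look like, based on the narrow interface (addJob, removeJob, assignTask) that emerges later in this discussion; the exact names and signatures are assumptions, not the committed API:

        // Hypothetical sketch only; the real interface may differ.
        public interface JobSchedulerInterface {
          /** Registers a newly submitted job with the scheduler. */
          void addJob(JobInProgress job);

          /** Removes a completed or killed job from the scheduler. */
          void removeJob(JobInProgress job);

          /** Picks the next task (if any) to run on the given tracker. */
          Task assignTask(String taskTrackerName) throws IOException;
        }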

      1. JobScheduler-v12.1.patch
        67 kB
        Tom White
      2. JobScheduler-v12.patch
        67 kB
        Tom White
      3. JobScheduler-v11.patch
        60 kB
        Tom White
      4. JobScheduler-v10.patch
        46 kB
        Tom White
      5. JobScheduler-v9.2.patch
        29 kB
        Matei Zaharia
      6. JobScheduler-v9.1.patch
        37 kB
        Tom White
      7. JobScheduler-v9.patch
        37 kB
        Tom White
      8. JobScheduler_v8.patch
        57 kB
        Brice Arnould
      9. JobScheduler_v7.1.patch
        52 kB
        Brice Arnould
      10. JobScheduler_v7.patch
        50 kB
        Brice Arnould
      11. JobScheduler_v6.4.patch
        49 kB
        Brice Arnould
      12. JobScheduler_v6.3.patch
        50 kB
        Brice Arnould
      13. SimpleResourceAwareJobScheduler.java
        8 kB
        Brice Arnould
      14. JobScheduler_v6.2.patch
        46 kB
        Brice Arnould
      15. JobScheduler_v6.1.patch
        41 kB
        Brice Arnould
      16. JobScheduler_v6.patch
        40 kB
        Brice Arnould
      17. JobScheduler_v5.patch
        40 kB
        Brice Arnould
      18. RackAwareJobScheduler.java
        2 kB
        Brice Arnould
      19. JobScheduler_v4.patch
        37 kB
        Brice Arnould
      20. JobScheduler_v3b.patch
        39 kB
        Brice Arnould
      21. JobScheduler_v3.patch
        39 kB
        Brice Arnould
      22. JobScheduler_v2.patch
        34 kB
        Brice Arnould
      23. JobScheduler.patch
        30 kB
        Brice Arnould


          Activity

          Brice Arnould added a comment -

          This new version makes a few more changes in the JobTracker, but allows the JobScheduler to use any container to store jobs (so we can have e.g. per-user queues) and to have a lower asymptotic complexity. It is as yet untested.

          Doug Cutting added a comment -

          This sounds like a good approach to me. It might make sense to implement two schedulers as part of the patch, to validate the abstraction: one that reproduces the current FIFO behaviour, and one that, e.g., limits tasks per job (to better timeshare the cluster, HADOOP-2573).

          Brice Arnould added a comment -

          I initially intended to propose the generic interface and some other schedulers as separate issues, but I don't mind fixing HADOOP-2573 -_^
          So this new version adds a new FifoScheduler that takes into account a configurable maximum limit on the number of running tasks per job, and an interface that characterises JobSchedulers.

          If you want me to, I'll write a proof-of-concept "RackScheduler" that tries to group tasks belonging to the same job on a small number of racks, so we can ensure that this interface can be used to implement things other than limitations (and in any case I will work on a more efficient scheduler that takes locality into account).
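
          For illustration, the per-job cap inside a FIFO scan might look roughly like this sketch (names such as runningMaps(), maxRunningTasksPerJob and the obtainNewMapTask signature are assumptions based on this discussion, not code from the patch):

            // Hypothetical sketch of the per-job limit.
            for (JobInProgress job : jobsInFifoOrder) {
              int running = job.runningMaps() + job.runningReduces();
              if (maxRunningTasksPerJob > 0 && running >= maxRunningTasksPerJob) {
                continue; // this job already uses its allowed share of the cluster
              }
              Task task = job.obtainNewMapTask(trackerStatus, clusterSize);
              if (task != null) {
                return task;
              }
            }
            return null; // no eligible job had a task for this tracker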

          Brice Arnould added a comment -

          An updated patch that builds on trunk.

          Brice Arnould added a comment -

          This scheduler is more a proof of concept for now. It tries to assign to each Job a reduced set of Racks. Apart from that, it is a FifoJobScheduler.

          Brice Arnould added a comment -

          This patch does the TODOs and adds the JobTracker as an argument to the constructors of JobSchedulers, so they can have all the information they might need.
          I think that it is now ready.

          Brice Arnould added a comment -

          (The proof of concept is now much simpler, thanks to the topology service of HADOOP-1985.)

          Ari Rabkin added a comment -

          I agree that the scheduler is currently excessively difficult to modify and reason about. Something like this could be very useful in scheduler development. I have a couple of thoughts about the details of your patch.

          Why is the scheduler responsible for managing the task tracker statuses? Shouldn't that stay in JobTracker? Though you'd still need updateTaskTrackerStatus() in the JobScheduler so that the scheduler can do something smart if there's been a change in status.

          It might be wise to add an initialize() to the JobScheduler interface so that JobSchedulers can be written using only the default constructor. This would make it easier to push the choice of scheduler into a config file; you'd just list a class name and the system could use reflection to load the scheduler and start it.
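
          For what it's worth, a sketch of that config-driven loading (the property name and the JobScheduler type are assumptions taken from this thread, not a committed API):

            // Hypothetical sketch: resolve the scheduler class from the config,
            // instantiate it via reflection, then let it initialize itself.
            Class<? extends JobScheduler> schedulerClass =
                conf.getClass("mapred.jobtracker.scheduler",
                              FifoJobScheduler.class, JobScheduler.class);
            JobScheduler scheduler = ReflectionUtils.newInstance(schedulerClass, conf);
            scheduler.initialize(jobTracker); // hypothetical initialize() from the
                                              // suggestion above; argument assumed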

          For which structures do you intend getLockOnJobs() to return a lock? Saying that the scheduler "won't move anything" is a little open-ended. I don't see why the scheduler would ever need to write to externally-visible structures.

          Brice Arnould added a comment -

          Why is the scheduler responsible for managing the task tracker statuses? Shouldn't that stay in JobTracker? Though you'd still need updateTaskTrackerStatus() in the JobScheduler so that the scheduler can do something smart if there's been a change in status.

          You're right. My aim was to move inside the JobScheduler the two structures we might want to reorder in order to select tasks more efficiently. I thought it would be a waste to keep two copies of the same collections (the unordered one in the JobTracker, the ordered one in the JobScheduler). But on the other hand, it can make the code more verbose for simple algorithms.
          So I propose to write an AbstractJobScheduler with default implementations for {remove,update}TaskTracker (and maybe also for jobs, if that seems reasonable). This way, schedulers that just need to change addJob, removeJob and assignTask will be able to reuse a common basis.
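
          Roughly, such an AbstractJobScheduler might look like this sketch (only the method names come from this discussion; every signature is an assumption):

            // Hypothetical sketch: shared tracker bookkeeping, abstract policy.
            public abstract class AbstractJobScheduler implements JobScheduler {
              protected final Map<String, TaskTrackerStatus> trackers =
                  new HashMap<String, TaskTrackerStatus>();

              public void updateTaskTrackerStatus(String name, TaskTrackerStatus status) {
                if (status == null) {
                  trackers.remove(name); // a null status doubles as a removal
                } else {
                  trackers.put(name, status);
                }
              }

              // The scheduling policy itself stays abstract.
              public abstract void addJob(JobInProgress job);
              public abstract void removeJob(JobInProgress job);
              public abstract Task assignTask(String taskTrackerName) throws IOException;
            }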

          It might be wise to add an initialize() to the JobScheduler interface so that JobSchedulers can be written using only the default constructor. This would make it easier to push the choice of scheduler into a config file; you'd just list a class name and the system could use reflection to load the scheduler and start it.

          The latest version of the patch (v4) already uses reflection to load a scheduler according to the configuration file. I'm really unsure about the name I chose for the option, however ("mapred.jobtracker.scheduler").
          I didn't use ReflectionUtils because it seems to be restricted by design to objects whose constructor takes a single argument of type Configuration. Moreover, I prefer constructors that return objects "ready for use" :-P But if you feel that this doesn't respect the Hadoop code style, I can change it; it's not a problem.

          For which structures do you intend getLockOnJobs() to return a lock? Saying that the scheduler "won't move anything" is a little open-ended. I don't see why the scheduler would ever need to write to externally-visible structures.

          This description really needs clarification. getLockOnJobs() gives you the assurance that every operation on internally stored jobs, namely addJob, removeJob and assignTask, will wait for you to release the lock before altering the list. Its intended use is that a user of this class who wants to alter a Job does it this way:

            synchronized (scheduler.getLockOnJobs()) {
              scheduler.remove(job);
              // modify the job
              scheduler.add(job);
            }

          I told myself that this way (remove() then add()), the asymptotic complexity with most containers would be lower than if I had provided a notifyJobModification() which re-sorted everything (my original intention).

          Thanks for your comments. I'm going to post a new version of this patch soon that takes them into account. Please tell me if you see other errors in it.

          PS: Please excuse my English. If I said something misplaced or impolite, it was not intentional.

          Brice Arnould added a comment -

          A new version addressing the issues raised by Ari Rabkin, and fixing another bug that would have occurred if a subclass redefined getLockOnJobs().

          Matei Zaharia added a comment -

          I'd like to mention one other thing that it might be useful to think about: Letting the pluggable scheduler control speculative execution. Currently, the scheduler asks each job for a map or reduce task using obtainNewMapTask or obtainNewReduceTask. These may return a speculative task based on some fixed thresholds. It would make more sense to have the JobInProgress expose only a list of task statuses, and let the scheduler decide which ones to speculate. In effect, move obtainNewMapTask and obtainNewReduceTask into the scheduler (have them be utility methods in AbstractJobScheduler). This would consolidate all scheduling code into one class, making it easier to understand and modify. It's fairly clear that there's no "one size fits all" solution for speculation, and users often disable it, change the thresholds, etc, so it makes sense to make the entire speculation algorithm pluggable.

          This is not a critical problem because a JobScheduler implementation can choose to not call obtainNewMapTask / obtainNewReduceTask and just look at the JobInProgress and select tasks by itself. It's just something that would make the code more understandable by consolidating scheduling decisions in one place.

          We might also consider making the speculative execution scheduler a separate pluggable object, so that multiple JobSchedulers can reuse it. Thus we might create a TaskScheduler interface (which would just contain obtainNewMapTask() and obtainNewReduceTask()), and a default implementation that uses the current threshold-based algorithm. AbstractJobScheduler could then look up which TaskScheduler to use in the config file. However, the most important thing is to make sure that the JobScheduler has a narrow interface (addJob, removeJob and assignTask), because then people can write this type of thing behind it.
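
          As a sketch, the narrow speculation interface described above might look like this (the two method names come from the comment; the parameter lists are assumptions):

            // Hypothetical sketch of a pluggable task-selection strategy.
            public interface TaskScheduler {
              /** May return a regular or speculative map task, or null. */
              Task obtainNewMapTask(TaskTrackerStatus tracker, JobInProgress job)
                  throws IOException;

              /** May return a regular or speculative reduce task, or null. */
              Task obtainNewReduceTask(TaskTrackerStatus tracker, JobInProgress job)
                  throws IOException;
            }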

          Vivek Ratan added a comment -

          This fix, together with HADOOP-3445, should greatly enhance the JT.

          Brice Arnould added a comment -

          This new proposal (v6) factors more things into the AbstractJobScheduler.

          Brice Arnould added a comment -

          Are there other issues that prevent this patch from being merged? I'm willing to make the required changes.

          Owen O'Malley added a comment -

          I think we need more time to review this. It looks good, but I don't think we can get it properly reviewed this afternoon. I'm pushing it to 0.19.

          Andy Konwinski added a comment -

          I don't think that in Java 1.5 the @Override annotation is supported on methods which are defined in an interface that the class implements. It seems that this behaviour is supported in Java 1.6, but in 1.5 @Override is supposed to be used only when you override a method that is defined in your superclass. Thus when I run ant with this patch applied, I get errors. Everything still needs to compile with 1.5 as of now, so I guess we should remove these @Override annotations.
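
          To illustrate the incompatibility (a hypothetical example, not code from the patch):

            // Compiles under javac 1.6 but fails under javac 1.5, because Java 5
            // only accepts @Override on methods overriding a superclass method,
            // not on methods implementing an interface method.
            class FifoJobScheduler implements JobScheduler {
              @Override // javac 1.5: "method does not override a method from its superclass"
              public Task assignTask(String taskTrackerName) throws IOException {
                return null;
              }
            }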

          Brice Arnould added a comment -

          You're right; here is the updated patch.
          It also updates the paths and fixes some grammatical errors in the documentation.

          Tom White added a comment -

          This looks good. A few comments:

          1. You can change AbstractJobScheduler and FifoJobScheduler's constructors to be no-arg constructors. The JobTracker isn't being used, and if you make AbstractJobScheduler implement Configurable, the Configuration will be injected automatically if you use ReflectionUtils#newInstance (this is the standard Hadoop pattern).

          2. This needs some unit tests. It should be possible to write a unit test for FifoJobScheduler since it is relatively isolated.

          3. In AbstractJobScheduler#assignTask you move to the next step if the following condition is true:

          (step == 0 || step == 2) && mapTasksNumber > maximumMapLoad
                      || (step == 1 || step == 3) && reduceTasksNumber > maximumReduceLoad
          

          Should these checks be >= (greater than or equal to) so the maximum load isn't exceeded?
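
          With the >= fix, the guard would read (same variable names as above):

          (step == 0 || step == 2) && mapTasksNumber >= maximumMapLoad
                      || (step == 1 || step == 3) && reduceTasksNumber >= maximumReduceLoad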

          4. Rather than having AbstractJobScheduler manage maxTasksPerJob, it might be better to have a subclass of FifoJobScheduler that limits tasks per job (TaskLimitedFifoJobScheduler). Or if this is possible through composition rather than inheritance, even better. FifoJobScheduler would then just preserve the current behaviour.

          Brice Arnould added a comment -

          Tom White:

          1. You can change AbstractJobScheduler and FifoJobScheduler's constructors to be no-arg constructors. The JobTracker isn't being used, and if you make AbstractJobScheduler implement Configurable, the Configuration will be injected automatically if you use ReflectionUtils#newInstance (this is the standard Hadoop pattern).

          I started using that pattern (up to v4), but I told myself that it could be nice to give the scheduler access to the topology. It's true, however, that the FifoJobScheduler does not use it, so I can remove it if you want.

          2. This needs some unit tests. It should be possible to write a unit test for FifoJobScheduler since it is relatively isolated.

          A really good idea! I found a bug while writing the tests :-P (that bug occurred when two jobs of the same priority were added at the same time).
          This test circumvents the need to start a whole cluster by adding a protected constructor to JobInProgress, used by a class called FakeJobInProgress. I really don't like the idea of adding a constructor just to ease the writing of tests, but all the other ideas I had seemed uglier. Suggestions are welcome ^^
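
          For illustration, the test double described above might look roughly like this (the protected constructor and its parameters are hypothetical):

            // Hypothetical sketch of the FakeJobInProgress test double.
            class FakeJobInProgress extends JobInProgress {
              FakeJobInProgress(JobPriority priority, long startTime) {
                super(priority, startTime); // assumed protected constructor added for tests
              }
              // ...override just enough behaviour for the scheduler under test.
            }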

          3. In AbstractJobScheduler#assignTask you move to the next step if the following condition is true:

          (step == 0 || step == 2) && mapTasksNumber > maximumMapLoad
                      || (step == 1 || step == 3) && reduceTasksNumber > maximumReduceLoad

          Should these checks be >= (greater than or equal to) so the maximum load isn't exceeded?

          Ooops
          Fixed ^^

          4. Rather than having AbstractJobScheduler manage maxTasksPerJob, it might be better to have a subclass of FifoJobScheduler that limits tasks per job (TaskLimitedFifoJobScheduler). Or if this is possible through composition rather than inheritance, even better. FifoJobScheduler would then just preserve the current behaviour.

          My argument in favour of keeping that in the AbstractJobScheduler is that, when I implemented other schedulers to test the API (just before v6), this part enforcing limitations was common to all of them. The reason is that those options keep their meaning with all schedulers.
          However, if the point is that the current behaviour should still be the default, I completely agree.

          I certainly have nothing against making the modifications you propose in 1) and 4), but I wanted to be sure you had considered the reasons that made me choose the current organisation.

          Brice Arnould added a comment -

          PS: Another example of a scheduler.

          Brice Arnould added a comment -

          Takes into account the changes from HADOOP-3333.

          Owen O'Malley added a comment -

          Your patches need to be svn diffs relative to HADOOP_HOME.

          I would prefer the use of ReflectionUtils.newInstance, because it handles the case of non-public classes and constructors and does the configuration in a standard way. It would mean moving your parameters from the constructor to an initialize method.

          Why do you have "// TODO: Java 6 @Override" all over? @Override is in Java 5.

          Generally with log4j, we don't provide loggers to other classes, but rather let them get their own loggers. So I'd remove getLog().
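
          For reference, the standard pattern being referred to, where each class obtains its own logger (commons-logging, as used throughout Hadoop at the time):

            import org.apache.commons.logging.Log;
            import org.apache.commons.logging.LogFactory;

            public class FifoJobScheduler {
              private static final Log LOG = LogFactory.getLog(FifoJobScheduler.class);
            }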

          The interface for AbstractJobScheduler has too many ways to update the task tracker state. I'd suggest just leaving updateTaskTrackerStatus, which can do deletes if the new status is null. In particular, I'd remove the two removeTaskTracker methods.

          Brice Arnould added a comment -

          Owen O'Malley

          Your patches need to be svn diffs relative to HADOOP_HOME.

          Oops. Sorry about that. I think that I fixed it.

          I would prefer the use of ReflectionUtils.newInstance, because it handles the case of non-public classes and constructors and does the configuration in a standard way. It would mean moving your parameters from the constructor to an initialize method.

          Done! (I'll open another issue to discuss ReflectionUtils.)

          Why do you have "// TODO: Java 6 @Override" all over? @Override is in Java 5.

          Andy Konwinski explained to me that in Java 5 this annotation is not supported on methods defined in an interface.

          Generally with log4j, we don't provide loggers to other classes, but rather let them get their own loggers. So I'd remove getLog()

          Done!

          The interface for AbstractJobScheduler has too many ways to update the task tracker state. I'd suggest just leaving updateTaskTrackerStatus, which can do deletes if the new status is null. In particular, I'd remove the two removeTaskTracker methods.

          My aim was to allow calls to the JobScheduler from unsynchronized methods, to help HADOOP-3608 ... too anticipatory.
          I removed them.

          Thanks for your comments ^^

          Tom White added a comment -

          My argument in favour of keeping that in the AbstractJobScheduler is that, when I implemented other schedulers to test the API (just before v6), this part enforcing limitations was common to all of them. The reason is that those options keep their meaning with all schedulers.

          Thinking about this some more, perhaps a better way of achieving this would be to make things more composable.

          How about breaking apart the responsibilities, so there is a class for choosing the next task to run - call it TaskScheduler (which has the functionality of AbstractJobScheduler), and a class for prioritizing jobs: JobQueue (which is basically FifoJobScheduler). This would permit mixing of different JobQueues with different TaskSchedulers. This would not preclude more complex TaskSchedulers that have their own specialized JobQueue when it's not possible to sort jobs before choosing a task.

          public abstract class JobQueue implements Iterable<JobInProgress> {
            public abstract void add(JobInProgress job);
            public abstract Iterator<JobInProgress> iterator();
            public abstract void remove(JobInProgress job);
          }
          
          public abstract class TaskScheduler extends Configured {
            
            private JobQueue jobQueue;
            
            public TaskScheduler() {
              // jobQueue specified by configuration
            }
            
            /**
             * For subclasses that want to provide their own JobQueue
             */
            protected TaskScheduler(JobQueue jobQueue) {
              this.jobQueue = jobQueue;
            }
          
            public void setConf(Configuration conf) {
              super.setConf(conf);
              // initialize jobQueue here
            }
            
            public JobQueue getJobQueue() {
              return jobQueue;
            }
            
            public abstract Task assignTask(String taskTrackerName) throws IOException;
            
            public abstract boolean updateTaskTrackerStatus(String taskTrackerName,
                TaskTrackerStatus newStatus);
            
            // other methods...
          
          }
          

          Then I would have FifoJobQueue, and two TaskSchedulers: DefaultTaskScheduler (which implements the current behaviour) and JobLimitedTaskScheduler, both using FifoJobQueue by default.
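
          For illustration, a FifoJobQueue against the JobQueue class above might look like this sketch (the ordering and accessors are assumptions mirroring the current priority-then-start-time behaviour; the final tie-break is there so that equal-priority jobs added at the same instant both stay in the set):

            public class FifoJobQueue extends JobQueue {
              private final SortedSet<JobInProgress> jobs =
                  new TreeSet<JobInProgress>(new Comparator<JobInProgress>() {
                    public int compare(JobInProgress j1, JobInProgress j2) {
                      int byPriority = j1.getPriority().compareTo(j2.getPriority());
                      if (byPriority != 0) {
                        return byPriority;
                      }
                      if (j1.getStartTime() != j2.getStartTime()) {
                        return j1.getStartTime() < j2.getStartTime() ? -1 : 1;
                      }
                      // assumed accessor; tie-break keeps distinct jobs distinct
                      return j1.getJobID().toString().compareTo(j2.getJobID().toString());
                    }
                  });

              public void add(JobInProgress job) { jobs.add(job); }
              public void remove(JobInProgress job) { jobs.remove(job); }
              public Iterator<JobInProgress> iterator() { return jobs.iterator(); }
            }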

          Thoughts?

          A couple of other points:

          • I'm not sure we need the getLockOnJobs method. As long as JobTracker is the only client of JobScheduler, it can manage its access to JobScheduler using external synchronization. Put another way, why is it not sufficient for JobScheduler to be thread-safe when it comes to adding and removing jobs?
          • Single line blocks need curly braces.
          Brice Arnould added a comment -

          Tom White

          How about breaking apart the responsibilities, so there is a class for choosing the next task to run - call it TaskScheduler (which has the functionality of AbstractJobScheduler), and a class for prioritizing jobs: JobQueue (which is basically FifoJobScheduler). This would permit mixing of different JobQueues with different TaskSchedulers. This would not preclude more complex TaskSchedulers that have their own specialized JobQueue when it's not possible to sort jobs before choosing a task.

          That seems to be a really great idea! I'm going to rewrite my patch the way you suggest.

          Brice Arnould added a comment -

          Done ^^ Thanks Tom

          Tom White added a comment -

          Brice,

          This is looking good. A few comments:

          • We don't need iterator() and getSortedJobs() - iterator() is sufficient.
          • Unless we're absolutely sure that we've got the interface right, I think JobQueue and TaskScheduler should be abstract classes. See https://issues.apache.org/jira/browse/HADOOP-1230?focusedCommentId=12573958#action_12573958
          • JobQueue doesn't need to be Configurable. Its implementations may choose to be Configurable to get hold of the config, but they shouldn't have to. FifoJobQueue doesn't use the config it's passed. (If JobQueue is an abstract class then this goes away - it can just extend Configured.)
          • There are a few javadoc warnings.
          • What did you think of the idea of having JobLimitedTaskScheduler?
          Doug Cutting added a comment -

          > I think JobQueue and TaskScheduler should be abstract classes [ ... ]

          +1 (Thanks for watching for this, Tom!) A good rule of thumb is that if it has more than one method it should probably be an abstract class, not an interface. Even then, interfaces aren't always best even for one-method APIs.

          Brice Arnould added a comment -

          Tom White

          Unless we're absolutely sure that we've got the interface right, I think JobQueue and TaskScheduler should be abstract classes. See https://issues.apache.org/jira/browse/HADOOP-1230?focusedCommentId=12573958#action_12573958

          You mean that abstract classes are easier to evolve than interfaces, because they can provide default implementations for the methods we will add later?
          I hadn't thought of that. I made the change.

          We don't need iterator() and getSortedJobs() - iterator() is sufficient.

          getSortedJobs() allows the choice of job to be biased by the characteristics of the TaskTracker, something that appeared to be useful when I played with the API. This new proposal, however, provides a default implementation for it.

          What did you think of the idea of having JobLimitedTaskScheduler?

          Not a problem to add it, but I see two ways of doing so without duplicating most of assignTask():

          • By composition, adding a JobFilter subclass with two methods: isAcceptable(job, step) and getNumberOfSteps() (see the sketch below). The first would tell whether a job is right for the step we're in, and the second how many steps we need.
          • By inheritance, providing isAcceptable and getNumberOfSteps as methods of the DefaultTaskScheduler.

          Both are easy to implement, but that new level of abstraction seems contrary to the KISS principle, unless we really need it for other filters. For now, when limits are disabled, the TaskScheduler just does one more test at line 104, and one more at line 124 when limits are enabled. That might not justify the creation of another class and a filter concept (again, unless we need them for something else).
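
          For concreteness, the JobFilter idea might be sketched like this (only the two method names come from the list above; everything else is an assumption):

            // Hypothetical sketch of the JobFilter abstraction.
            public abstract class JobFilter {
              /** Is this job eligible for the given scheduling step? */
              public abstract boolean isAcceptable(JobInProgress job, int step);

              /** How many scheduling steps does this filter define? */
              public abstract int getNumberOfSteps();
            }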

          I fixed the warnings. Thanks for your advice; it's instructive ^^

          Matei Zaharia added a comment -

          While the idea of splitting up the TaskScheduler and JobQueue is
          great, isn't having a getJobQueue() in TaskScheduler needlessly
          complicating the interface? Now, when somebody wants to write a new
          scheduling algorithm, they may have to write two classes, a
          TaskScheduler and a JobQueue, and figure out how they'll interact. I'd
          suggest having the add(JobInProgress) and remove(JobInProgress)
          methods directly in the TaskScheduler, and removing getJobQueue(), so
          that the JobTracker only calls methods of the TaskScheduler object.
          The DefaultTaskScheduler can include a JobQueue that it passes the add/
          remove methods to, but people writing new schedulers should have the
          option to do something different.

          Here are two concrete cases where this would be useful:

          • Some TaskSchedulers, even if they use a JobQueue, might want to run
            code in response to the add() and remove() methods (updating data
            structures, deciding to preempt existing jobs because a new one
            arrived, etc.). Right now this is not possible unless the JobQueue
            tells them it was updated.
          • Some schedulers might not logically have a job queue. For example,
            we might imagine a scheduler that sorts tasks, not jobs, based on
            locality and on whether the task is speculative.
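
          As a sketch of this suggestion (all names are assumptions; only the idea of delegating add/remove to an internal JobQueue comes from the comment above):

            // Hypothetical sketch: the JobTracker sees only TaskScheduler, and a
            // queue-based implementation delegates to an internal JobQueue.
            public abstract class TaskScheduler {
              public abstract void addJob(JobInProgress job);
              public abstract void removeJob(JobInProgress job);
              public abstract Task assignTask(String taskTrackerName) throws IOException;
            }

            public class DefaultTaskScheduler extends TaskScheduler {
              private final JobQueue queue = new FifoJobQueue(); // could come from config

              // Natural hook points for reacting to job arrival and removal.
              public void addJob(JobInProgress job) { queue.add(job); }
              public void removeJob(JobInProgress job) { queue.remove(job); }

              public Task assignTask(String taskTrackerName) throws IOException {
                for (JobInProgress job : queue) {
                  // walk the queue and pick a task, as in the current behaviour
                }
                return null;
              }
            }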

          Matei

          Brice Arnould added a comment -

          Matei Zaharia
          Hi!
          It's late in France, so please excuse me if what I say is more stupid than usual :-P But I really like that separation between the TaskScheduler and the JobQueue.

          A lot of characteristics are shared by all tasks of a Job: the userName, the priority, the typical resource usage of its tasks... things like that. Most admins will want to use this to reflect the "policy" of the cluster (e.g. should it give priority to users who pay more, balance the load among users, minimize the maximum job completion time, and so on). Writing a local JobQueue to reflect a local policy should be easy and should not interfere with the "optimizing part" of the scheduling (represented by the TaskScheduler). I provided a few examples that use JobConf to attach tags to jobs, allowing a user to describe their needs. With the new interface that should be even shorter.

          The TaskScheduler, on the contrary, would be more technical and would take into account things like locality or bandwidth, independently of the site policies (OK, mapred.jobtracker.scheduler.maxRunningTasksPerJob is an exception... maybe I should move it into the JobQueue? :-/)

          That being said, if you think there will be a need for Task-level scheduling, I can make JobQueue an interface again. That way, a TaskScheduler could be its own JobQueue by returning itself from the getJobQueue() method. It's a bit more complex, but that's a special case.
          I could also provide an AbstractJobQueue which would be the recommended basis for implementing JobQueues.
          That way, everyone would be happy ^_^
          But I'm unsure that we will need Task-level scheduling.

          Regards,
          Brice

          Show
          Brice Arnould added a comment - Matei Zaharia Hi ! It's late in France, so please excuse me if what I say is more stupid than usually :-P But I really like that separation between the TaskScheduler and the JobQueue. A lot of characteristics are shared by all tasks of a Job : the userName, the priority, the typical resources usage of composing tasks... Things like that. Most admins will want to use this to reflect the "politic" of the cluster (eg. should it give the priority to users that pay more, balance the charge among users, minimize the maximum time for job completions and so on). Writing a local JobQueue to reflect a local policy should be easy and should not interfere with the "optimizing part" of the scheduling (represented by the TaskScheduler). I provided a few example that use JobConf to easily attach tags to jobs, allowing an user to describe it's need. With the new interface that should be even shorter. The TaskScheduler, at the contrary, would be more technical and would take in account things like locality or bandwidth, independently of the site policies (OK, mapred.jobtracker.scheduler.maxRunningTasksPerJob is an exception... maybe I should move it into the JobQueue ? :-/) That being said, if you think that their will be a need for Task level scheduling, I can make JobQueue an interface again. In that way, a TaskScheduler could be it's own JobQueue by returning self in the getJobQueue() method. It's a bit more complex but that's a special case. I could also provide an AbstractJobQueue which would be the recommended for implementing JobQueues. In that way, everyone would be happy _ But I'm unsure that we will need Task level scheduling. Regards, Brice
          Hide
          Matei Zaharia added a comment -

          Hi Brice,

          You're right that the JobQueue abstraction is very nice and that a lot
          of people would want to use it. However, I'm just saying that when
          defining the interface to the scheduler, it should be as simple and
          versatile as possible. Part of the goal of this JIRA was to make it
          possible for organizations to control scheduling without modifying
          Hadoop source code, so the interface it defines for the scheduler
          should hopefully never have to change once it is committed. Separating
          the JobQueue from the rest of the scheduler right away makes it
          difficult to implement certain types of schedulers behind the
          interface. Instead, I'm suggesting that you just add the JobQueue
          functionality to DefaultTaskScheduler (or a superclass, maybe
          something called QueueBasedTaskScheduler). Users will still be able to
          use JobQueue in the same way (just specify it in a config file), but
          anyone wanting to do something more complicated will have a simple
          interface to implement (just TaskScheduler). This is less "hacky" than
          creating a class that is both a JobQueue and TaskScheduler.

          The reason I bring this up is because I've been trying to use your
          patch to write a scheduler that emulates what Hadoop on Demand does
          today - the ability to specify quotas for each user (a max number of
          slots that the user can use). The most elegant way to implement this
          would be to have a TaskScheduler that knows about a quota on each
          user, and contains a separate JobQueue for each user, maybe chosen by
          the user (some might want FIFO, some might want shortest job first,
          etc). However, with the current interface, I have to create a single
          JobQueue class and hack the getSortedJobs method to return different
          things based on how many tasks each user has running. In fact,
          changing the TaskScheduler interface to make it more general, as
          suggested, would allow more code reuse, because I'd be able to
          reuse other people's JobQueues as long as I write a TaskScheduler
          that works with multiple queues. A rough sketch of the shape I
          have in mind is below.

          Matei

          Brice Arnould added a comment -

          Thanks for your very detailed answer.
          I have nothing against adding a superclass, if it is needed. I think that this class would look like the JobScheduler of v6.4.

          However, I'm unsure whether I understand your use case or not.
          If I understand correctly, you want to write a scheduler that has two functions:

          • limiting the maximum number of tasks running per user
          • providing per-user job queues

          If my understanding is correct, those two functionalities are independent and should not be glued together in a single class (e.g. you might want to limit the number of per-user tasks even if the JobQueue is a simple FIFO). However, you would benefit from the JobFilter discussed above to ease the implementation of your TaskScheduler; a sketch of what I mean follows. (It's also true that if the job queue is a round-robin between users, an optimized JobFilter could be written.)
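
          The assumed shape of the JobFilter idea: a predicate applied while walking the sorted job list. The interface and class names below are illustrative, and getProfile().getUser() is assumed to be accessible:

            import java.util.Map;

            interface JobFilter {
              boolean accept(JobInProgress job);
            }

            // Example: cap running tasks per user, independently of ordering.
            class MaxTasksPerUserFilter implements JobFilter {
              private final Map<String, Integer> runningTasksByUser;
              private final int maxTasksPerUser;

              MaxTasksPerUserFilter(Map<String, Integer> running, int max) {
                this.runningTasksByUser = running;
                this.maxTasksPerUser = max;
              }

              public boolean accept(JobInProgress job) {
                String user = job.getProfile().getUser();
                Integer running = runningTasksByUser.get(user);
                return (running == null ? 0 : running.intValue()) < maxTasksPerUser;
              }
            }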

          Do you have a draft of the scheduler you're talking about (implemented, for example, on top of v6.4)?
          If so, I would like to see it, so I can port it to both APIs and see what it looks like on each.

          Thanks for your patience, but I want to be sure that I understand the problem.
          Brice

          Tom White added a comment -

          Separating the JobQueue from the rest of the scheduler right away makes it difficult to implement certain types of schedulers behind the interface.

          I think this is a good point. So I would support the suggestion of having methods to add, remove and iterate over jobs on the TaskScheduler. And keep JobQueue, but don't make it mandatory for TaskSchedulers to use it.

          Another thing I would like to see is the code for managing the tasktracker state being pushed up into the base class, since it is shared. (In fact, I wondered whether it belongs in the TaskScheduler at all, since it would be possible to make TaskScheduler a listener with only the updateTaskTrackerStatus method. But this would duplicate state with the JobTracker, so on balance it's fine to have it in the TaskScheduler.)

          Taking these points together, we would then have something like this (base-class bodies sketched minimally to show the intent):

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.Collection;
          import java.util.HashMap;
          import java.util.Iterator;
          import java.util.Map;

          import org.apache.hadoop.conf.Configured;

          public abstract class TaskScheduler extends Configured {

            // Shared tasktracker state, kept in the base class.
            private final Map<String, TaskTrackerStatus> taskTrackers =
                new HashMap<String, TaskTrackerStatus>();
            // Cluster-wide counters, updated as status reports arrive.
            private Statistics statistics;

            // abstract methods:
            public abstract void addJob(JobInProgress jobInProgress);
            public abstract void removeJob(JobInProgress jobInProgress);
            public abstract Iterator<JobInProgress> getJobsIterator();
            public abstract Task assignTask(String taskTrackerName) throws IOException;

            // base implementations (currently in DefaultTaskScheduler)
            public Collection<TaskTrackerStatus> getAllTaskTrackers() {
              return taskTrackers.values();
            }

            public int getNumberOfTaskTrackers() {
              return taskTrackers.size();
            }

            public TaskTrackerStatus getTaskTracker(String taskTrackerName) {
              return taskTrackers.get(taskTrackerName);
            }

            public boolean updateTaskTrackerStatus(String taskTrackerName,
                TaskTrackerStatus newStatus) {
              // true if this tracker was already known to the scheduler
              return taskTrackers.put(taskTrackerName, newStatus) != null;
            }

            public Statistics getStatistics() {
              return statistics;
            }

            Collection<JobInProgress> retireOldJobs(long retireBefore) {
              // Collect and drop completed jobs older than the cutoff
              // (assumes the jobs iterator supports remove()).
              Collection<JobInProgress> retired = new ArrayList<JobInProgress>();
              for (Iterator<JobInProgress> it = getJobsIterator(); it.hasNext(); ) {
                JobInProgress job = it.next();
                if (job.getFinishTime() > 0 && job.getFinishTime() < retireBefore) {
                  it.remove();
                  retired.add(job);
                }
              }
              return retired;
            }

          }

          public class DefaultTaskScheduler extends TaskScheduler {

            private JobQueue jobQueue;

            @Override
            public void addJob(JobInProgress jobInProgress) {
              jobQueue.add(jobInProgress);
            }

            @Override
            public Iterator<JobInProgress> getJobsIterator() {
              return jobQueue.iterator();
            }

            @Override
            public void removeJob(JobInProgress jobInProgress) {
              jobQueue.remove(jobInProgress);
            }

            @Override
            public Task assignTask(String taskTrackerName) throws IOException {
              // as before
              return null; // placeholder so the sketch compiles
            }

          }

          How does this look?

          (Note that TaskScheduler can extend Configured rather than implementing Configurable.)

          Also, retireOldJobs seems a bit out of place here and should really go back in JobTracker. This should be easy since it just calls getJobsIterator.

          > We don't need iterator() and getSortedJobs() - iterator() is sufficient.

          getSortedJobs() makes it possible to bias the choice of job by the characteristics of the TaskTracker, something that appeared to be useful when I played with the API. This new proposal, however, provides a default implementation for it.

          But the iterator returned by iterator() can be any iterator - so we can make it the same one returned by getSortedJobs(). In other words, we only need one way of iterating over the JobQueue.

          Matei Zaharia added a comment -

          Tom - I like your new TaskScheduler API. I also like the idea of putting the statistics and tasktracker status stuff back into JobTracker, and making the TaskScheduler a listener only. I don't think this needs to duplicate state at all - the JobTracker can expose the getAllTaskTrackers() and getTaskTracker(name) through a public API, and then you're set. (Well, you'll also need the TaskScheduler to have a reference to the JobTracker; maybe give it a setJobTracker method that the JobTracker calls after setConf.) This makes more sense to me than putting this functionality in all TaskSchedulers, especially because the statistics stuff is something where there is only one desired answer (nobody would ever make their TaskScheduler return something else).

          Brice - I haven't implemented my scheduler yet, but I will post when I have it. Overall it will be similar to the proposal in HADOOP-3445.

          Brice Arnould added a comment -

          How does this look?

          This TaskScheduler is almost identical to what the JobScheduler was, so it will work. I'm still a bit bothered by the fact that this modification to the API is not motivated by the example of some real code, but since you both seem to agree, it's probably the right thing to do.
          DefaultTaskScheduler would benefit from a protected accessor to the JobQueue to ease inheritance, and TaskScheduler needs the getLockOnJobs method. But apart from that it should be OK.

          Also, retireOldJobs seems a bit out of place here and should really go back in JobTracker. This should be easy since it just calls getJobsIterator.

          Oops :-° You're right. I initially intended to handle this as part of HADOOP-3609, but I should also have reverted the creation of retireOldJobs in this branch.

          >> We don't need iterator() and getSortedJobs() - iterator() is sufficient.

          > getSortedJobs() makes it possible to bias the choice of job by the characteristics of the TaskTracker, something that appeared to be useful when I played with the API. This new proposal, however, provides a default implementation for it.

          But the iterator returned by iterator() can be any iterator - so we can make it the same one returned by getSortedJobs(). In other words, we only need one way of iterating over the JobQueue.

          Some implementations may not be backed by a single sorted container. Iterating over their jobs (for example when retiring jobs) should be possible without requiring them to build a single sorted container of all their jobs. Moreover, as I said before, removing getSortedJobs(trackerName) means losing the ability to bias the choice of job by the characteristics of the TaskTracker.
          The attached SimpleResourceAwareJobScheduler.java illustrates both points. Another example would be a JobQueue in which users reserve a few nodes for their exclusive use (maybe the best way to ensure that Tasks running on the same machine do not interfere; is that what you were talking about, Matei?).
          On the other hand, providing a default getSortedJobs() method adds only three lines and no complexity for those who don't need it.
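
          For concreteness, those three lines might be (the exact signature is an assumption):

            // Default: ignore the tracker and reuse the plain job iterator.
            public Iterator<JobInProgress> getSortedJobs(String taskTrackerName) {
              return getJobsIterator();
            }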

          I also like the idea of putting the statistics and tasktracker status stuff back into JobTracker, and making the TaskScheduler a listener only.

          Another concern with that approach is the complexity of the JobTracker.
          IMHO statistics are not used by the JobTracker (it just passes them on) and so should be kept inside the {Job,Task}Scheduler. Since it's a static class, that has no additional cost.

          I haven't implemented my scheduler yet, but I will post when I have it.

          Thanks! I think that writing code is the best way to evaluate an API.

          Overall it will be similar to the proposal in HADOOP-3445.

          I still have to understand this bug. I'll wait for your code.

          Matei Zaharia added a comment -

          The TaskScheduler is not the only user of statistics right now - for example, the web interface (JSP pages) also use them. So it makes very little sense to put them into the TaskScheduler and then have the JobTracker pass them from the TaskScheduler onto other users. If the concern is that the JobTracker class is too complicated, then we can move statistics into a separate class, but that should be a separate JIRA. For now, what's wrong with leaving them in the JobTracker and just providing an API that the TaskScheduler can use to read them? It would make this patch simpler, making it easier for it to get tested and accepted, and it's also the right thing from a software engineering point of view (scheduling has little to do with keeping track of statistics). Originally I thought that you'd put stats into the TaskScheduler because it needed to do something very complicated with them, but now I think that a simple getTaskTrackerStatuses method in the JobTracker would work fine.

          Brice Arnould added a comment -

          @Matei Zaharia
          While looking at this code again, I think that TaskScheduler.Statistics was a bad idea, since it essentially duplicates mapred.ClusterStatus. So I propose to remove mapred.TaskScheduler.Statistics and to make TaskScheduler.getStatistics() return a mapred.ClusterStatus directly.
          What do you think of that?
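
          A minimal sketch, assuming the scheduler holds a reference to the JobTracker and simply delegates (getClusterStatus() already exists on the JobTracker; the delegation itself is the assumption):

            public ClusterStatus getStatistics() {
              // ClusterStatus already carries the tracker and slot counts that
              // schedulers need, so no separate Statistics class is required.
              return jobTracker.getClusterStatus();
            }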

          PS: By the way, please excuse me if I've been inadvertently rude. My English is poor, and choosing the wrong word, or forgetting another, can change the tone of a sentence :-P

          Brice Arnould added a comment -

          This new version tries to take the previous comments into account.
          It also extracts the handling of tasktrackers into a separate class called TaskTrackerContainer.

          So, we now have a big TaskScheduler abstract class, from which you can do almost everything, and the rest:
          • A JobQueue, an abstract class describing an entity which sorts jobs.
          • A FifoJobQueue, well described by its name (its likely ordering is sketched below).
          • A TaskTrackerContainer, which helps to write other simple schedulers without duplicating the TaskTracker-management code from DefaultTaskScheduler.
          • A DefaultTaskScheduler, gluing together a single JobQueue that it gets from the configuration and the simple TaskTrackerContainer.
          Is that OK? I also removed the Statistics class.
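
          For reference, the ordering a FifoJobQueue would presumably impose; the comparator details and accessors below are a guess, not the patch's code:

            import java.util.Comparator;

            // Assumed FIFO ordering: priority first, then submission time, then
            // the job id as a tie-breaker so distinct jobs never compare as equal.
            class FifoJobComparator implements Comparator<JobInProgress> {
              public int compare(JobInProgress a, JobInProgress b) {
                int byPriority = a.getPriority().compareTo(b.getPriority());
                if (byPriority != 0) {
                  return byPriority;
                }
                if (a.getStartTime() != b.getStartTime()) {
                  return a.getStartTime() < b.getStartTime() ? -1 : 1;
                }
                return a.getProfile().getJobId().compareTo(b.getProfile().getJobId());
              }
            }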

          If I forgot something, please tell me!

          eric baldeschwieler added a comment -

          Hi Folks,

          It looks to me like we are heading for a major merge headache on this patch vs. some of the other major scheduler work in flight for 0.19. I'm surprised it has taken this long for us to realize this.

          Could I ask that the folks contributing to this issue's patches state what their goals are here? We're going to need to look for compromise, so it will be good to understand what short-term impact they want from this work.

          We'll be listing the conflicting JIRAs and the Yahoo team's goals here too. Just wanted to give folks a heads-up and start the discussion. With any luck we will quickly find a way to meet everyone's goals with a minimum amount of conflict.

          E14

          Owen O'Malley added a comment -

          It is unfortunate that this likely conflicts with HADOOP-3445, but this patch has received a lot more attention. (HADOOP-3445 isn't even patch available yet...) It sucks when large patches conflict, but it is hard to avoid on large projects with a lot of contributors. We should push to finish reviewing this patch to get it in, so that 3445 can use it as a base to make changes from.

          Matei Zaharia added a comment -

          HADOOP-3445 has been listed as "blocked" by this JIRA since at least May, but I guess they've started implementing it against the old codebase now? In any case, I think it would be great for Hadoop going forward if there was a clean way to plug in separate schedulers. As Tom and I commented, all you really need the scheduler to support is three methods - addJob, removeJob and assignTask - plus visibility into the data structures in the JobTracker. If it were possible to refactor the changes in HADOOP-3445 to work behind this interface that would be great.

          I should add that I'm also working on a Hadoop job scheduler at Facebook. The goal of my scheduler is to implement fair sharing between jobs, so that short jobs are not starved by long jobs, while allowing for the same kinds of per-group quotas that HADOOP-3445 provides. It will be similar to the Completely Fair Scheduler in Linux. This seems to be the right thing for Facebook's use case, which is a single cluster running a mix of long jobs, shorter ad-hoc "interactive" queries, and jobs that need some kind of guaranteed capacity to finish in time. I'll post a JIRA with a detailed design in the next few days. I've implemented my scheduler so that it works with Brice's patch, although I've been developing against a slightly different patch that I wrote for Hadoop 0.17 because we want to be able to use the scheduler in 0.17.

          Tom White added a comment -

          As it currently stands this patch does look like a big change. I agree with Matei that we can scale down the impact of the changes and still meet the goal of having a simple pluggable scheduling mechanism. I've actually reworked Brice's latest patch to do this, so I hope we'll have something with minimal changes to the JobTracker (and hence more acceptable to 3445) that we can commit soon. I'm just finishing off the unit tests and will post the patch later today.

          Brice Arnould added a comment -

          Yes, that conflict is truly regrettable :-/
          If it can help, I can try to port HADOOP-3445 to this new API. The difficulty is that if the algorithm has changed since it was described, I'm likely to miss a few of those changes. But I should be able to present a prototype before the weekend.

          By the way: it's true that this patch makes a lot of changes, but it works well for me (tm), documents some of the JobTracker internals that seemed pretty obscure before, removes many useless synchronisation primitives, and has lower asymptotic complexity.
          That being said, I understand that you would rather make the refactoring and the implementation of the API two distinct issues.
          If I were starting now, that's what I would do.

          Vivek Ratan added a comment -

          I don't think merging HADOOP-3445 and this patch will take significant effort. 3445 has a lot of new code, encapsulated in new methods, and it shouldn't be too difficult to make it fit this patch. I recommend evaluating this JIRA on its own merits.

          Tom White added a comment -

          Here is a new patch which takes Brice's good work and strips it back to minimise the changes to JobTracker to provide a start for future work on scheduling, such as HADOOP-3445. In a nutshell, the major change here is the removal of the getNewTaskForTaskTracker method from JobTracker, replacing it with a call to TaskScheduler#assignTask, to meet the goal of supporting pluggable schedulers.

          TaskScheduler has the three methods Matei mentioned: addJob, removeJob, assignTask. It also has a reference to the JobTracker, via a TaskTrackerManager interface, which is like TaskTrackerContainer in the last patch and which I introduced to permit testing. (I've put a comment in it to say as much.)

          I've renamed DefaultTaskScheduler to JobQueueTaskScheduler to better describe what it does, and I've replaced the JobQueue abstraction with a SortedSet<JobInProgress>. My feeling is that a SortedSet is sufficient to do job prioritization, since you can specify a Comparator to impose an order on the queue. More complex schedulers can just implement the three TaskScheduler methods, which shouldn't be too much of a burden. This is an area where I imagine we can make improvements after this change has been committed.
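
          As an illustration of that flexibility, a scheduler could impose a different policy just by supplying another Comparator; for example, a hypothetical shortest-job-first ordering (desiredMaps() is assumed to be accessible to the scheduler):

            import java.util.Comparator;
            import java.util.SortedSet;
            import java.util.TreeSet;

            SortedSet<JobInProgress> jobQueue =
                new TreeSet<JobInProgress>(new Comparator<JobInProgress>() {
                  public int compare(JobInProgress a, JobInProgress b) {
                    // fewer maps first, so small jobs get through quickly
                    int bySize = a.desiredMaps() - b.desiredMaps();
                    if (bySize != 0) {
                      return bySize;
                    }
                    // tie-break on start time so distinct jobs rarely compare equal
                    return a.getStartTime() < b.getStartTime() ? -1
                         : a.getStartTime() > b.getStartTime() ? 1 : 0;
                  }
                });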

          Finally, I've enhanced the unit test to test several scheduling scenarios - this test directly tests the scheduling algorithm by providing a (static) mock TaskTrackerManager implementation.

          Vivek Ratan added a comment -

          Nice job, Tom. It's a lot cleaner and will make it easier for us to merge stuff from HADOOP-3445. I did have a couple of comments:

          A Scheduler is usually just an algorithm for deciding which task to pick for a TT. It uses information from the TT (its hostname, rack, what it's currently running, how many resources it has free), and information from what is required (which job/queue to look at, capacities, user limits) and makes the best match. I'm wondering if TaskScheduler should handle addition/removal of jobs. As we make the JT persistent, we need the ability to persist job state to disk, to initialize jobs (expand their task-based structures, as in JobInProgress::Init()) dynamically (since, in order to scale, you don't want jobs to be expanded unless absolutely needed), to store only relevant information in memory and the rest on disk. Something else should likely do this, not TaskScheduler. TaskScheduler needs to access the collection of jobs when it runs its scheduling algorithms, but it should not be responsible for them. Methods like addJob() and removeJob() probably belong to some other class, something like a JobQueueManager. Which, by the way, can also handle multiple queues of jobs, as we'll need for 3445. Maybe the JT itself can handle the queues of jobs initially. Regardless, do you think TaskScheduler should be responsible for jobs?

          Another thought I had is regarding the work we're doing for 3445. It's more of an observation than a suggestion. HADOOP-3421 introduces multiple queues, capacity per queue, and user limits. Each of these features affects the scheduling of tasks, which likely would go something like this: a TT indicates that it has one or more free Map or Reduce slots. JT figures out whether to look for a Map or Reduce task. JT needs to find a task in a job in a queue, so it first looks at which queue to consider (primarily based on queue capacities and whether the queue can accept a TT slot). Within the selected queue, the JT considers which job to look at, based on user limits (the job needs to belong to a user who is not using more capacity than he/she is allowed) and priorities. Finally, within a job, the JT (actually, the JobInProgress object) needs to pick which task to run, based on data locality, speculation, and some other heuristics. My guess is that many developers will want to plug in tweaks to some of the pieces of the entire scheduling algorithm, and not modify other logic. Someone, for example, may want to tweak how a queue is chosen, and not touch the other stuff. For this, IMO, we need to break down the JT's scheduling flow (decide on an M or R task, then pick a queue, then pick a job, then pick a task), which now sits in JobQueueTaskScheduler, into discrete units and allow folks to override one or more of these units. There are a few ways to do this and we can use some of the suggestions and principles applied in this patch there as well. I guess I'm really making a plug here for folks to look at extending the stuff for 3445 (once it is ready) in a similar way, so that the entire scheduling flow can be made extensible at different steps.
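
          For what it's worth, that decomposition might surface as overridable steps on the scheduler, along these lines (all names hypothetical):

            // One possible decomposition of the scheduling flow into discrete,
            // individually overridable steps; names are illustrative only.
            abstract class StepwiseTaskScheduler extends TaskScheduler {
              // Step 1: decide whether to hand this tracker a map or a reduce task.
              protected abstract boolean preferMapTask(TaskTrackerStatus tracker);
              // Step 2: pick a queue, e.g. by capacity and whether it can accept a slot.
              protected abstract String pickQueue(TaskTrackerStatus tracker);
              // Step 3: pick a job within the queue, e.g. by user limits and priority.
              protected abstract JobInProgress pickJob(String queueName);
              // Step 4, picking the task within the job, stays in JobInProgress.
            }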

          Once again, I don't think merging the 3445 work with this patch should be very hard, especially with the latest patch. I'll take a look at that soon. Thanks, Brice for your offer to help. I'm sure you can help us out there. And of course, I'm glad you took this whole effort up and pushed it all the way to where it is. Nice work.

          Matei Zaharia added a comment -

          Tom, the new patch looks good. The fake container for unit testing will be very helpful.

          Vivek, regarding job persistence - I think the addJob and removeJob methods in the TaskScheduler are only meant to be "listener" methods to notify it that a job should be considered for scheduling. The JobTracker still keeps a list of jobs in the jobs variable, so it is the ultimate "owner" of the job list. Thus it should be possible to persist the jobs in the JobTracker or JobQueueManager or some other class and just add/remove them from the scheduler when they become schedulable.

          Matei Zaharia added a comment -

          I've submitted a JIRA about a fair scheduler now - HADOOP-3746.

          Brice Arnould added a comment -

          This new version has far better tests than my hacks and is much less intrusive (with the drawbacks mentioned before).
          Thanks Tom for writing this ^^
          My two cents:

          • Maybe the TaskTrackerManager interface could be package-private, since it is not meant to be used by end users (if I understood correctly).
          • Maybe the fact that queues are now described just by comparators is a bit restrictive. For example, I think that it prevents schedulers like HADOOP-3746 (and many others) from being implemented as queues. I really liked the previous JobQueue abstract class, but since it didn't reach consensus, I suggest that subclasses could specify a Set<JobInProgress> instead of a Comparator<JobInProgress> (or a SortedSet, but I don't think the extra methods that interface adds are needed).
            So the constructor
              protected JobQueueTaskScheduler(
                  Comparator<JobInProgress> jobQueueComparator) {
                this.jobQueue = new TreeSet<JobInProgress>(jobQueueComparator);
              }
            

            would become

              protected JobQueueTaskScheduler(
                  Set<JobInProgress> jobQueue) {
                this.jobQueue = jobQueue;
              }
            

          @Vivek Thanks, happy to have helped :-D Contributing to Hadoop is very instructive (and fun too :-P)

          Vivek Ratan added a comment -

          @Matei:
          >> Vivek, regarding job persistence - I think the addJob and removeJob methods in the TaskScheduler are only meant to be "listener" methods to notify it that a job should be considered for scheduling. The JobTracker still keeps a list of jobs in the jobs variable, so it is the ultimate "owner" of the job list. Thus it should be possible to persist the jobs in the JobTracker or JobQueueManager or some other class and just add/remove them from the scheduler when they become schedulable.

          My point was, the Scheduler should get whatever jobs it needs to consider from someone who manages jobs. It shouldn't maintain a separate list. Suppose you have one Scheduler that wants to look at all submitted jobs before deciding which task is best. Suppose you have another that only wants to look at the job with the highest priority and pick a task from it. In the first case, the caller (JT) needs to invoke the Scheduler's addJob() method for every job. In the second case, it needs to invoke the Scheduler's addJob() method for only one job. This is not good. The caller's code should be the same regardless of which scheduler is used behind the scenes. What should really happen is that when the first scheduler is called, it looks at all the jobs by fetching them from a JobManager or whatever class it is that handles jobs. The second scheduler will call the JobManager in a different way. The scheduler user's code is not affected. You shouldn't tell the Scheduler what jobs to consider - that decision is part of the Scheduler's internals.

          Tom White added a comment -

          A new patch with a few small changes.

          > I think the addJob and removeJob methods in the TaskScheduler are only meant to be "listener" methods to notify it that a job should be considered for scheduling.

          To emphasise this I have renamed these methods to be consistent with the "listener style" often found in Java: jobAdded and jobRemoved. I've also added a jobUpdated method since the job is actually being updated rather than added then removed.

          > Maybe the TaskTrackerManager interface could be package-private, since it is not meant to be used by end-users (if I understood well).

          I didn't make it package-private as I wanted implementors of TaskScheduler (which shouldn't have to be in the same package) to be able to use it. However, I've just noticed that Task, JobInProgress and TaskTrackerStatus are package-private, so I have made TaskTrackerManager the same. I suggest we figure out how to allow TaskScheduler implementations to be in other packages as a separate issue, where we consider how much of Task, JobInProgress and TaskTrackerStatus we need to make public. This can be done in parallel with HADOOP-3445 and HADOOP-3746.

          > I suggest that subclasses could specify a Set<JobInProgress>

          I agree - I've done this.

          I've also fixed up some badly named TaskTrackerManager instances which still had the word "Container" in them.

          Vivek Ratan added a comment -

          I wanted to add some more detail to my previous comment, and also address Brice's last comment on implementing queues.

          I earlier talked of a JobQueueManager, which is responsible for maintaining the collection of jobs submitted to a JT. This class would deal with how jobs are stored in memory or on disk. It really would encapsulate the jobsByPriority and jobs (perhaps) data structures, which are currently in JobTracker. When a job is submitted, i.e., when JobTracker.submitJob() is called, the JobInProgress object representing the new job would be given to JobQueueManager to maintain. addJob() and removeJob(), or their equivalent, would be part of JobQueueManager. Whatever class derives from TaskScheduler would invoke methods in JobQueueManager to figure out what jobs to look at.

          It's a bit tricky as to what JobQueueManager should look like. HADOOP-3421 brings in the concept of queues, and jobs being submitted to queues. So you will have multiple queues, each queue containing jobs, and perhaps supporting priorities and limits and such. Should JobQueueManager have an explicit notion of queue names, so that, for example, you could get sorted jobs from a single queue? Or maybe you get sorted jobs from a collection of queues. For example, you could have a method in JobQueueManager as follows:

          Collection<JobInProgress> getJobs(String queueName, Comparator<JobInProgress> jobComparator)
          

          I don't want to get into too many details here. These probably belong more on 3445. My point is, when designing JobQueueManager, or when modelling the right interface for JobQueue, as Brice was talking about, you need to keep in mind that there could be multiple queues of jobs, and sometimes you may want to get jobs from a single queue, and sometimes from multiple queues. Again, why does this matter?

          • I like that Tom did away with the JobQueue abstract class. Its presence forced a scheduler to assume there was only one queue of jobs (sure, you needn't have used JobQueue, but then it would be wasted), which would create problems for 3445.
          • The design of JobQueueManager is a bit tricky, depending on whether you want to assume there will always be multiple queues of jobs, and whether you want to fetch jobs from multiple or single queues in one call. As a temporary alternative, till we figure out the right design, you could leave the functionality of maintaining jobs in the JobTracker class. This class could continue supporting jobsByPriority and jobs, and expose them directly to TaskScheduler, so that the latter could look at the jobs it needed to.

          Yet another alternative is to leave TaskScheduler as is, wait till we have a patch for 3445, then look at if/how we design JobQueueManager. It may end up changing TaskScheduler by moving addJob() and removeJob(), but at least you'd have a much better idea of what you want in JobQueueManager. I personally prefer this approach, but keep in mind, it may end up changing TaskScheduler.

          Matei Zaharia added a comment -

          Tom - I ran into a few issues applying your 9.1 patch, which I've fixed in a new patch (9.2):

          • @Override was being used in TaskTracker on methods from the interface Configurable, which is not allowed in Java 1.5. Java 1.5 only lets you use @Override on methods overridden from a class.
          • TaskTrackerManager.taskTrackers() returned a Collection with no type parameters. I changed this to Collection<TaskTrackerStatus>.
          • Several classes used by the scheduler API had package-only visibility in org.apache.hadoop.mapred, which is problematic for anyone writing a scheduler outside this package. It makes sense to allow schedulers to be in different packages to prevent namespace pollution. I changed the visibility of several types to public (JobInProgress, Task, TaskInProgress, TaskTrackerStatus, TaskTrackerManager). I also made JobInProgress.getConf() public since the conf will often be used to put per-job scheduling parameters (such as queue name). (A sketch of the visibility issue follows this list.)
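
          As a hypothetical illustration of the visibility point (the package and class name below are invented, and the list-returning assignTasks signature is the one settled on later in this thread), a scheduler living outside org.apache.hadoop.mapred can only compile against types that the package makes public:

          package org.example.scheduling;

          import java.io.IOException;
          import java.util.Collections;
          import java.util.List;

          import org.apache.hadoop.mapred.Task;
          import org.apache.hadoop.mapred.TaskScheduler;
          import org.apache.hadoop.mapred.TaskTrackerStatus;

          public class NoOpScheduler extends TaskScheduler {
            public List<Task> assignTasks(TaskTrackerStatus taskTracker)
                throws IOException {
              // With package-private types, even this empty implementation
              // could not be written outside org.apache.hadoop.mapred.
              return Collections.<Task>emptyList();
            }
          }
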
          Matei Zaharia added a comment -

          @Vivek - I'm still not sure I understand why jobAdded and jobRemoved should not be in the TaskScheduler. It's true that persistence of jobs should be managed by the JobQueueManager, but these methods are meant to be "listeners", to let the scheduler update its internal data structures when jobs are added or removed. Without these methods, the scheduler would have to scan the list of jobs in the JobQueueManager every time it needs to make a scheduling decision to see whether it has changed from last time. There's no serious problem with having to do this, but it seems a little clunky. Or is there a problem with the TaskScheduler even being able to access the full list of runnable jobs?
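
          A minimal sketch of the listener idea (the class name is hypothetical, and it assumes jobAdded/jobRemoved hooks on the scheduler as in the patch under discussion): callbacks keep the scheduler's internal view of runnable jobs in sync, so no rescanning is needed.

          import java.util.Collection;
          import java.util.LinkedHashSet;

          import org.apache.hadoop.mapred.JobInProgress;

          // Hypothetical fragment of a listener-style scheduler: internal
          // state stays in sync through callbacks, so a scheduling decision
          // never has to rescan the full job list to detect changes.
          class ListenerSketch {
            private final Collection<JobInProgress> runnableJobs =
                new LinkedHashSet<JobInProgress>();

            public synchronized void jobAdded(JobInProgress job) {
              runnableJobs.add(job);      // invoked once, on submission
            }

            public synchronized void jobRemoved(JobInProgress job) {
              runnableJobs.remove(job);   // invoked once, on completion or kill
            }
          }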

          Tom White added a comment -

          @Override was being used in TaskTracker on methods from the interface Configurable, which is not allowed in Java 1.5. Java 1.5 only lets you use @Override on methods overridden from a class.

          Java 6 is required now. See HADOOP-2325.

          Several classes used by the scheduler API had package-only visibility in org.apache.hadoop.mapred, which is problematic for anyone writing a scheduler outside this package. It makes sense to allow schedulers to be in different packages to prevent namespace pollution. I changed the visibility of several types to public (JobInProgress, Task, TaskInProgress, TaskTrackerStatus, TaskTrackerManager). I also made JobInProgress.getConf() public since the conf will often be used to put per-job scheduling parameters (such as queue name).

          Yes, I mentioned this above. I'm not comfortable with just making all of these classes public without thinking through the interfaces, since we have to maintain these public interfaces, and be careful (and backwards compatible) with evolution. So I suggest we keep them package private for the first release, and figure out how to open it up later.

          I'd be interested to hear others' thoughts about this.

          Arun C Murthy added a comment - edited

          Brice/Tom, this is looking good. Couple of brief comments - pardon me jumping in late, I've only recently started looking at this and other related jiras.

          1. Let me get the minor one out of the way first: can we call the main scheduler interface 'Scheduler' rather than 'TaskScheduler'? It might be confusing vis-a-vis JobScheduler. :)

          Ok, the serious stuff:

          2. I propose we update TaskScheduler.assignTask to reflect that a TaskTracker might have multiple slots free (HADOOP-3136 has very important utilization benefits). With that change it becomes explicit that the TaskTracker could have multiple map/reduce slots available and the scheduler services that request by giving it tasks from possibly different jobs.

          public List<Task> assignTasks(TaskTrackerId taskTracker);
          

          Oh, this might be a good time to introduce a notion of TaskTrackerId similar to what Enis did for Job/Task/TaskAttempt (HADOOP-544)?

          3. The one major comment I had is to help quickly resolve the gap between this and HADOOP-3445. The major change coming with HADOOP-3445 is the notion of Queues. I'm inclined to believe that it will be beneficial to explicitly state the notion of Queues (and multiple Queues) in the Scheduler interface. To that effect I propose a minor change to the jobAdded/jobRemoved/jobUpdated apis:

          public void jobAdded(QueueId, Job);
          public void jobRemoved(QueueId, Job);
          public void jobUpdated(QueueId, Job);
          

          With this, it will pave the way for HADOOP-3445 to get in quite easily.

          4. Also, we'd need some query capabilities given the notion of Queues:

          List<Queue> getQueues();
          List<Job> getJobs(QueueId, State); // State is RUNNING/PENDING/COMPLETE
          

          I'm not comfortable with just making all of these classes public without thinking through the interfaces, since we have to maintain these public interfaces, and be careful (and backwards compatible) with evolution. So I suggest we keep them package private for the first release, and figure how to open it up later.

          I'm inclined to go with Tom on keeping these interfaces package-private for the first release, but I do realise Matei and others might be eager to run with it right away!

          Matei Zaharia added a comment - edited

          Thanks for your comments Arun. Regarding queues, why not have the queue associated with the job (as a jobconf parameter, accessed through a method job.getQueueId()) rather than passing it along separately? Also, the scheduler does not need to keep track of which jobs are in which queue; this can be done by the JobTracker to begin with and the JobQueueManager later on. I'm just thinking that the more baggage there is in the Scheduler interface, the more trouble other implementers will have to go to, and the more temptation there will be to add more parameters and methods in the future, which would break schedulers. Really the only functional thing the scheduler does is to assign tasks.
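
          For illustration, the jobconf-based association Matei describes might look like the following; the property key is an assumption for the sketch, not a settled name.

          // Hypothetical accessor: the queue is just another per-job property
          // read from the job's configuration, so scheduler callbacks need no
          // extra queue parameter.
          String getQueueId(JobInProgress job) {
            // "mapred.job.queue.name" is an illustrative key; any agreed-upon
            // jobconf property would serve the same purpose.
            return job.getConf().get("mapred.job.queue.name", "default");
          }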

          Arun C Murthy added a comment -

          Also, the scheduler does not need to keep track of which jobs are in which queue; this can be done by the JobTracker to begin with and the JobQueueManager later on. I'm just thinking that the more baggage there is in the Scheduler [...]

          My idea was to explicitly introduce the notion of the Queue right away into the Scheduler, making it a first-class citizen - we need to discuss whether this is the right call. Given that both HADOOP-3445 and HADOOP-3746 need to deal with the notion of Queue, I believe we should. So, the argument on the table is that the Scheduler needs to be aware of Queues to assign tasks. Thoughts?

          Matei Zaharia added a comment -

          I agree that most schedulers will probably use queues. However, I think that if the queue is a first-class concept, we should make it a property of the JobInProgress rather than a separate parameter, the same way we do with priority, user, etc. This is more consistent and also sets an example for people who want to introduce other per-job scheduling parameters. (In fact I imagine that most people will put per-job parameters as properties in each job's jobconf, so they don't have to modify any Hadoop code to run their scheduler). It's mostly a software design point - if two objects will be associated together, this should be represented as membership.

          Vivek Ratan added a comment -

          I agree with Matei. Queues are associated with jobs and are maintained and exposed by the JobInProgress object (Arun, see the patch for 3445 - this is exactly what we've done). I also agree with Matei that you want to keep the Scheduler interface as simple as possible and really just have it assign tasks. In fact, to reiterate, I think the design for this Jira has come out quite well and there is no additional need, at least none that I can see yet, to make things easier for 3445. Once this patch is finalized/committed, we'll change 3445 to fit in with this structure.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12385892/JobScheduler-v9.2.patch
          against trunk revision 676069.

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          -1 javadoc. The javadoc tool appears to have generated 1 warning messages.

          -1 javac. The applied patch generated 521 javac compiler warnings (more than the trunk's current 520 warnings).

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2844/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2844/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2844/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2844/console

          This message is automatically generated.

          Vivek Ratan added a comment -

          @Matei:

          I'm still not sure I understand why jobAdded and jobRemoved should not be in the TaskScheduler. It's true that persistence of jobs should be managed by the JobQueueManager, but these methods are meant to be "listeners" [...]

          After some thought, I'm unable to convincingly argue, even to myself, for the removal of the methods from TaskScheduler.

          My concern really was with state. On one hand, I see the Scheduler as a stateless algorithm. The information it needs about jobs, when it runs, it gets from some other class. I was worried about any class that extends TaskScheduler having to maintain its own data structures for jobs, while a class like JobQueueManager is also maintaining (similar?) structures. On the other hand, I see your point too - for efficiency, a scheduler may want to know about what's changed since it ran last, rather than look at the entire set of jobs each time. A scheduler can certainly cache what information it needs (and maybe even support listener methods as you've suggested) if performance becomes an issue, but there is a state that it imposes on the system - it orders jobs a certain way (one scheduler may order jobs in FIFO order, another may choose a different ordering) - and perhaps this state is inherent to the scheduler.

          Like I said, I can't see a very strong reason for removing the job methods from TaskScheduler, so let's leave them there.

          Hemanth Yamijala added a comment -

          One comment from Arun was possibly missed.

          public List<Task> assignTasks(TaskTrackerId taskTracker);
          

          I think this is good to have in the scheduler. It is easy to default the implementation to return only 1 task as it does today, but it leaves the options open when we get to HADOOP-3136. Should we get this in?

          Other than that, +1 for leaving queues out of the interface, and leaving the job state change methods in the interface.

          Side note: in the current implementation of getNewTaskForTaskTracker(), we use a MIN_CLUSTER_SIZE_FOR_PADDING to give space for speculative and failed tasks. In the patch, with a default value of the MAX_TASKS_PER_JOB_PROPERTY (which is Long.MAX_VALUE), JobQueueTaskScheduler seems to not have this notion of padding. Is this not required, if we desire that, out of the box, the scheduling works as it does in the current JobTracker?
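
          For readers unfamiliar with the padding Hemanth refers to, here is a rough sketch of the idea; the constant values and the exact formula are approximations, not quotations of the JobTracker code.

          // Approximate sketch of capacity padding: on clusters above a
          // minimum size, reserve a small fraction of the remaining work as
          // headroom for speculative and re-executed tasks instead of filling
          // every slot with first-attempt tasks.
          static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;  // assumed value
          static final double PAD_FRACTION = 0.01;            // assumed value

          static int paddedTaskLoad(int remainingTasks, int numTaskTrackers,
                                    int maxSlotsPerTracker) {
            int padding = 0;
            if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
              padding = Math.min(maxSlotsPerTracker,
                                 (int) (remainingTasks * PAD_FRACTION));
            }
            return remainingTasks + padding;
          }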

          Amar Kamat added a comment -

          Oh, this might be good time to introduce a notion of TaskTrackerId similar to what Enis did for Job/Task/TaskAttempt (HADOOP-544) ?

          HADOOP-3343 was opened to address this.

          Devaraj Das added a comment -

          I think this is good to have in the scheduler. It is easy to default the implementation to return only 1 task as it does today, but it leaves the options open when we get to HADOOP-3136. Should we get this in?

          Makes sense.

          Regarding the padding, I am thinking that it makes sense to not have it at all. In general, we must always execute failed tasks first. This will ensure that jobs with deterministic task failures fail really early in the game. But this is more of a change in the JobInProgress class. Regarding speculative tasks, the obtainNewMapTask/obtainNewReduceTask methods could return a speculative task if they so desire. And this logic is better placed in the JobInProgress class, which has all the tasks' information.

          Tom White added a comment -

          A new patch (v10) with the following changes:

          • The scheduling classes are package private as that seems to be the consensus for the moment.
          • TaskScheduler returns a list of Tasks as suggested by Arun to facilitate HADOOP-3136.
          • This Jira should not change the default scheduling algorithm, so I've made JobQueueTaskScheduler use the same algorithm as JobTracker currently does (in the getNewTaskForTaskTracker method). This keeps the padding code - thanks for pointing this out, Hemanth. I've split out Brice's algorithm that limits the maximum number of tasks per job into a class called LimitTasksPerJobTaskScheduler. (A configuration sketch for choosing between such schedulers follows this list.)
          • The assignTasks method now takes a TaskTrackerStatus object, which serves as a representation of a tasktracker.
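
          As a sketch of how the JobTracker might select among such pluggable schedulers, reflective instantiation driven by configuration would look roughly like this; the configuration key is an assumption styled after the patch's direction, and the code is imagined inside the JobTracker (same package, so package-private types resolve).

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.util.ReflectionUtils;

          // Choose the scheduler class from configuration, falling back to
          // the default FIFO-style JobQueueTaskScheduler.
          Configuration conf = new Configuration();
          Class<? extends TaskScheduler> schedulerClass =
              conf.getClass("mapred.jobtracker.taskScheduler",   // assumed key
                            JobQueueTaskScheduler.class,
                            TaskScheduler.class);
          TaskScheduler scheduler =
              ReflectionUtils.newInstance(schedulerClass, conf);
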
          Owen O'Malley added a comment -

          I think that:

          • we want to support lazy loading of jobs so that we can scale up the job tracker, which means we shouldn't use JobInProgress in the API since it is very heavy
          • we want to display jobs separated by queues to the webui and cli

          To accomplish that:

          • we create a JobDescription class that is very lightweight and just includes the jobid, queue name, priority, and job.xml filename.
          class JobDescription {
            JobID getJobID();
            String getQueueName();
            void setQueueName(String queue);
            Priority getPriority();
            void setPriority(Priority priority);
            Path getJobConfPath();
          }
          
          class TaskScheduler {
            void addJob(JobDescription job) throws IOException;
            void removeJob(JobDescription job) throws IOException;
            void updateJob(JobDescription job) throws IOException;
            List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;
            List<String> getQueueNames();
            List<JobDescription> getJobs(String queue) throws IOException;
          }
          

          Then the only data structure holding jobs is in the scheduler, and queries can be done through this api.

          Thoughts?

          Matei Zaharia added a comment -

          This is a good idea, but there needs to be a way to tell the scheduler "load and start this job" and to get a JobInProgress from the JobDescription if the job has been loaded. Otherwise there will be no way to find a Task to return in assignTask(). Other useful fields in the JobInProgress include number of maps/reduces desired, number of maps/reduces running, and the list of running tasks in the job (which can be used for deciding when to do speculative execution).

          Vivek Ratan added a comment -

          we want to support lazy loading of jobs so that we can scale up the job tracker, which means we shouldn't use JobInProgress in the API since it is very heavy

          Isn't there a far easier way to do this? A JobInProgress object grows in size when you call JobInProgress.initTasks(). The current JT code calls initTasks() when a job is submitted to the JT (the newly created JobInProgress object is actually put in a queue and some other thread calls initTasks(), but it's really called as early as possible). A simple fix to this is to call initTasks() only when a job is considered for running by the scheduler. That way, you initialize a JobInProgress object only when needed. Otherwise, its memory footprint is low. It's even worth arguing that a newly created JobInProgress object should look a lot like what JobDescription looks like. It's only when you 'initialize' it, at the point when the (first task in the) job can be considered for running, that you need to expand all the other data structures. This seems, IMO, to be a better way to handle scale than having another class.

          To be fair, JobDescription is really what the scheduler should be looking at, but it makes more sense if the Scheduler is a separate component/process. Otherwise, you're duplicating state in JobDescription and JobInProgress. You could also refactor JobInProgress so that it has a JobDescription member variable which it exposes, rather than expose separate methods for getting/setting priority or queue names, but there doesn't seem to be an advantage to it, other than conceptually encapsulating information that a Scheduler might need in one class.
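
          A sketch of the lazy-initialization idea (initTasks() is the method named in the thread; the inited() guard is hypothetical):

          // Defer the expensive expansion of a JobInProgress until the
          // scheduler first considers the job for running; unstarted jobs
          // then stay lightweight, much like a JobDescription would be.
          void considerForRunning(JobInProgress job) throws IOException {
            if (!job.inited()) {   // hypothetical "already initialized?" guard
              job.initTasks();     // heavy task structures are built only now
            }
            // the job is now ready to hand out tasks
          }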

          As to whether queue names need to be part of TaskScheduler: we have two options here.

          • Queues are explicit in the system, and jobs are always submitted to a queue. If so, you want this notion everywhere. JobTracker.submitJob() should be changed to take in a jobID and a queue name, as you're explicitly submitting a job to a queue. Then, TaskScheduler requires both a job and a queue name, in order to tell it that a job was submitted to the system (as per Matei and Tom's comment earlier, addJob() is a listener method and just needs to know when a job is submitted to the system).
          • Or, you could treat queues (and other things we may add later, such as Orgs) as part of the job configuration. So, a user submits a job, and everything the system needs to know is encapsulated in the jobID, when JobTracker.submitJob is called.

          Then the only data structure holding jobs is in the scheduler and doing queries can be done through this api.

          Do we want the Scheduler to serve queries? In the future, you may well want to think of the Scheduler as just an algorithm that, given the state of the system, only decides what task to give to a TT. Web serving may be done through a completely different component.

          I really think some other component besides the Scheduler needs to be responsible for storing jobs and maintaining data structures that associate the job with queues and deal with job persistence - everything to do with keeping track of jobs & queues in memory. Different schedulers impose different filters/sorting on these structures - they're really just algorithms that access these data structures. Schedulers may keep other data structures for their use. For example, in HADOOP-3445, the scheduler needs to know how many unique users have submitted jobs to a queue, or how many tasks for a given user are running. This information is kept in a different data structure that the scheduling code controls. It doesn't need to be persisted and doesn't need the same scaling/persistence functionality as you need for JobInProgress objects. So in that sense, the TaskScheduler interface should not also expose jobs and queues. getQueueNames() and getJobs() belong elsewhere (probably in a JobQueueManager class).

          You may actually want two separate interfaces - one for scheduling (which will be similar to what TaskScheduler exposes) and one for iterating through jobs and queues. For performance's sake, you may have the same class implement both, but they are two separate interfaces.
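
          A hypothetical sketch of that two-interface split (all names are illustrative): two narrow contracts that a single class is free to implement together.

          import java.io.IOException;
          import java.util.Collection;
          import java.util.List;

          // Scheduling contract: the only functional output is assigning
          // tasks to a tracker.
          interface Scheduler {
            List<Task> assignTasks(TaskTrackerStatus taskTracker)
                throws IOException;
          }

          // Bookkeeping contract: iterating jobs and queues for the web UI
          // and CLI, independent of any scheduling policy.
          interface JobQueueManager {
            Collection<String> getQueueNames();
            Collection<JobInProgress> getJobs(String queueName);
          }

          // For performance, one class may implement both, but the JobTracker
          // and the UI each depend only on the contract they actually need.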

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12385980/JobScheduler-v10.patch
          against trunk revision 676772.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2859/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2859/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2859/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2859/console

          This message is automatically generated.

          Owen O'Malley added a comment -

          Fair point about the JobInProgress being fine for the API, provided that the scheduler is required to call initTasks on the JobInProgress when it should be loaded.

          On the other hand, we need

          • an event when a TIP changes state, so that the scheduler can update its data structures
          • an api to the scheduler that breaks down the queues and order within the queue, because the scheduler is the only place that has the order of jobs within the queue.

          So how does this look:

          class JobInProgress { 
            ...
            String getQueueName();
            Priority getPriority();
            void initTasks();
          }
          
          class TaskScheduler {
            void addJob(JobInProgress job) throws IOException;
            void removeJob(JobInProgress job) throws IOException;
            // the job has changed state
            void updateJob(JobInProgress job) throws IOException;
            // the task (ie. map 0) has changed state
            void updateTask(TaskInProgress tip) throws IOException;
          
            // get a set of tasks for the given tracker
            List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException;
          
            // get all of the queue names
            List<String> getQueueNames();
            // get an ordered list of the jobs in the given queue
            List<JobDescription> getJobs(String queue) throws IOException;  
          }
          

          I don't think we need updateTaskAttempt, because I can't see anything that a potential scheduler would do with information that fine-grained.

          I think that you are right that we want the JobTracker to keep track of the running jobs and tasks, effectively owning the JobInProgress, TaskInProgress, and Tasks and updating their state based on the task tracker reports.

          In the medium term (ie. not this patch), this should only be for running and pending jobs. Any finished jobs will need to be queried via JobHistory or a similar interface.

          Matei Zaharia added a comment -

          an api to the scheduler that breaks down the queues and order within the queue, because the scheduler is the only place that has the order of jobs within the queue.

          What other pieces of code need to use order within a queue except the scheduler? In fact, I think "order within a queue" doesn't even make sense for all schedulers. Some schedulers, such as the fair scheduler in HADOOP-3746, make all jobs runnable as soon as they are submitted and give them different fractions of the cluster.

          Similarly, I don't understand what getQueueNames() would be used for by pieces of code external to the scheduler. In fact, if the list of jobs is stored in the JobTracker (which I think we've said it should be), then the JobTracker has all the information it needs to figure out the queue names - it can scan through all jobs and get the queue name of each one from its getQueueName().

          Basically the scheduler should be a box whose only "output" is to assign a task, and whose only "input" is whatever state of the world it needs to know about in order to do its job (i.e. events for jobs being modified, etc). That makes it possible to separate the code that persists jobs from the code that schedules them.

          Owen O'Malley added a comment -

          The users of the getQueueNames() and getJobs(queue) would be the web ui and the command line tools "bin/hadoop job". Of course the ordering doesn't have to be exclusive, but almost all schedulers will have a rough priority within the queue. It is basically about providing information about the scheduler to the user. Does that make sense?

          Matei Zaharia added a comment -

          The users of the getQueueNames() and getJobs(queue) would be the web ui and the command line tools "bin/hadoop job". Of course the ordering doesn't have to be exclusive, but almost all schedulers will have a rough priority within the queue. It is basically about providing information about the scheduler to the user. Does that make sense?

          I see, I guess it's an issue of increasing API complexity vs the benefit of seeing jobs in sorted order then. Personally I'd prefer if the API that the scheduler had to implement was as simple as possible. I think there are two solutions to the problem you're mentioning with providing info about the scheduler to the user:

          • Short-term, just sort the jobs by submit time and also tell the user the priority of each job. This is easy to parse (the user can even figure out which job is which based on the order they submitted them), and it's also what the web UI currently provides.
          • Longer-term, it would be nice if schedulers (and maybe other elements of Hadoop as well) could simply register other web apps / servlets on the StatusHttpServer. This is actually how I'm planning to create a dashboard for my scheduler. Hopefully the only thing it requires is adding a getter for the server.
          Owen O'Malley added a comment -

          I see, I guess it's an issue of increasing API complexity vs the benefit of seeing jobs in sorted order then. Personally I'd prefer if the API that the scheduler had to implement was as simple as possible. I think there are two solutions to the problem you're mentioning with providing info about the scheduler to the user:

          It is actually more than that. If the scheduler does not provide queues, it would be wrong to display the jobs as if they had queues. Note that any scheduler that does not want to deal with queues only has to return {"default"} for the queue, and return all of the jobs for getJobs.

          The interface would be much more brittle if you had custom servlets for each scheduler, because the interface there would be very broad. Note that you would also need a text version for the cli. I think the two query methods would be much less brittle.

          Tom White added a comment -

          For this issue, which is about moving scheduling logic from the JobTracker to a scheduler class, I think we can leave out queues. We don't currently have the explicit concept of a queue, so I think it makes sense to commit this change, and continue the discussion about adding queues in HADOOP-3445. As discussed earlier, this Jira will not change the public APIs yet, so we can go on evolving the scheduling interface.

          Fair point about the JobInProgress being fine for the API, provided that the scheduler is required to call initTasks on the JobInProgress when it should be loaded.

          The implication of this is that the Scheduler takes over the responsibility of managing the jobInitQueue. I've created a patch which does this (v11) by inserting an EagerTaskInitializationTaskScheduler into the TaskScheduler hierarchy. In doing so I needed a couple of lifecycle methods, which I've named following HADOOP-3628, so TaskScheduler can be retrofitted to extend Service after HADOOP-3628 is committed.

          Does this look OK?
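
          A rough sketch of the lifecycle shape described above (the method names follow the HADOOP-3628 naming Tom mentions; every signature here is an assumption, not the committed API, and the class is imagined in the org.apache.hadoop.mapred package so the types resolve):

          import java.io.IOException;
          import java.util.List;

          // Sketch: an abstract scheduler with lifecycle hooks, so a subclass
          // such as an eager-initialization scheduler can own its background
          // job-init thread.
          abstract class TaskSchedulerSketch {
            /** Called before the JobTracker starts offering work; a subclass
                managing the jobInitQueue would start its init thread here. */
            public void start() throws IOException {
              // default: nothing to start
            }

            /** Called at shutdown; stop any background threads. */
            public void terminate() throws IOException {
              // default: nothing to stop
            }

            public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
                throws IOException;
          }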

          an event when a TIP changes state, so that the scheduler can update its data structures

          Would the taskUpdated method be called by JobTracker#updateTaskStatuses? I can see that it might be useful for schedulers to have this information, but perhaps this is something to add to the interface when a use case comes up? (TaskScheduler is an abstract class, so it's easy to add new methods to it.)

          Arun C Murthy added a comment -

          For this issue, which is about moving scheduling logic from the JobTracker to a scheduler class, I think we can leave out queues. We don't currently have the explicit concept of a queue, so I think it makes sense to commit this change, and continue the discussion about adding queues in HADOOP-3445.

I'm OK with committing this as-is, as long as Matei is fine with changes to TaskScheduler that might arise from discussions in other Jiras... Matei?

          Vivek Ratan added a comment -

          For this issue, which is about moving scheduling logic from the JobTracker to a scheduler class, I think we can leave out queues. We don't currently have the explicit concept of a queue, so I think it makes sense to commit this change, and continue the discussion about adding queues in HADOOP-3445. As discussed earlier, this Jira will not change the public APIs yet, so we can go on evolving the scheduling interface.

It's fine to move this discussion elsewhere, but I think it should be in a new Jira. HADOOP-3445 is specifically about implementing part of HADOOP-3421, and comments there should focus on the implementation (the algorithms for capacity redistribution, for handling capacities and user limits, etc.). This separate Jira should be about designing the Scheduler interface, given queues and perhaps some other new artifacts.

          Would the taskUpdated method be called by JobTracker#updateTaskStatuses? I can see that it might be useful for schedulers to have this information, but perhaps this is something to add to the interface when a use case comes up? (TaskScheduler is an abstract class, so it's easy to add new methods to it.)

Just as a scheduler is a listener to jobs being added/deleted and their states being modified, it should also be a listener to task states being modified. The 3445 scheduler needs this: for example, it keeps track of how many of a user's tasks are running (to enforce user limits), so it needs to know when a task starts running and when it completes. It could compute this by iterating through all jobs, but being a listener to task status changes is more convenient. I'm not sure exactly where TaskScheduler.updateTask() will be called, but task status changes and job status changes both seem to be needed.
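
To make the use case concrete, here is a minimal sketch of the bookkeeping such a listener would drive, assuming some callback reports the owning user of each task that starts or finishes (the class and method names are illustrative, not from any patch):

import java.util.HashMap;
import java.util.Map;

// Illustrative only: counts how many tasks each user has running, so a
// scheduler can enforce per-user limits without scanning every job.
class PerUserTaskCounts {
  private final Map<String, Integer> running = new HashMap<String, Integer>();

  // Called when a task belonging to the given user starts running.
  synchronized void taskStarted(String user) {
    Integer n = running.get(user);
    running.put(user, n == null ? 1 : n + 1);
  }

  // Called when a task belonging to the given user completes or fails.
  synchronized void taskFinished(String user) {
    Integer n = running.get(user);
    if (n == null) {
      return;                       // defensive: unmatched finish event
    } else if (n > 1) {
      running.put(user, n - 1);
    } else {
      running.remove(user);
    }
  }

  // Consulted by the scheduler before assigning another task to this user.
  synchronized int runningTasks(String user) {
    Integer n = running.get(user);
    return n == null ? 0 : n;
  }
}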

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12386181/JobScheduler-v11.patch
          against trunk revision 677470.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.

          -1 release audit. The applied patch generated 210 release audit warnings (more than the trunk's current 209 warnings).

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2884/testReport/
          Release audit warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2884/artifact/trunk/current/releaseAuditDiffWarnings.txt
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2884/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2884/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2884/console

          This message is automatically generated.

          Vivek Ratan added a comment -

          I realize we're trying to get this patch committed quickly, but having looked at the latest patch, I do have a concern. As per patch v11, we have the following class hierarchy: TaskScheduler --> EagerTaskInitializationTaskScheduler --> JobQueueTaskScheduler --> LimitTasksPerJobTaskScheduler. EagerTaskInitializationTaskScheduler provides a mechanism for initializing multiple JobInProgress objects asynchronously (through a separate thread). JobQueueTaskScheduler is your basic scheduler, and LimitTasksPerJobTaskScheduler is a scheduler that limits concurrent tasks per job. My concern is that this kind of class hierarchy is the wrong way to allow people to build their own schedulers. What you really want is a library, or a set of separate classes, that provide individual functionality: one for limiting concurrent tasks per job, one for initializing jobs in a separate thread, and so on. Then, somebody can build a scheduler by picking the various functionality they want and composing the classes that provide this functionality. Inheritance does not let you do that.

Here's an example (based on 3445). I want to build a scheduler that limits concurrent tasks per job, but I don't want it to initialize jobs in a separate thread (it should initialize individual jobs directly, only when required, in order to scale). What do I do? I don't want to extend LimitTasksPerJobTaskScheduler, because then I get the functionality of EagerTaskInitializationTaskScheduler, which I don't want. What if my scheduler also supports some sort of fair share (giving equal time slots to each user's jobs), as well as user limits (limiting the total number of tasks associated with a user)? Do I still extend LimitTasksPerJobTaskScheduler? In one class? Or do I define a further hierarchy: LimitTasksPerJobTaskScheduler --> FairShareScheduler --> LimitTasksPerUserScheduler? Which class extends which other class? You really want people to build schedulers by composing lots of individual pieces of functionality, because scheduling incorporates lots of individual algorithms, each possibly very different from another. I think you really want any class that extends TaskScheduler to be a complete scheduler in itself, made up of lots of different functionalities (the 3445 scheduler, for example, provides task limits per user AND capacities AND priorities AND some preemption, each of which may be reused by some scheduler). Because you want to allow different schedulers to share functionality (for example, two different schedulers may want to limit tasks per job, but may also support differing features, so they should ideally share the code that limits tasks per job), you want this functionality to be available as composable objects, or in a separate library. You don't want hierarchies based on inheritance.

I don't quite know what these composable objects look like. Perhaps you define an interface which takes in tasks and decides whether those tasks pass or fail the policy that the object implements. That should be our discussion. Having class hierarchies, as we do today, will severely limit extensibility, IMO.
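
One possible shape for those objects, sketched purely to illustrate the idea (none of these types exist in any patch here): each policy implements a single rule, and a scheduler composes whichever policies it wants instead of inheriting them.

import java.util.List;

// Hypothetical composable policy: one object per scheduling rule.
interface SchedulingPolicy {
  boolean allowLaunch(JobInProgress job);
}

// A scheduler would hold one of these and consult it before launching a
// task from the given job; any individual policy can veto the launch.
class CompositePolicy implements SchedulingPolicy {
  private final List<SchedulingPolicy> policies;

  CompositePolicy(List<SchedulingPolicy> policies) {
    this.policies = policies;
  }

  public boolean allowLaunch(JobInProgress job) {
    for (SchedulingPolicy policy : policies) {
      if (!policy.allowLaunch(job)) {
        return false;               // e.g. a per-job or per-user limit was hit
      }
    }
    return true;
  }
}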

          Tom White added a comment -

Vivek, you're quite right. I think the hierarchy is a symptom of the abstract base class, TaskScheduler. It's mixing two concerns: listening to job state changes, and scheduling.

          To fix this I propose that we break out the listener methods from the TaskScheduler into a JobInProgressListener interface:

          
interface JobInProgressListener {
  void jobAdded(JobInProgress job);    // a job was submitted to the JobTracker
  void jobRemoved(JobInProgress job);  // a job completed or was removed
  void jobUpdated(JobInProgress job);  // a job's state changed
}

abstract class TaskScheduler {
  public void start() throws IOException {}
  public void terminate() throws IOException {}
  public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
      throws IOException;
}

TaskSchedulers can then use a choice of JobInProgressListener implementations. For example, JobQueueTaskScheduler has a JobQueueJobInProgressListener to maintain its job queue and an EagerTaskInitializationListener to do task initialization.

TaskSchedulers register their listeners with the JobTracker, so we add the following two methods to JobTracker (and the TaskTrackerManager interface):

          public void addJobInProgressListener(JobInProgressListener listener);
          public void removeJobInProgressListener(JobInProgressListener listener);
          

In the future we might add a TaskInProgressListener interface that would allow TaskSchedulers to listen for changes to tasks.

I want to build a scheduler that limits concurrent tasks per job, but I don't want it to initialize jobs in a separate thread

          To do this with the proposed changes you would override LimitTasksPerJobTaskScheduler#start so it doesn't start the EagerTaskInitializationListener, but instead creates a listener to initialize tasks according to its own policy.
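
A minimal sketch of that override, assuming the v12 API described above (the taskTrackerManager field inherited from TaskScheduler is an assumption, and OnDemandInitializationListener is hypothetical):

// Sketch only: keep the task-limiting behaviour, but skip the eager
// initializer and register a custom on-demand initializer instead.
public class LazyInitScheduler extends LimitTasksPerJobTaskScheduler {
  @Override
  public void start() throws IOException {
    // Deliberately do not start the EagerTaskInitializationListener.
    taskTrackerManager.addJobInProgressListener(
        new OnDemandInitializationListener());  // hypothetical listener
  }
}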

          I've attached a new patch which implements the above (v12).

          Vivek Ratan added a comment -

Tom, nice work. It certainly makes the hierarchy cleaner, and it cleanly separates listening for changes from supporting a scheduler API.

Another of my concerns (being able to share code/functionality among schedulers) is still not resolved, but it may be a nice-to-have feature, and will probably have a solution different from what we're discussing here, so perhaps that discussion can happen on a separate Jira. To reiterate the concern: suppose I want a scheduler that limits tasks per job (so I'd like to reuse code from LimitTasksPerJobTaskScheduler, including the code that deals with configuration). Suppose I also want my scheduler to implement some fair-share functionality that some class, say FairShareScheduler, has defined (this could be a class similar to the one proposed in HADOOP-3746). I'd like to reuse code from that class too. Maybe I also want some feature (per-user limits, for example) that exists in the scheduler for 3445. How do I do that? Again, this may be something we do in the future. I don't think it affects the design of JobInProgressListener and TaskScheduler, so we can discuss it elsewhere at the appropriate time.

          Tom White added a comment -

          Agreed. What you describe is composition at a finer level of granularity than the assignTasks method. Hopefully the components for doing this will emerge once a few schedulers have been written.

          Doug Cutting added a comment -

          My predictable comment: JobInProgressListener might better be an abstract class, so that its interface can evolve without breaking existing implementations.

          Hadoop QA added a comment -

          +1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12386304/JobScheduler-v12.patch
          against trunk revision 677712.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2893/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2893/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2893/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2893/console

          This message is automatically generated.

          Tom White added a comment -

          JobInProgressListener might better be an abstract class, so that its interface can evolve without breaking existing implementations.

Yes, I don't know why I didn't do that - perhaps I'm used to listeners being interfaces. Here's a patch with the change.

          Hadoop QA added a comment -

          +1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12386376/JobScheduler-v12.1.patch
          against trunk revision 677872.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2901/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2901/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2901/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2901/console

          This message is automatically generated.

          Owen O'Malley added a comment -

          I just committed this. Thanks, Tom and Brice!

          Vivek Ratan added a comment -

I don't know what the protocol is for commenting on an issue that has been committed, and I don't mean to suggest un-committing the patch, but I just wanted to bring up a potential problem with making JobInProgressListener an abstract class rather than an interface. I see Doug's point that an abstract base is easier to evolve than an interface. But a scheduler, which extends TaskScheduler, may very well want to implement the JobInProgressListener methods itself; it may want to be in complete charge of the data structures that store jobs. In other words, a scheduler may also be a listener to changes to JobInProgress objects. Is there an easy way to do that if JobInProgressListener is an abstract class?
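
The obvious workaround is delegation; a sketch, assuming listeners are registered through the taskTrackerManager field as in the committed patch:

// Sketch only: the scheduler cannot extend both TaskScheduler and the
// abstract JobInProgressListener, so it owns a listener that forwards
// events back into the scheduler's own data structures.
public class MyScheduler extends TaskScheduler {
  private final JobInProgressListener listener = new JobInProgressListener() {
    public void jobAdded(JobInProgress job)   { /* update scheduler state */ }
    public void jobRemoved(JobInProgress job) { /* update scheduler state */ }
    public void jobUpdated(JobInProgress job) { /* update scheduler state */ }
  };

  @Override
  public void start() throws IOException {
    taskTrackerManager.addJobInProgressListener(listener);
  }

  @Override
  public List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
    return null;                    // scheduling logic elided in this sketch
  }
}

This works, but it forces an extra level of indirection on every scheduler that wants to manage its own job bookkeeping.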

          Owen O'Malley added a comment -

          I just created HADOOP-3801 to address this.

Hudson added a comment -

Integrated in Hadoop-trunk #581 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/581/ )

People

  • Assignee:
    Brice Arnould
  • Reporter:
    Brice Arnould
  • Votes:
    1
  • Watchers:
    21
