Hadoop Common
HADOOP-719

Integration of Hadoop with batch schedulers

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      Hadoop On Demand (HOD) is an integration of Hadoop with batch schedulers such as Condor, Torque, Sun Grid, etc. HOD is a system that provisions a Hadoop instance using a shared batch scheduler: it finds a requested number of nodes and starts up Hadoop daemons on them. Users' map reduce jobs can then run on that Hadoop instance. After the job is done, HOD gives the nodes back to the shared batch scheduler. A group of users will use HOD to acquire Hadoop instances of varying sizes, and the batch scheduler will schedule requests so that important jobs get more resources and finish fast. Here is a list of requirements for HOD and the batch schedulers:

      Key Requirements:

      — Should allocate the specified minimum number of nodes for a job
      Many batch jobs can finish in time only when enough resources are allocated, so the batch scheduler should allocate the requested number of nodes for a given job when the job starts. This is a simple form of what's known as gang scheduling.

      Often the minimum number of nodes is not available right away, especially if the job asked for a large number. The batch scheduler should support advance reservation for important jobs so that the wait time can be determined. With advance reservation, a reservation is created at the earliest future point at which the currently occupied nodes become available. When nodes are idle but booked by future reservations, the batch scheduler may give them to other jobs to increase system utilization, but only when doing so does not delay existing reservations.
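      For illustration only (not part of this proposal), a sketch of how a fixed minimum node count could be requested from Torque so that the job only starts once all nodes are granted together; the script name "hod-start.sh" and the walltime default are made up:

      import subprocess

      def request_nodes(num_nodes, walltime="02:00:00", script="hod-start.sh"):
          # -l nodes=<n>,walltime=<t> asks Torque for <n> nodes together; the job
          # does not start until all of them can be allocated at once
          cmd = ["qsub", "-l", "nodes=%d,walltime=%s" % (num_nodes, walltime), script]
          return subprocess.check_output(cmd).decode().strip()   # the Torque job id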

      — Run short urgent jobs without causing too much loss to long jobs. In particular, the job tracker of a long job should not be killed.
      Some jobs, mostly short ones, are time sensitive and need urgent treatment. Often, a large portion of the cluster nodes will be occupied by long running jobs. The batch scheduler should be able to preempt long jobs and run urgent jobs; the urgent jobs will then finish quickly and the long jobs can regain the nodes afterward.

      When preemption happens, HOD should minimize the loss to long jobs. In particular, it should not kill the job tracker of a long job.

      — Be able to dial up, at run time, the share of resources for more important projects.
      Viewed at a high level, a given cluster is shared by multiple projects. A project consists of a number of jobs submitted by a group of users. The batch scheduler should allow important projects to have more resources. This should be tunable at run time, as which projects are deemed more important may change over time.

      — Prevent malicious abuse of the system.
      A shared cluster environment can be put in jeopardy if malicious or erroneous job code:
      – holds unneeded resources for a long period
      – uses privileges for unworthy work
      Such abuse can easily cause under-utilization or starvation of other jobs. The batch scheduler should allow policies to be set up that prevent resource abuse by:
      – limiting privileges to legitimate uses that ask for a proper amount of resources
      – throttling each user's peak use of resources
      – monitoring and reducing starvation

      — The behavior should be simple and predictable
      When the status of the system is queried, we should be able to determine what factors caused it to reach its current state and what the future behavior would be, with or without tuning of the system.

      — Be portable to major resource managers
      The HOD design should be portable so that other resource managers can be plugged in in the future.

      Some of the key requirements are implemented by the batch schedulers. The others need to be implemented by HOD.


          Activity

          Mahadev konar added a comment -

          Resolving this since HOD is checked into the source code.

          Mehdi added a comment -

          Are there any documents for the HOD architecture and implementation online?

          Doug Cutting added a comment -

          > Would this be a good candidate to check into hadoop contrib?

          It sounds useful to me. I can't see why not. Contrib sounds appropriate.

          Mahadev konar added a comment -

          We have implemented a basic integration of Hadoop with the Torque resource manager – Hadoop On Demand (HOD). It is implemented in Python. The implementation allows you to bring up multiple map reduce clusters on demand. In the current implementation you specify a given number of nodes to run your job on. HOD gets these nodes from the resource manager and brings up a map reduce cluster on them. A job is specified to HOD to run on this transient cluster. HOD runs the job on the cluster, then brings down the transient map reduce cluster and gives the nodes back to the resource manager. All the logs are shipped back to the client that invoked HOD. The drawbacks of the current implementation are:
          1) it allows only one job to run per map reduce cluster – i.e. until the main() method in your job.jar returns.
          2) the machines allocated via Torque might not be machines that have the data locally.
          3) there is no support for software distribution, so multiple instances of different hadoop versions cannot be run.
          4) it waits until the nodes are allocated, rather than queuing the request and exiting.
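
          For illustration only, a deliberately simplified Python sketch of the allocate / run / tear-down lifecycle described above. It glosses over how the client learns which hosts were granted, and the script, host and path names ("hod-start.sh", "headnode", "/tmp/hadoop") are placeholders, not HOD code:

          import subprocess

          def run_transient_cluster(num_nodes, job_jar, hadoop_dir="/tmp/hadoop"):
              # 1. get the nodes from the resource manager (Torque here)
              torque_id = subprocess.check_output(
                  ["qsub", "-l", "nodes=%d" % num_nodes, "hod-start.sh"]).decode().strip()
              try:
                  # 2. the submitted script would bring up the daemons on the granted
                  #    nodes (jobtracker first, then the tasktrackers); glossed over here
                  # 3. run the user's job on the transient cluster
                  subprocess.check_call([hadoop_dir + "/bin/hadoop", "jar", job_jar])
                  # 4. ship the logs back to the client that invoked HOD
                  subprocess.check_call(
                      ["scp", "-r", "headnode:" + hadoop_dir + "/logs", "hod-logs"])
              finally:
                  # 5. tear the cluster down and return the nodes to the resource manager
                  subprocess.check_call(["qdel", torque_id])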

          The current implementation is also capable of bringing up a transient DFS. This is helpful for testing: you can bring up a transient DFS and a transient MapRed cluster and run jobs on them. The current implementation can also bring up a transient map reduce cluster on a static list of nodes (via ssh) and locally.

          We intend to address the current drawbacks in later upgrades to HOD.

          Would this be a good candidate to check into hadoop contrib?

          Mahadev konar added a comment -

          The three components of HOD are:
          bin/hod : This is the part of HOD that talks to the underlying scheduler/resource manager (Condor, Torque, etc.) and asks for machines. This is the main component of HOD; it allocates the specified number of nodes and then monitors the HOD instance for any updates/failures. A HOD instance is a map reduce instance of Hadoop.
          bin/hod-tt : This is the script that is run on a node to invoke a tasktracker instance.
          bin/hod-jt : This is the script that invokes the jobtracker instance on a given node.

          The basic idea being:
          — The user invokes bin/hod with all the required arguments (including the job.jar and hadoop jars)
          — bin/hod gets the required number of nodes from the underlying resource manager
          — invokes bin/hod-jt on the machine that will be running the jobtracker
          — invokes bin/hod-tt on the machines that will be running task trackers (a rough sketch of this launch order follows)
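
          A rough sketch of that launch order (jobtracker first, then the tasktrackers), assuming ssh is used to reach the allocated nodes; the hadoop_dir path and the argument passed to hod-tt are assumptions, not the actual script interface:

          import subprocess

          def launch_daemons(nodes, hadoop_dir="/tmp/hadoop"):
              jt_node = nodes[0]
              # the jobtracker must be up before any tasktracker tries to register with it
              subprocess.check_call(["ssh", jt_node, hadoop_dir + "/bin/hod-jt"])
              for node in nodes[1:]:
                  # each remaining node runs a tasktracker pointed at the jobtracker host
                  subprocess.check_call(["ssh", node, hadoop_dir + "/bin/hod-tt", jt_node])
              return jt_node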

          Here is a design document for HOD.
          ----- Design Document for bin/hod
          The two key design points for bin/hod are:

          1. An abstract factory that hides batch-scheduler-specific behaviour behind a common API.
          2. An event message loop that abstracts the post-startup interaction between Hadoop and the batch scheduler; it handles grow/shrink, termination, etc.

          The command line parameters for bin/hod are:

          -jar <map-reduce JAR>

          -resource-manager torque, condor, ...

          -min-node <n>
          the minimum number of nodes required for this map reduce instance

          -max-node <m>
          the maximum number of nodes required for this map reduce instance

          -qos <qos-level>

          -account <project-account-name>

          -wall-time <t>
          -S or -to-scheduler k=v,k1=v1,...
          -H or -to-hadoop k=v,...
          -J or -to-jobclient k=v,...
          and system-related options such as the work dir, namenode address, resource manager package dir, hadoop dir or tarball, etc.
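
          For concreteness, a hypothetical invocation combining the flags above (all values are made up):

          bin/hod -jar wordcount.jar -resource-manager torque \
              -min-node 10 -max-node 20 -qos normal -account my-project \
              -wall-time 120 -S queue=batch -H mapred.reduce.tasks=5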

          The Classes that bin/hod will contain are the following:

          ClusterConfig--
          This is an abstract factory for all the cluster configs.

          It can be instantiated as:
          TorqueClusterConfig()
          CondorClusterConfig()
          ... etc.
          These classes have information on what the cluster-specific commands/configurations are; for example, the submit() method maps to condor_submit in the case of Condor and to qsub in the case of Torque.
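
          A minimal Python sketch of that abstract factory, assuming the -resource-manager value is exposed on the parsed config object (cfg.resource_manager is an assumption); only the class names, createInstance() and submit() come from the design above:

          class ClusterConfig:
              # Abstract factory: hides the scheduler-specific commands behind a common API.
              def submit(self, job_file):
                  raise NotImplementedError

              @staticmethod
              def createInstance(cfg, jc):
                  # cfg.resource_manager is an assumed attribute holding the value of
                  # the -resource-manager flag
                  if cfg.resource_manager == "torque":
                      return TorqueClusterConfig()
                  elif cfg.resource_manager == "condor":
                      return CondorClusterConfig()
                  raise ValueError("unknown resource manager: " + cfg.resource_manager)

          class TorqueClusterConfig(ClusterConfig):
              def submit(self, job_file):
                  return ["qsub", job_file]           # Torque submission command

          class CondorClusterConfig(ClusterConfig):
              def submit(self, job_file):
                  return ["condor_submit", job_file]  # Condor submission command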

          ClusterHead--
          This abstract class maps to the JobTracker; it is a proxy for the JobTracker.

          ClusterBody--

          This abstract class maps to the tasktrackers / worker nodes.

          The above two classes can have batch-scheduler-specific ClusterHead/Body classes as their subclasses.

          There are two different classes for the TaskTrackers and the JobTracker because the reactions to events at the JobTracker and at the TaskTrackers are different.

          ClusterMonitor--
          This class monitors the map reduce instance, including the JobTracker, TaskTrackers and job client; it polls them for events and handles all of those events. It will also be extended to batch-scheduler-specific cluster monitors and implements the common functionality (like querying the jobtracker/tasktrackers).

          JobConfig--

          This class handles the basic arguments of map/reduce jobs: the input dir, output dir, and number of maps/reduces. Later we could use the input dir information to ask for nodes that are local or rack-local to the input files for the map tasks.

          The pseudo code for bin/hod would look like:

          main() {
              Config cfg = new Config(args)       // the main command line parser
              JobConfig jc = new JobConfig(cfg)   // uses the following args: input dir, output dir, number of maps, number of reduces

              // The cluster config takes a JobConfig argument so it can create a scheduler config
              // that specifies what kind of nodes it wants (e.g. nodes local to some rack)
              ClusterConfig cc = ClusterConfig.createInstance(cfg, jc)   // uses any scheduler specific arguments from cfg

              // The cluster head is the jobtracker and is always launched first
              ClusterHead ch = ClusterHead.createInstance(cfg, cc)       // gets the hadoop specific args from cfg

              // The cluster body takes the cluster head as an input parameter
              ClusterBody cb = ClusterBody.createInstance(cfg, ch, cc)

              JobClient jobclient = new JobClient(jc)   // the job config built above
              jobclient.submitJob(ch)
              ClusterMonitor cm = new ClusterMonitor(ch, cb, jobclient)

              while (e = cm.newEvent()) {
                  // events might be of the type: jobtracker failed, tasktracker failed,
                  // only a few reduces are left with most of the nodes lying idle, etc.
                  cm.process(e)
              }
          }

          Comments?

          Mahadev konar added a comment -

          Another key requirement for HOD is distribution of the hadoop jars onto the cluster. Most batch schedulers are not effective at transferring files, so HOD would require a service for transferring the hadoop jars.
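
          A hedged sketch of the kind of distribution step this implies, using plain scp/ssh since the batch scheduler cannot be relied on for file transfer; the paths and function name are placeholders, not part of HOD:

          import os
          import subprocess

          def distribute_hadoop(nodes, tarball="/tmp/hadoop.tar.gz", dest="/tmp"):
              # copy the hadoop tarball to every allocated node and unpack it there;
              # a real service would do this more efficiently than a per-node scp loop
              name = os.path.basename(tarball)
              for node in nodes:
                  subprocess.check_call(["scp", tarball, "%s:%s" % (node, dest)])
                  subprocess.check_call(
                      ["ssh", node, "tar -xzf %s/%s -C %s" % (dest, name, dest)])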


            People

            • Assignee:
              Mahadev konar
              Reporter:
              Mahadev konar
            • Votes:
              0
              Watchers:
              1
