Hadoop Map/Reduce
MAPREDUCE-278

Proposal for redesign/refactoring of the JobTracker and TaskTracker

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      During discussions on HADOOP-815 wrt some hard-to-maintain code on the JobTracker we all agreed that the current state-of-affairs there is brittle and merits some rework.

      Case in point: there are back-calls from TaskInProgress to JobTracker and from JobInProgress to JobTracker which mean that synchronization is quite involved and brittle, leading to issues like HADOOP-600. Also one is forced to lock several data-structures individually before certain operations (taskTrackers, trackerExpiryQueue, jobs etc.)

      Hence I'd like to present some early thoughts (which have undergone a quick iteration) on how we could do slightly better with a bit of redesign/refactoring. During discussions with Owen on the same we agreed that HADOOP-554 is an integral part of the same direction, and I also feel that HADOOP-398 (mapred package refactoring) is a good candidate to be done along with this.

      Context:
      ---------
      a) The unit of communication between the JobTracker & TaskTracker is a 'task'.
      b) Due to (a) the JobTracker maintains a bunch of information related to the 'taskid' i.e. taskidToTipMap, taskidToTrackerMap etc., and hence we need to update the JobTracker's data-structures via back-calls from TaskInProgress & JobInProgress where the context is available (complete/failed task, already-completed task etc.)
      c) This implies fairly elaborate and hard-to-maintain locking structures, and also some redundant information in the JobTracker, making it harder to maintain.

      Overall, at both the JobTracker & TaskTracker the concept of a 'job' is overshadowed by the 'task', which I propose we fix.

      Proposal:
      ----------

      Here is the main flow of control:
      JobTracker -> JobInProgress -> TaskInProgress -> task_attempt

      The main idea is to break the existing nexus between the JobTracker & TaskInProgress/taskid by (I've put code for illustrative purposes only, and ignored pieces irrelevant to this discussion):

      a) Making the 'job' the primary unit of communication between JobTracker & TaskTracker.

      b) TaskTrackerStatus now looks like this:

      class TaskTrackerStatus {
        List<JobStatus> jobStatuses;   // the status of the 'jobs' running on a TaskTracker
        String getTrackerName();
      }

      class JobStatus {
        List<TaskStatus> taskStatuses; // the status of the 'tasks' belonging to a job
        JobId getJobId();
      }

      c) The JobTracker maintains only a single map of jobid -> JobInProgress, and a mapping from taskTracker -> List<JobInProgress>:

      Map<JobId, JobInProgress> allJobs;
      Map<String, List<JobInProgress>> trackerToJobsMap;
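      For illustration, the bookkeeping around these two maps might look like the following sketch (the `JobTrackerMaps` holder, the `addJob`/`assignJobToTracker` methods, and the stripped-down `JobInProgress` are hypothetical, purely to show the intent; they are not the actual JobTracker API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: JobInProgress is a stand-in for the real class.
class JobInProgress {
    final String jobId;
    JobInProgress(String jobId) { this.jobId = jobId; }
}

class JobTrackerMaps {
    // jobid -> JobInProgress: the single authoritative job map
    final Map<String, JobInProgress> allJobs = new HashMap<>();
    // tracker -> jobs with tasks currently running on that tracker
    final Map<String, List<JobInProgress>> trackerToJobsMap = new HashMap<>();

    void addJob(JobInProgress jip) {
        allJobs.put(jip.jobId, jip);
    }

    // Record that a tracker now runs tasks of this job.
    void assignJobToTracker(String tracker, String jobId) {
        JobInProgress jip = allJobs.get(jobId);
        List<JobInProgress> jobs =
            trackerToJobsMap.computeIfAbsent(tracker, t -> new ArrayList<>());
        if (!jobs.contains(jip)) {
            jobs.add(jip);
        }
    }
}
```

      The point of the sketch is that the JobTracker only ever resolves a jobid to a JobInProgress; everything below the job level stays out of its maps.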

      d) The JobTracker delegates a bunch of responsibilities to the JobInProgress to reflect the fact that the primary 'concept' in map/reduce is the 'job', thus empowering the JobInProgress class:

      class JobInProgress {
        TaskInProgress[] mapTasks;
        TaskInProgress[] reduceTasks;
        Map<String, List<TaskInProgress>> trackerToTasksMap;    // tracker -> tasks running
        Map<String, List<TaskAttempt>> trackerToMarkedTasksMap; // tracker -> completed (success/failed/killed)
                                                                // task-attempts the tracker doesn't know about yet
        void updateStatus(JobStatus jobStatus);
        MapOutputLocation[] getMapOutputLocations(int[] mapTasksNeeded, int reduce);
        TaskAttempt getTaskToRun(String taskTracker);
        List<TaskTrackerAction> getTaskToKill(String taskTracker);
      }

      e) On receipt of a TaskTrackerStatus from a tracker, the processing of the heartbeat looks like this:

      for (JobStatus jobStatus : taskTrackerStatus.getJobStatuses()) {
        JobInProgress job = allJobs.get(jobStatus.getJobId());
        synchronized (job) {
          job.updateStatus(jobStatus);
          return new HeartbeatResponse(responseId,
              job.getTaskAttemptToRun(trackerName),
              job.getTaskToKill(trackerName));
        }
      }

      The big change is that the JobTracker delegates a lot of responsibility to the JobInProgress, so we get away from all the complicated synchronization constructs: simply lock the JobInProgress object at all places (reached via allJobs/trackerToJobsMap) and we are done. This also enhances throughput, since we will mostly not need to lock the JobTracker itself (even in the heartbeat loop); locking the JobInProgress or the two maps is sufficient in most cases, thus enhancing the inherent parallelism of the JobTracker's inner loop (heartbeat processing) and providing better response when multiple jobs are running on the cluster.
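      A rough sketch of that locking discipline (the `HeartbeatSketch` class and its method names are illustrative assumptions, not the real JobTracker code): heartbeats for different jobs contend only on their own JobInProgress monitor, never on a global JobTracker lock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: per-job locking instead of a global JobTracker lock.
class HeartbeatSketch {
    static class JobInProgress {
        int tasksCompleted;
        void updateStatus(int completed) {
            tasksCompleted = completed;  // mutated only under the job's monitor
        }
    }

    // Concurrent map so lookups need no global lock either.
    final Map<String, JobInProgress> allJobs = new ConcurrentHashMap<>();

    void processHeartbeat(String jobId, int completed) {
        JobInProgress job = allJobs.get(jobId);
        synchronized (job) {             // lock only this job
            job.updateStatus(completed);
        }
    }
}
```

      Two trackers heartbeating for two different jobs would take two different monitors and proceed in parallel, which is exactly the throughput argument above.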

      Hence the JobInProgress is responsible for maintaining its TaskInProgresses, which in turn are completely responsible for the TaskAttempts; the JobInProgress also provides sufficient information to the JobTracker as and when needed to schedule jobs/tasks, while the JobTracker remains blissfully unaware of the innards of jobs/tasks.

      -

      I hope to articulate a more general direction towards an improved and maintainable 'mapred', and would love to hear how we can improve and what pitfalls to avoid... let's discuss. We could take this piecemeal and implement it in stages, or do it in one go...

      Last but not least, I propose that while we are at this we redo the nomenclature a bit:
      JobInProgress -> Job
      TaskInProgress -> Task
      taskid -> replace with a new TaskAttempt
      This should help clarify each class and its role.

      Of course we will probably need a separate org.apache.hadoop.mapred.job.Task v/s org.apache.hadoop.mapred.task.Task, which is why I feel HADOOP-554 (refactoring of mapred packages) would be very important to get a complete, coherent solution.

      Thoughts?

      1. TestUTF8AndStringGetBytes.java
        0.7 kB
        yixiaohua
      2. Job_Tracker_FSM.pdf
        256 kB
        Sharad Agarwal
      3. mapred_as_dfa.patch
        17 kB
        Arun C Murthy

        Issue Links

          Activity

          Harsh J added a comment -

          yixiaohua - Did you accidentally upload that file here? This JIRA is closed and the patch you've attached appears irrelevant. Please reupload to the right JIRA instead.

          Harsh J added a comment -

          Closing out as a result of MAPREDUCE-279

          Sharad Agarwal added a comment -

          Attaching the pdf with the state transition diagrams. Note that to make a finite state machine, I had to introduce more states than we explicitly have right now. Not surprisingly, the state machine of Job turned out to be quite complex due to the various stages like setup, cleanup and the handling of failures and user kill actions. A lot of these we currently do in roundabout ways, and some we don't even handle properly.

          An interesting thing to note is that if we have a clear state model in the JT, it becomes much simpler to move to a model where the TT is just a task executor, agnostic to task type.
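          To make the idea concrete, an explicit Job state machine could be as small as the following sketch. The states and transitions here are illustrative placeholders; the actual ones in the attached PDF are richer.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: a tiny explicit state machine for a Job.
class JobStateMachine {
    enum State { SETUP, RUNNING, CLEANUP, SUCCEEDED, FAILED, KILLED }

    // Legal transitions out of each state; terminal states map to the empty set.
    private static final Map<State, Set<State>> LEGAL = new EnumMap<>(State.class);
    static {
        LEGAL.put(State.SETUP,     EnumSet.of(State.RUNNING, State.FAILED, State.KILLED));
        LEGAL.put(State.RUNNING,   EnumSet.of(State.CLEANUP, State.FAILED, State.KILLED));
        LEGAL.put(State.CLEANUP,   EnumSet.of(State.SUCCEEDED, State.FAILED, State.KILLED));
        LEGAL.put(State.SUCCEEDED, EnumSet.noneOf(State.class));
        LEGAL.put(State.FAILED,    EnumSet.noneOf(State.class));
        LEGAL.put(State.KILLED,    EnumSet.noneOf(State.class));
    }

    State state = State.SETUP;

    void transition(State next) {
        if (!LEGAL.get(state).contains(next)) {
            throw new IllegalStateException("illegal transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

          The payoff of the table-driven form is that every "roundabout" transition we currently handle ad hoc either appears in the table or is loudly rejected.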

          Sharad Agarwal added a comment -

          I want to revive this discussion. We have accumulated enough technical debt, and it is getting harder and harder to maintain and add features to the JobTracker. Some time back I captured the state machines of the various entities in the JobTracker. I will upload the document shortly.

          Arun C Murthy added a comment -

          An interesting point which came up while working on HADOOP-1839 is that, as of now, there isn't a straightforward Status maintained for the TaskInProgress class; rather, it is computed every time. Another thing to fix.

          Enis Soztutar added a comment -

          Working on HADOOP-135, I have indeed run into the same design issues Arun pointed out. The main weakness in the JT is its dependency on taskids. The other outstanding fact is that JobInProgress does most of the work for task attempt management. Ideally the data and control flow should obey JobTracker -> JobInProgress -> TaskInProgress -> task_attempt, and each class should delegate as much work as possible to the next.

          A big +1 to the following change:
          JobInProgress -> Job
          TaskInProgress -> Task
          taskid -> replace with a new TaskAttempt

          I think we should be careful about the change in JobStatus to hold List<TaskStatus>, since then a call to JobSubmissionProtocol.getJobStatus() may result in serializing all the TaskStatuses.
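          One way to sidestep that cost, sketched below, is for the client-facing status to carry only aggregate counters rather than the full task list, so its serialized size stays constant regardless of job size. The `CompactJobStatus` and `TaskStatusSketch` names here are hypothetical, not the real mapred classes.

```java
import java.util.List;

// Stand-in for a per-task status record.
class TaskStatusSketch {
    final boolean complete;
    TaskStatusSketch(boolean complete) { this.complete = complete; }
}

// A job status that aggregates at construction time, so serializing it
// never touches the per-task list.
class CompactJobStatus {
    final int totalTasks;
    final int completedTasks;

    CompactJobStatus(List<TaskStatusSketch> tasks) {
        this.totalTasks = tasks.size();
        this.completedTasks =
            (int) tasks.stream().filter(t -> t.complete).count();
    }

    float progress() {
        return totalTasks == 0 ? 0f : (float) completedTasks / totalTasks;
    }
}
```

          The full List<TaskStatus> would then travel only on the TaskTracker heartbeat path, where it is genuinely needed, and not on JobSubmissionProtocol.getJobStatus().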

          Arun C Murthy added a comment -

          Forgot to mention that the above patch depends on the HADOOP-1395 patch (https://issues.apache.org/jira/secure/attachment/12357704/HADOOP-1395_1_20070521.patch) ...

          Arun C Murthy added a comment -

          While thinking about a generic DFA (HADOOP-1395) I started wondering about some places where it could be applicable, and this seemed the right place to put down some early thoughts...

          I've attached a half-baked patch to illustrate how Job/Task/TaskAttempt (i.e. JobInProgress/TaskInProgress/Task in current nomenclature) could be modelled as a DFA...

          Appreciate it if people could chime in... what do hadoop-{dev|users} think? Is it overkill? Impractical? Feasible?

          Thoughts?

          Doug Cutting added a comment -

          +1, this sounds like a good direction to me.


            People

            • Assignee: Sharad Agarwal
            • Reporter: Arun C Murthy
            • Votes: 0
            • Watchers: 24
