The goal is to build a discrete event simulator to study how a Hadoop Map-Reduce Scheduler performs on a large-scale Map-Reduce cluster running a specific workload.
Mumak takes as input a reasonably large workload (e.g. a month's worth of jobs from production cluster(s)) and simulates them in a matter of hours if not minutes on very few machines.
What is it not?
It is a non-goal to simulate the actual map/reduce tasks themselves.
The scope of Version 1.0 does not include simulating the actual workload itself. It will merely take a digest of the Hadoop Map-Reduce JobHistory of all jobs in the workload and faithfully replay the recorded run-times of individual tasks without simulating the tasks themselves. Clearly, it will not attempt to simulate resources and their utilization on the actual tasktrackers, the interaction between running tasks on the tasktrackers, etc. The simulation of individual tasks is left for future versions.
Some other simplifications are also made (mainly due to the lack of such information in the job trace):
- No job dependency. Jobs are faithfully submitted to the cluster as defined in the job trace.
- No modeling of failure correlations (e.g., a few task attempts fail due to a node failure, but in the simulation run the same set of task attempts may run on different nodes).
What goes in? What comes out?
The 'workload' alluded to in the previous sections needs elaboration. The proposal is to use the job history for all jobs that are part of the workload. The Hadoop Map-Reduce per-job job-history is a very detailed log of each component task, with run-times, counters, etc. We can use this to generate a per-job digest with all relevant information. Thus, it is quite sufficient and feasible to collect workloads from different clusters (research, production, etc.) to then be used during simulation.
More specifically, the following is a list of details it simulates:
- It would simulate a cluster of the same size and network topology as the one the source trace comes from. The reason for this restriction is that data locality is an important factor in scheduler decisions, and scaling job traces obtained from cluster A to simulate them on cluster B with a different diameter would require a much more thorough understanding.
- It would simulate failures faithfully as recorded in the job trace. Namely, if a particular task attempt fails in the trace, it would fail in the simulation.
- It would replay the same number of map tasks and reduce tasks as specified in the job digest.
- It would use the inputsplit locations as are recorded in the job trace.
The simulator will generate the same job-history for each of the simulated jobs. Thus we can use the same tools for slicing and dicing the output of the simulator.
Design & Architecture
An overarching design goal for Mumak is that we should be able to use the exact same Map-Reduce Schedulers (listed above) as-is without any changes. This implies that we use the same interfaces used by Hadoop Map-Reduce so that it is trivial to plug-in the Scheduler of interest.
Along the same lines, it is a legitimate goal to use all relevant Hadoop Map-Reduce interfaces between the various components so that it is trivial to replace each with the appropriate Hadoop Map-Reduce component (e.g., to run the simulator in an emulation mode with real Map-Reduce clusters in the future).
Mumak consists of the following components:
- Discrete Event Simulator Engine with an event-queue
- Simulated JobTracker
- Simulated Cluster (set of tasktrackers)
- Client for handling job-submission
The Simulator Engine is the heart of Mumak. It manages all the discrete events in virtual time and fires the appropriate handlers (JobClient, TaskTracker) when the events occur. Typically each event responded to by a component results in a new set of events to be fired in the future (virtual time).
Some of the various event-types are:
- HeartbeatEvent - An event which instructs a specific TaskTracker to send a heartbeat to the JobTracker.
- TaskCompletionEvent - An event which denotes the completion (success/failure) of a specific map or reduce task which is sent to the TaskTracker.
- JobSubmissionEvent - An event which instructs the JobClient to submit a specific job to the JobTracker.
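The engine and event types above can be sketched as a priority queue ordered by virtual time, where handling each event may schedule new events further in the future. This is only an illustrative sketch; the class and method names (SimEvent, SimEngine, fire) are assumptions, not Mumak's actual API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// An event in virtual time; firing it may yield follow-up events
// (e.g. a HeartbeatEvent leading to new TaskCompletionEvents).
abstract class SimEvent {
    final long when;            // virtual time at which the event fires
    SimEvent(long when) { this.when = when; }
    abstract List<SimEvent> fire();
}

// The Simulator Engine: drains the event queue in virtual-time order,
// advancing the clock to each event before dispatching it.
class SimEngine {
    private final PriorityQueue<SimEvent> queue =
        new PriorityQueue<>(Comparator.comparingLong(e -> e.when));
    private long now = 0;       // current virtual time

    void schedule(SimEvent e) { queue.add(e); }
    long currentTime() { return now; }

    void run() {
        while (!queue.isEmpty()) {
            SimEvent e = queue.poll();
            now = e.when;                       // advance virtual time
            for (SimEvent next : e.fire()) {    // fire handler, collect
                queue.add(next);                // newly scheduled events
            }
        }
    }
}
```

Because time is virtual, a month's worth of events can be drained as fast as the handlers run, which is what lets Mumak replay a large workload in minutes.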
The JobTracker is the driver for the Map-Reduce Scheduler. On receipt of heartbeats from various TaskTrackers, it 'tracks' the progress of the current jobs and forwards the appropriate information to the Scheduler to allow it to make task-scheduling decisions. The simulated JobTracker uses virtual time to allow the scheduler to make scheduling decisions.
The JobTracker also uses the per-job digest to fill in information about the expected runtime for each of the tasks scheduled by the Scheduler, allowing Mumak to simulate run-times for each task.
The JobTracker is purely reactive in the sense that it only reacts to heartbeats sent by TaskTrackers. Furthermore, it does not directly handle any events from the Engine; it only responds to InterTrackerProtocol.heartbeat calls, as in the real world.
The simulated cluster consists of an appropriate number of simulated TaskTrackers which respond to events generated by the Engine. Each simulated TaskTracker maintains state about currently running tasks (all tasks are 'running' till an appropriate TaskCompletionEvent fires) and sends periodic status updates to the JobTracker on receipt of a HeartbeatEvent.
When a HeartbeatEvent fires, the appropriate TaskTracker builds status reports for each of the running tasks and sends a heartbeat to the JobTracker (InterTrackerProtocol.heartbeat). The JobTracker updates its data structures (JobInProgress, TaskInProgress, etc.) to reflect the latest state and forwards the information to the Scheduler. If any new tasks are to be scheduled on this TaskTracker, the JobTracker also fills in expected run-times for each via information gleaned from the job digest. The TaskTracker then processes the instructions to launch the new tasks and responds to the Engine by inserting a set of new TaskCompletionEvents for the new tasks into the EventQueue.
When a TaskCompletionEvent fires, the appropriate TaskTracker marks the relevant task as complete and forwards that information to the JobTracker on the next HeartbeatEvent.
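The TaskTracker-side bookkeeping described in the last two paragraphs can be sketched as follows. The class and method names (SimTaskTracker, onHeartbeat, onTaskCompletion) are hypothetical, chosen only to mirror the flow above: tasks stay 'running' until their TaskCompletionEvent fires, and completions are reported on the next heartbeat.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch, not Mumak's real TaskTracker implementation.
class SimTaskTracker {
    // taskId -> completed? A task is 'running' until its
    // TaskCompletionEvent fires.
    private final Map<String, Boolean> tasks = new LinkedHashMap<>();

    // Called when the JobTracker's heartbeat response launches a task.
    void launchTask(String taskId) { tasks.put(taskId, false); }

    // TaskCompletionEvent handler: mark the task complete; the result
    // is forwarded to the JobTracker on the next HeartbeatEvent.
    void onTaskCompletion(String taskId) { tasks.put(taskId, true); }

    // HeartbeatEvent handler: build a status report for every tracked
    // task, dropping finished tasks once they have been reported.
    List<String> onHeartbeat() {
        List<String> report = new ArrayList<>();
        Iterator<Map.Entry<String, Boolean>> it = tasks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Boolean> e = it.next();
            report.add(e.getKey() + (e.getValue() ? ":SUCCEEDED" : ":RUNNING"));
            if (e.getValue()) it.remove();
        }
        return report;
    }
}
```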
The JobClient responds to JobSubmissionEvents sent by the Engine and submits the appropriate jobs to the JobTracker via the standard JobSubmissionProtocol.
Job Summary for Simulation
The following can be derived from job history file by rumen:
- Detailed job trace with properties and counters of each task attempt (of each task of each job in a workload).
- Digest of jobs in a workload. From the jobs in the workload, we can derive statistical information about tasks to build a model which can help us fabricate tasks that were never scheduled to run (e.g. tasks of a failed job which were never run since the job was declared FAILED soon after submission). Along the same lines, the digest will also have statistical details to help model run-times for data-local maps, rack-local maps and off-rack maps based on data in the job-history logs. This is necessary for simulating tasks which might be scheduled on different nodes in the simulation run by the scheduler.
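The locality-dependent runtime lookup described above might look like the following sketch. All names here (RuntimeModel, Locality, expectedRuntime) are assumptions for illustration; the idea is simply that when the scheduler places a task with the same locality as in the trace, the traced runtime is replayed, and otherwise the digest's per-locality statistics are used.

```java
import java.util.EnumMap;

// Hypothetical sketch of a per-locality runtime model built from the
// digest; not the actual rumen/Mumak data structures.
class RuntimeModel {
    enum Locality { DATA_LOCAL, RACK_LOCAL, OFF_RACK }

    // Mean map-task runtime per locality class, derived from the
    // job-history digest.
    private final EnumMap<Locality, Long> meanRuntimeMs =
        new EnumMap<>(Locality.class);

    void setMean(Locality l, long ms) { meanRuntimeMs.put(l, ms); }

    // If the trace recorded a runtime for this attempt at this locality,
    // replay it faithfully; otherwise fall back to the digest statistics
    // (the scheduler may have placed the task on a different node).
    long expectedRuntime(Locality l, Long tracedRuntimeMs) {
        if (tracedRuntimeMs != null) return tracedRuntimeMs;
        return meanRuntimeMs.get(l);
    }
}
```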
How to deal with failure in workload?
We will try to faithfully model task failures by replaying failed task-attempts by using information in the detailed job-traces.
We also plan to build a simple statistical model of task failures which can then be used to simulate tasks which were never scheduled since the job failed early etc.
Simulating Reduce Tasks
In Mumak 1.0 we do not plan to simulate the running of the actual map/reduce tasks. As a result, it is not possible to simulate the implicit dependency between the completion of maps, the shuffle phase, and the start of the reduce phase of the reduce tasks. Hence, we have decided to use a special AllMapsFinished event generated by the SimulatedJobTracker to trigger the start of the reduce phase. For the same reason, we model the total runtime of a reduce task as the sum of the time taken for completion of all maps and the time taken by the individual task to complete the reduce phase by itself. Thus, we are not going to try to model the shuffle phase accurately.
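The runtime model above reduces to a simple sum, shown here as a worked sketch (names are illustrative): the reduce task finishes at the virtual time of the AllMapsFinished event plus the reduce-phase duration taken from the job digest.

```java
// Sketch of the reduce-task runtime model: no shuffle simulation, just
// all-maps-finished time + the task's own reduce-phase time.
class ReduceRuntimeModel {
    static long reduceFinishTime(long allMapsFinishedTime, long reducePhaseMs) {
        return allMapsFinishedTime + reducePhaseMs;
    }
}
```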
Furthermore, we will ignore map-task failures due to failed shuffles since we are not simulating the shuffle-phase.