Index: src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (working copy) @@ -17,6 +17,7 @@ package org.apache.hadoop.mapred; import java.io.*; +import java.util.Vector; /** * Protocol that a JobClient and the central JobTracker use to communicate. The @@ -64,4 +65,17 @@ * prior to submitting the job. */ public String getFilesystemName() throws IOException; + + /** + * @return a vector with running jobs + */ + public Vector runningJobs(); + /** + * @return a vector with completed jobs + */ + public Vector completedJobs(); + /** + * @return a vector with failed jobs + */ + public Vector failedJobs(); } Index: src/java/org/apache/hadoop/mapred/JobStatus.java =================================================================== --- src/java/org/apache/hadoop/mapred/JobStatus.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/JobStatus.java (working copy) @@ -26,7 +26,7 @@ * * @author Mike Cafarella **************************************************/ -class JobStatus implements Writable { +public class JobStatus implements Writable { static { // register a ctor WritableFactories.setFactory Index: src/java/org/apache/hadoop/mapred/JobInProgress.java =================================================================== --- src/java/org/apache/hadoop/mapred/JobInProgress.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/JobInProgress.java (working copy) @@ -30,7 +30,7 @@ // and its latest JobStatus, plus a set of tables for // doing bookkeeping of its Tasks. /////////////////////////////////////////////////////// -class JobInProgress { +public class JobInProgress { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.JobInProgress"); JobProfile profile; Index: src/java/org/apache/hadoop/mapred/JobTrackerInfoServer.java =================================================================== --- src/java/org/apache/hadoop/mapred/JobTrackerInfoServer.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/JobTrackerInfoServer.java (working copy) @@ -29,7 +29,7 @@ * * @author Mike Cafarella *******************************************************/ -class JobTrackerInfoServer { +public class JobTrackerInfoServer { public static class RedirectHandler extends AbstractHttpHandler { public void handle(String pathInContext, String pathParams, HttpRequest request, HttpResponse response) throws HttpException, IOException { Index: src/java/org/apache/hadoop/mapred/JobClient.java =================================================================== --- src/java/org/apache/hadoop/mapred/JobClient.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/JobClient.java (working copy) @@ -176,7 +176,7 @@ this.conf = conf; String tracker = conf.get("mapred.job.tracker", "local"); if ("local".equals(tracker)) { - this.jobSubmitClient = new LocalJobRunner(conf); + this.jobSubmitClient = LocalJobRunner.getInstance(conf); } else { this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, Index: src/java/org/apache/hadoop/mapred/LocalJobRunner.java =================================================================== --- src/java/org/apache/hadoop/mapred/LocalJobRunner.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java (working copy) @@ -25,7 +25,7 @@ import org.apache.hadoop.util.LogFormatter; /** Implements MapReduce locally, in-process, for debugging. */ -class LocalJobRunner implements JobSubmissionProtocol { +public class LocalJobRunner implements JobSubmissionProtocol { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.LocalJobRunner"); @@ -34,8 +34,12 @@ private Configuration conf; private int map_tasks = 0; private int reduce_tasks = 0; + private static LocalJobRunner fInstance; + private String localMachine; - private class Job extends Thread + private long startTime; + + public class Job extends Thread implements TaskUmbilicalProtocol { private String file; private String id; @@ -44,6 +48,16 @@ private JobStatus status = new JobStatus(); private ArrayList mapIds = new ArrayList(); private MapOutputFile mapoutputFile; + private long startTime = 0; + private long finishTime = 0; + int mapDone = 0; + int reduceDone = 0; + private String mapTaskId=""; + private String mapStateString=""; + private String reduceTaskId=""; + private String reduceStateString=""; + private StringBuffer mapDiagnostic; + private StringBuffer reduceDiagnostic; private JobProfile profile; private Path localFile; private FileSystem localFs; @@ -53,7 +67,6 @@ this.id = "job_" + newId(); this.mapoutputFile = new MapOutputFile(); this.mapoutputFile.setConf(conf); - this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml"); this.localFs = FileSystem.getNamed("local", conf); @@ -63,9 +76,12 @@ "http://localhost:8080/", job.getJobName()); this.status.jobid = id; this.status.runState = JobStatus.RUNNING; - + jobs.put(id, this); + this.startTime = System.currentTimeMillis(); + this.mapDiagnostic = new StringBuffer(); + this.reduceDiagnostic = new StringBuffer(); this.start(); } @@ -88,6 +104,7 @@ map.setConf(job); map_tasks += 1; map.run(job, this); + mapDone++; map_tasks -= 1; } @@ -113,9 +130,11 @@ reduce_tasks += 1; reduce.run(job, this); reduce_tasks -= 1; + this.reduceDone++; this.mapoutputFile.removeAll(reduceId); this.status.runState = JobStatus.SUCCEEDED; + finishTime = System.currentTimeMillis(); } catch (Throwable t) { this.status.runState = JobStatus.FAILED; @@ -141,17 +160,27 @@ public void progress(String taskId, float progress, String state) { LOG.info(state); + float taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping float numTasks = mapIds.size(); status.mapProgress = (taskIndex/numTasks)+(progress/numTasks); + this.mapTaskId = taskId; + this.mapStateString = state; } else { status.reduceProgress = progress; + this.reduceTaskId = taskId; + this.reduceStateString = state; } } public void reportDiagnosticInfo(String taskid, String trace) { - // Ignore for now + float taskIndex = mapIds.indexOf(taskid); + if (taskIndex >= 0) { + this.mapDiagnostic.append(trace); + } else { + this.reduceDiagnostic.append(trace); + } } public boolean ping(String taskid) throws IOException { @@ -170,12 +199,52 @@ public synchronized void fsError(String message) throws IOException { LOG.severe("FSError: "+ message); } + + public String getId() { + return this.id; + } + + public JobStatus getStatus() { + return this.status; + } + public long getStartTime() { + return this.startTime; + } + + public long getFinishTime(){ + return this.finishTime; + } + + public int getDesiredMaps() { + return this.mapIds.size(); + } + + public int getDesiredReduces() { + return 1; + } + + public int getFinishedMaps() { + return this.mapDone; + } + + public int getFinishedReduces() { + return this.reduceDone; + } } - public LocalJobRunner(Configuration conf) throws IOException { + public static LocalJobRunner getInstance(Configuration configuration) throws IOException { + if(fInstance == null) { + fInstance = new LocalJobRunner(configuration); + } + return fInstance; + } + + private LocalJobRunner(Configuration conf) throws IOException { this.fs = FileSystem.get(conf); this.conf = conf; + this.localMachine = "local"; + this.startTime = System.currentTimeMillis(); } // JobSubmissionProtocol methods @@ -185,7 +254,9 @@ } public void killJob(String id) { - ((Thread)jobs.get(id)).stop(); + Job job = (Job) jobs.get(id); + job.finishTime = System.currentTimeMillis(); + ((Thread) job).stop(); } public JobProfile getJobProfile(String id) { @@ -194,10 +265,21 @@ } public TaskReport[] getMapTaskReports(String id) { - return new TaskReport[0]; + float progress = getJobStatus(id).mapProgress; + StringBuffer buffer = getJob(id).mapDiagnostic; + String stateString = getJob(id).mapStateString; + String taskId = getJob(id).mapTaskId; + return new TaskReport[] { new TaskReport( + taskId, progress, stateString, new String[] { buffer.toString() }) }; } + public TaskReport[] getReduceTaskReports(String id) { - return new TaskReport[0]; + float progress = getJobStatus(id).reduceProgress; + StringBuffer buffer = getJob(id).reduceDiagnostic; + String stateString = getJob(id).reduceStateString; + String taskId = getJob(id).reduceTaskId; + return new TaskReport[] { new TaskReport( + taskId, progress, stateString, new String[] { buffer.toString() }) }; } public JobStatus getJobStatus(String id) { @@ -212,4 +294,53 @@ public ClusterStatus getClusterStatus() { return new ClusterStatus(1, map_tasks, reduce_tasks, 1); } + + public Vector runningJobs() { + return getJobs(JobStatus.RUNNING); + } + + public Vector completedJobs() { + return getJobs(JobStatus.SUCCEEDED); + } + + public Vector failedJobs() { + return getJobs(JobStatus.FAILED); + } + + /** + * @param status + * @return a job by status + */ + public Vector getJobs(int status) { + Vector v = new Vector(); + for (Iterator it = this.jobs.values().iterator(); it.hasNext();) { + Job job = (Job) it.next(); + if (job.getStatus().getRunState() == status) { + v.add(job); + } + } + return v; + } + + /** + * @param jobId + * @return the job by id + */ + public Job getJob(String jobId) { + return (Job) this.jobs.get(jobId); + } + + /** + * @return the name of the machine + */ + public String getJobTrackerMachine() { + return this.localMachine; + } + + /** + * @return the start time + */ + public long getStartTime() { + return this.startTime; + } } Index: src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java =================================================================== --- src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (working copy) @@ -28,7 +28,7 @@ * * @author Mike Cafarella **************************************************/ -class TaskTrackerStatus implements Writable { +public class TaskTrackerStatus implements Writable { static { // register a ctor WritableFactories.setFactory Index: src/java/org/apache/hadoop/mapred/JobProfile.java =================================================================== --- src/java/org/apache/hadoop/mapred/JobProfile.java (revision 395866) +++ src/java/org/apache/hadoop/mapred/JobProfile.java (working copy) @@ -26,7 +26,7 @@ * * @author Mike Cafarella **************************************************/ -class JobProfile implements Writable { +public class JobProfile implements Writable { static { // register a ctor WritableFactories.setFactory Index: src/java/org/apache/hadoop/dfs/DFSClient.java =================================================================== --- src/java/org/apache/hadoop/dfs/DFSClient.java (revision 395866) +++ src/java/org/apache/hadoop/dfs/DFSClient.java (working copy) @@ -38,7 +38,7 @@ * * @author Mike Cafarella, Tessa MacDuff ********************************************************/ -class DFSClient implements FSConstants { +public class DFSClient implements FSConstants { public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.DFSClient"); static int MAX_BLOCK_ACQUIRE_FAILURES = 3; ClientProtocol namenode; Index: src/java/org/apache/hadoop/dfs/DistributedFileSystem.java =================================================================== --- src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (revision 395866) +++ src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (working copy) @@ -176,7 +176,7 @@ return "DFS[" + dfs + "]"; } - DFSClient getClient() { + public DFSClient getClient() { return dfs; } Index: src/java/org/apache/hadoop/dfs/DatanodeInfo.java =================================================================== --- src/java/org/apache/hadoop/dfs/DatanodeInfo.java (revision 395866) +++ src/java/org/apache/hadoop/dfs/DatanodeInfo.java (working copy) @@ -27,7 +27,7 @@ * * @author Mike Cafarella **************************************************/ -class DatanodeInfo implements Writable, Comparable { +public class DatanodeInfo implements Writable, Comparable { static { // register a ctor WritableFactories.setFactory