Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 1005310) +++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본) @@ -100,7 +100,8 @@ /** * Start the BSPMaster process, listen on the indicated hostname/port */ - public BSPMaster(HamaConfiguration conf) throws IOException, InterruptedException { + public BSPMaster(HamaConfiguration conf) throws IOException, + InterruptedException { this(conf, generateNewIdentifier()); } @@ -413,7 +414,7 @@ job.completedTask(tip, report); } else if (report.getRunState() == TaskStatus.State.FAILED) { // TODO Tell the job to fail the relevant task - + } else { job.updateTaskStatus(tip, report); } @@ -564,10 +565,36 @@ } @Override + public JobStatus[] jobsToComplete() throws IOException { + return getJobStatus(jobs.values(), true); + } + + @Override public JobStatus[] getAllJobs() throws IOException { - return null; + return getJobStatus(jobs.values(), false); } + private synchronized JobStatus[] getJobStatus(Collection jips, + boolean toComplete) { + if (jips == null) { + return new JobStatus[] {}; + } + List jobStatusList = new ArrayList(); + for (JobInProgress jip : jips) { + JobStatus status = jip.getStatus(); + if (toComplete) { + if (status.getRunState() == JobStatus.RUNNING + || status.getRunState() == JobStatus.PREP) { + jobStatusList.add(status); + } + } else { + jobStatusList.add(status); + } + } + + return jobStatusList.toArray(new JobStatus[jobStatusList.size()]); + } + @Override public synchronized String getFilesystemName() throws IOException { if (fs == null) { Index: src/java/org/apache/hama/bsp/LocalJobRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 1005310) +++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본) @@ -30,7 +30,7 @@ this.fs = FileSystem.get(conf); this.conf = conf; } - + @Override public JobStatus[] getAllJobs() throws IOException { // TODO Auto-generated method stub @@ -180,4 +180,10 @@ // TODO Auto-generated method stub } } + + @Override + public JobStatus[] jobsToComplete() throws IOException { + // TODO Auto-generated method stub + return null; + } } Index: src/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 1005310) +++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본) @@ -90,6 +90,7 @@ public Task getTaskToRun(GroomServerStatus status) throws IOException { Task t = null; + // TODO use the TaskID, instead of String. String taskid = null; if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) { taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId); Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 1005310) +++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본) @@ -26,86 +26,98 @@ import org.apache.hama.bsp.TaskAttemptID; /** - * Protocol that a groom server and the central BSP Master use to communicate. This - * interface will contains several methods: submitJob, killJob, and killTask. + * Protocol that a groom server and the central BSP Master use to communicate. + * This interface will contains several methods: submitJob, killJob, and + * killTask. */ public interface JobSubmissionProtocol extends HamaRPCProtocolVersion { - + /** * Allocate a new id for the job. + * * @return job id * @throws IOException */ public BSPJobID getNewJobId() throws IOException; - - + /** - * Submit a Job for execution. Returns the latest profile for - * that job. - * The job files should be submitted in system-dir/jobName. - * + * Submit a Job for execution. Returns the latest profile for that job. The + * job files should be submitted in system-dir/jobName. + * * @param jobID * @param jobFile * @return jobStatus * @throws IOException */ - //public JobStatus submitJob(BSPJobID jobName) throws IOException; + // public JobStatus submitJob(BSPJobID jobName) throws IOException; public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; - + /** * Get the current status of the cluster + * * @param detailed if true then report groom names as well * @return summary of the state of the cluster */ public ClusterStatus getClusterStatus(boolean detailed) throws IOException; - + /** * Grab a handle to a job that is already known to the BSPMaster. - * @return Profile of the job, or null if not found. + * + * @return Profile of the job, or null if not found. */ public JobProfile getJobProfile(BSPJobID jobid) throws IOException; - + /** * Grab a handle to a job that is already known to the BSPMaster. + * * @return Status of the job, or null if not found. */ public JobStatus getJobStatus(BSPJobID jobid) throws IOException; - + /** - * A BSP system always operates on a single filesystem. This - * function returns the fs name. ('local' if the localfs; 'addr:port' - * if dfs). The client can then copy files into the right locations - * prior to submitting the job. + * A BSP system always operates on a single filesystem. This function returns + * the fs name. ('local' if the localfs; 'addr:port' if dfs). The client can + * then copy files into the right locations prior to submitting the job. */ public String getFilesystemName() throws IOException; - - /** - * Get all the jobs submitted. + + /** + * Get the jobs that are not completed and not failed + * + * @return array of JobStatus for the running/to-be-run jobs. + */ + public JobStatus[] jobsToComplete() throws IOException; + + /** + * Get all the jobs submitted. + * * @return array of JobStatus for the submitted jobs */ public JobStatus[] getAllJobs() throws IOException; /** - * Grab the bspmaster system directory path where job-specific files are to be placed. + * Grab the bspmaster system directory path where job-specific files are to be + * placed. * * @return the system directory where job-specific files are to be placed. */ public String getSystemDir(); - + /** * Kill the indicated job */ public void killJob(BSPJobID jobid) throws IOException; - - + /** * Kill indicated task attempt. + * * @param taskId the id of the task to kill. - * @param shouldFail if true the task is failed and added to failed tasks list, otherwise - * it is just killed, w/o affecting job failure status. - */ - public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException; + * @param shouldFail if true the task is failed and added to failed tasks + * list, otherwise it is just killed, w/o affecting job failure + * status. + */ + public boolean killTask(TaskAttemptID taskId, boolean shouldFail) + throws IOException; - }