Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (revision 1033343) +++ src/examples/org/apache/hama/examples/PiEstimator.java (working copy) @@ -99,7 +99,7 @@ // Set the job name bsp.setJobName("pi estimation example"); bsp.setBspClass(MyEstimator.class); - bsp.setNumBspTask(10); + bsp.setNumBspTask(1); BSPJobClient jobClient = new BSPJobClient(conf); ClusterStatus cluster = jobClient.getClusterStatus(true); Index: src/examples/org/apache/hama/examples/SerializePrinting.java =================================================================== --- src/examples/org/apache/hama/examples/SerializePrinting.java (revision 1033343) +++ src/examples/org/apache/hama/examples/SerializePrinting.java (working copy) @@ -75,7 +75,7 @@ // Set the job name bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); - bsp.setNumBspTask(10); + bsp.setNumBspTask(1); BSPJobClient.runJob(bsp); } Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (revision 1033343) +++ src/java/org/apache/hama/bsp/BSPMaster.java (working copy) @@ -584,6 +584,17 @@ jobs.put(job.getProfile().getJobID(), job); taskScheduler.addJob(job); } + + + // TODO Later, we should use the JobInProgressListener -- edwardyoon + try { + LOG.debug(">> init tasks."); + job.initTasks(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return job.getStatus(); } Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (revision 1033343) +++ src/java/org/apache/hama/bsp/JobInProgress.java (working copy) @@ -46,6 +46,7 @@ } static final Log LOG = LogFactory.getLog(JobInProgress.class); + boolean tasksInited = false; Configuration conf; JobProfile profile; @@ -62,15 +63,16 @@ // private LocalFileSystem localFs; private BSPJobID jobId; final BSPMaster master; - List tasks; + TaskInProgress tasks[] = new TaskInProgress[0]; private long superstepCounter; + int numBSPTasks = 0; + int clusterSize; + public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf) throws IOException { this.conf = conf; this.jobId = jobId; - - this.tasks = new ArrayList(); this.localFs = FileSystem.getLocal(conf); this.master = master; @@ -88,7 +90,9 @@ FileSystem fs = jobDir.getFileSystem(conf); jobFile = new Path(jobDir, "job.xml"); fs.copyToLocalFile(jobFile, localJobFile); - BSPJobContext job = new BSPJobContext(localJobFile, jobId); + BSPJob job = new BSPJob(jobId, localJobFile.toString()); + this.numBSPTasks = job.getNumBspTask(); + LOG.debug(">>> " + job.getNumBspTask()); this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job .getJobName()); @@ -100,9 +104,6 @@ } - // /////////////////////////////////////////////////// - // Accessors for the JobInProgress - // /////////////////////////////////////////////////// public JobProfile getProfile() { return profile; } @@ -124,6 +125,13 @@ } /** + * @return the number of desired tasks. + */ + public int desiredBSPTasks() { + return numBSPTasks; + } + + /** * @return The JobID of this JobInProgress. */ public BSPJobID getJobID() { @@ -139,16 +147,54 @@ // /////////////////////////////////////////////////// // Create/manage tasks // /////////////////////////////////////////////////// + + public synchronized void initTasks() throws IOException { + if (tasksInited) { + return; + } + + // adjust number of map tasks to actual number of splits + this.tasks = new TaskInProgress[numBSPTasks]; + for (int i = 0; i < numBSPTasks; i++) { + tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(), + this.master, this.conf, this, i); + } + + // Update job status + //this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f, + // JobStatus.RUNNING); + + tasksInited = true; + LOG.debug("Job is initialized."); + } + public synchronized Task obtainNewTask(GroomServerStatus status, int clusterSize, int numUniqueHosts) { - LOG.debug("clusterSize: " + clusterSize); + /* + if (this.status.getRunState() != JobStatus.RUNNING) { + LOG.info("Cannot create task split for " + profile.getJobID()); + return null; + } + */ + + int target = findNewMapTask(status, clusterSize, numUniqueHosts); + if (target == -1) { + return null; + } Task result = null; try { + /* TaskInProgress tip = new TaskInProgress(getJobID(), this.jobFile .toString(), this.master, this.conf, this, numUniqueHosts); - tasks.add(tip); - result = tip.getTaskToRun(status); + tasks.add(tip); + + for (int i = 0; i < tasks.length; i++) { + result = tasks[0].getTaskToRun(status); + } + */ + + result = tasks[0].getTaskToRun(status); } catch (IOException e) { e.printStackTrace(); } @@ -156,6 +202,15 @@ return result; } + private int findNewMapTask(GroomServerStatus status, int clusterSize, + int numUniqueHosts) { + + // Update latest-known cluster size + this.clusterSize = clusterSize; + + return 0; + } + public void completedTask(TaskInProgress tip, TaskStatus status) { String taskid = status.getTaskId(); updateTaskStatus(tip, status); @@ -174,9 +229,6 @@ } } - this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f, - JobStatus.RUNNING); - if (allDone) { LOG.debug("Job successfully done."); @@ -186,13 +238,14 @@ } } - public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus taskStatus) { + public synchronized void updateTaskStatus(TaskInProgress tip, + TaskStatus taskStatus) { tip.updateStatus(taskStatus); // update tip if (superstepCounter < taskStatus.getSuperstepCount()) { superstepCounter = taskStatus.getSuperstepCount(); // TODO Later, we have to update JobInProgress status here - + } }