Index: core/src/main/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPMaster.java (revision 1182623) +++ core/src/main/java/org/apache/hama/bsp/BSPMaster.java (working copy) @@ -733,6 +733,8 @@ JobStatus status = jip.getStatus(); status.setStartTime(jip.getStartTime()); + status.setNumOfTasks(jip.getNumOfTasks()); + // Sets the user name status.setUsername(jip.getProfile().getUser()); status.setName(jip.getJobName()); Index: core/src/main/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeer.java (revision 1182623) +++ core/src/main/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -227,8 +227,8 @@ BSPMessageSerializer msgSerializer = null; if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) { msgSerializer = new BSPMessageSerializer(conf.getInt( - "bsp.checkpoint.port", - Integer.parseInt(CheckpointRunner.DEFAULT_PORT))); + "bsp.checkpoint.port", Integer + .valueOf(CheckpointRunner.DEFAULT_PORT))); } this.messageSerializer = msgSerializer; } @@ -237,8 +237,8 @@ try { if (LOG.isDebugEnabled()) LOG.debug("reinitialize(): " + getPeerName()); - this.server = RPC.getServer(this, peerAddress.getHostName(), - peerAddress.getPort(), conf); + this.server = RPC.getServer(this, peerAddress.getHostName(), peerAddress + .getPort(), conf); server.start(); LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:" + peerAddress.getPort()); @@ -389,15 +389,8 @@ public void process(WatchedEvent event) { this.complete = true; synchronized (mutex) { - LOG.info(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount() + LOG.debug(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount() + " taskid:" + taskid.toString() + " is notified."); - /* - * try { Stat s = zk.exists(pathToSuperstepZnode+"/ready", false); - * if(null != s) { zk.delete(pathToSuperstepZnode+"/ready", 0); } } - * catch(KeeperException.NoNodeException nne) { - * LOG.warn("Ignore because znode may be deleted.", nne); } - * catch(Exception e) { throw new RuntimeException(e); } - */ mutex.notifyAll(); } } @@ -464,8 +457,9 @@ + taskid.getJobID().toString() + "/" + getSuperstepCount(); while (true) { List znodes = zk.getChildren(pathToSuperstepZnode, false); - LOG.info("leaveBarrier() !!! checking znodes cotnains /ready node or not: at superstep:" - + getSuperstepCount() + " znode:" + znodes); + LOG + .info("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:" + + getSuperstepCount() + " znode:" + znodes); if (znodes.contains("ready")) { znodes.remove("ready"); } @@ -597,20 +591,23 @@ } protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr) - throws NullPointerException { + throws NullPointerException, IOException { BSPPeerInterface peer; synchronized (this.peers) { peer = peers.get(addr); - if (peer == null) { - try { - peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class, - BSPPeerInterface.versionID, addr, this.conf); - } catch (IOException e) { - LOG.error(e); + int retries = 0; + while (peer != null) { + peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class, + BSPPeerInterface.versionID, addr, this.conf); + + retries++; + if (retries > 10) { + umbilical.fatalError(taskid, addr + " doesn't repond."); } - this.peers.put(addr, peer); } + + this.peers.put(addr, peer); } return peer; @@ -630,8 +627,8 @@ "Peername must consist of exactly ONE \":\"! Given peername was: " + peerName); } - return new InetSocketAddress(peerAddrParts[0], - Integer.parseInt(peerAddrParts[1])); + return new InetSocketAddress(peerAddrParts[0], Integer + .valueOf(peerAddrParts[1])); } @Override Index: core/src/main/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/JobInProgress.java (revision 1182623) +++ core/src/main/java/org/apache/hama/bsp/JobInProgress.java (working copy) @@ -133,6 +133,10 @@ return finishTime; } + public int getNumOfTasks() { + return tasks.length; + } + /** * @return the number of desired tasks. */ @@ -181,7 +185,7 @@ LOG.debug("numBSPTasks: " + numBSPTasks); } - // adjust number of map tasks to actual number of splits + // adjust number of BSP tasks to actual number of splits this.tasks = new TaskInProgress[numBSPTasks]; for (int i = 0; i < numBSPTasks; i++) { tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(), Index: core/src/main/java/org/apache/hama/bsp/JobStatus.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/JobStatus.java (revision 1182623) +++ core/src/main/java/org/apache/hama/bsp/JobStatus.java (working copy) @@ -97,7 +97,8 @@ private String user; private long superstepCount; private String name; - + private int tasks; + private long finishTime; public JobStatus() { @@ -172,6 +173,14 @@ public synchronized void setRunState(int state) { this.runState = state; } + + public synchronized void setNumOfTasks(int tasks) { + this.tasks = tasks; + } + + public synchronized int getNumOfTasks() { + return tasks; + } public synchronized long getSuperstepCount() { return superstepCount; Index: core/src/main/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (revision 1182623) +++ core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (working copy) @@ -87,7 +87,7 @@ this.jobFile = jobFile; this.partition = partition; - this.id = new TaskID(jobId, partition); + init(jobId); } public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master, @@ -99,9 +99,14 @@ this.setConf(conf); this.partition = partition; + init(jobId); + } + + private void init(BSPJobID jobId2) { this.id = new TaskID(jobId, partition); + this.startTime = System.currentTimeMillis(); } - + /** * Return a Task that can be sent to a GroomServer for execution. */ Index: core/src/main/java/org/apache/hama/util/BSPServletUtil.java =================================================================== --- core/src/main/java/org/apache/hama/util/BSPServletUtil.java (revision 1182623) +++ core/src/main/java/org/apache/hama/util/BSPServletUtil.java (working copy) @@ -61,7 +61,8 @@ if (jobs.length > 0) { sb.append("\n"); sb.append("" + "" + "" - + "" + "" + "\n"); + + "" + "" + "" + + "\n"); for (JobStatus status : jobs) { sb.append("\n"); } @@ -89,13 +92,14 @@ StringBuilder sb = new StringBuilder(); sb.append("
\n"); sb.append("
JobidUserNameSuperStepStarttime
SuperStepsTasksStarttime
"); @@ -71,8 +72,10 @@ sb.append(""); sb.append(status.getName()); sb.append(""); - sb.append(status.progress()); + sb.append(status.getSuperstepCount()); sb.append(""); + sb.append(status.getNumOfTasks()); + sb.append(""); sb.append(new Date(status.getStartTime())); sb.append("
\n"); - sb.append("\n"); - sb.append("" - + "" - + "" + - "" + - "" + - "\n"); + sb + .append("\n"); + sb + .append("" + + "" + + "" + + "" + + "" + "\n"); for (Entry entry : status .getActiveGroomServerStatus().entrySet()) { sb.append("
Groom Servers
NameHost# maximum tasks# current running tasks# current failuresLast seen
Groom Servers
NameHost# maximum tasks# current running tasks# current failuresLast seen
"); Index: core/src/main/webapp/bspmaster/bspjob.jsp =================================================================== --- core/src/main/webapp/bspmaster/bspjob.jsp (revision 1182623) +++ core/src/main/webapp/bspmaster/bspjob.jsp (working copy) @@ -37,11 +37,12 @@ State: <%=state.toString() %>

- +
- + + @@ -50,6 +51,7 @@ +
Name UserSuperStepSuperStepsTasks StartTime FinishTime
<%=status.getName() %> <%=status.getUsername() %> <%=status.getSuperstepCount() %><%=status.getNumOfTasks() %> <%=new Date(status.getStartTime()).toString() %> <% if(status.getFinishTime() != 0L) {out.write(new Date(status.getFinishTime()).toString());} %>