Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (리비전 1032436) +++ src/java/org/apache/hama/bsp/BSPPeer.java (작업 사본) @@ -61,6 +61,7 @@ protected final ConcurrentLinkedQueue localQueue = new ConcurrentLinkedQueue(); protected Set allPeerNames = new HashSet(); protected InetSocketAddress peerAddress; + protected TaskStatus currentTaskStatus; /** * @@ -68,8 +69,10 @@ public BSPPeer(Configuration conf) throws IOException { this.conf = conf; - String bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); - int bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + String bindAddress = conf.get(Constants.PEER_HOST, + Constants.DEFAULT_PEER_HOST); + int bindPort = conf + .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); bspRoot = conf.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT); zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM) @@ -84,7 +87,8 @@ public void reinitialize() { try { LOG.debug("reinitialize(): " + getPeerName()); - server = RPC.getServer(this, peerAddress.getHostName(), peerAddress.getPort(), conf); + server = RPC.getServer(this, peerAddress.getHostName(), peerAddress + .getPort(), conf); server.start(); } catch (IOException e) { e.printStackTrace(); @@ -129,8 +133,7 @@ * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable) */ @Override - public void send(String peerName, BSPMessage msg) - throws IOException { + public void send(String peerName, BSPMessage msg) throws IOException { LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName); ConcurrentLinkedQueue queue = outgoingQueues.get(peerName); if (queue == null) { @@ -170,7 +173,7 @@ } } - // Should we clearing outgoingQueues? + // Clear outgoing queues. this.outgoingQueues.clear(); enterBarrier(); @@ -180,13 +183,14 @@ // the number of peers, and the load of zookeeper. // It should fixed to some flawless way. leaveBarrier(); + currentTaskStatus.incrementSuperstepCount(); } protected boolean enterBarrier() throws KeeperException, InterruptedException { LOG.debug("[" + getPeerName() + "] enter the enterbarrier"); try { - zk.create(bspRoot + "/" + getPeerName(), new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL); + zk.create(bspRoot + "/" + getPeerName(), new byte[0], + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { @@ -265,7 +269,7 @@ } /** - * @return the string as host:port of this Peer + * @return the string as host:port of this Peer */ public String getPeerName() { return peerAddress.getHostName() + ":" + peerAddress.getPort(); @@ -292,9 +296,26 @@ this.allPeerNames = new HashSet(allPeerNames); } - @Override + /** + * @return the number of messages + */ public int getNumCurrentMessages() { return localQueue.size(); } + /** + * Sets the current status + * + * @param currentTaskStatus + */ + public void setCurrentTaskStatus(TaskStatus currentTaskStatus) { + this.currentTaskStatus = currentTaskStatus; + } + + /** + * @return the count of superstpes + */ + public long getSuperstepCount() { + return currentTaskStatus.getSuperstepCount(); + } } Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (리비전 1032436) +++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본) @@ -534,6 +534,7 @@ public void launchTask() throws IOException { taskStatus.setRunState(TaskStatus.State.RUNNING); + bspPeer.setCurrentTaskStatus(taskStatus); this.runner = task.createRunner(bspPeer, this.jobConf); this.runner.start(); Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 1032436) +++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본) @@ -64,7 +64,8 @@ private BSPJobID jobId; final BSPMaster master; List tasks; - + private long superstepCounter; + public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf) throws IOException { this.conf = conf; @@ -77,7 +78,8 @@ this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP); this.startTime = System.currentTimeMillis(); status.setStartTime(startTime); - + this.superstepCounter = 0; + this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId + ".xml"); this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId @@ -186,6 +188,8 @@ } public void updateTaskStatus(TaskInProgress tip, TaskStatus status) { + superstepCounter -= tip.getTaskStatus(status.getTaskId()).getSuperstepCount(); + superstepCounter += status.getSuperstepCount(); tip.updateStatus(status); // update tip } Index: src/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 1032436) +++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본) @@ -194,4 +194,8 @@ public void updateStatus(TaskStatus status) { taskStatuses.put(status.getTaskId(), status); } + + public TaskStatus getTaskStatus(String taskId) { + return this.taskStatuses.get(taskId); + } } Index: src/java/org/apache/hama/bsp/TaskStatus.java =================================================================== --- src/java/org/apache/hama/bsp/TaskStatus.java (리비전 1032436) +++ src/java/org/apache/hama/bsp/TaskStatus.java (작업 사본) @@ -45,7 +45,8 @@ private volatile State runState; private String stateString; private String groomServer; - + private long superstepCount; + private long startTime; private long finishTime; @@ -56,6 +57,7 @@ */ public TaskStatus() { taskId = new String(); + this.superstepCount = 0; } public TaskStatus(String taskId, float progress, State runState, @@ -66,6 +68,7 @@ this.stateString = stateString; this.groomServer = groomServer; this.phase = phase; + this.superstepCount = 0; } // ////////////////////////////////////////////////// @@ -208,6 +211,20 @@ this.finishTime = finishTime; } } + + /** + * @return The number of BSP super steps executed by the task. + */ + public long getSuperstepCount() { + return superstepCount; + } + + /** + * Increments the number of BSP super steps executed by the task. + */ + public void incrementSuperstepCount() { + superstepCount += 1; + } @Override public Object clone() { @@ -232,6 +249,7 @@ this.phase = WritableUtils.readEnum(in, Phase.class); this.startTime = in.readLong(); this.finishTime = in.readLong(); + this.superstepCount = in.readLong(); } @Override @@ -243,5 +261,6 @@ WritableUtils.writeEnum(out, phase); out.writeLong(startTime); out.writeLong(finishTime); + out.writeLong(superstepCount); } } Index: src/test/org/apache/hama/bsp/TestBSPPeer.java =================================================================== --- src/test/org/apache/hama/bsp/TestBSPPeer.java (리비전 1032436) +++ src/test/org/apache/hama/bsp/TestBSPPeer.java (작업 사본) @@ -27,7 +27,6 @@ import java.util.Set; import junit.framework.AssertionFailedError; - import net.sourceforge.groboutils.junit.v1.MultiThreadedTestRunner; import net.sourceforge.groboutils.junit.v1.TestRunnable; @@ -97,6 +96,9 @@ peerNames.add("localhost:" + (30000 + i)); } peer.setAllPeerNames(peerNames); + TaskStatus currentTaskStatus = new TaskStatus("localhost:" + + lastTwoDigitsOfPort, 0, null, null, null, null); + peer.setCurrentTaskStatus(currentTaskStatus); } @Override @@ -135,14 +137,16 @@ e.printStackTrace(); } - verifyPayload(); + verifyPayload(i); } } - private void verifyPayload() { + private void verifyPayload(int round) { int numMessages = peer.getNumCurrentMessages(); + assertEquals(round, ((int) peer.getSuperstepCount() -1 )); + LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages - + " messages"); + + " messages at " + round + " round"); if (lastTwoDigitsOfPort < 10) { assertEquals(20, numMessages); @@ -169,7 +173,7 @@ } } - public void testSync() throws Throwable { + public void testSync() throws Throwable { conf.setInt("bsp.peers.num", NUM_PEER); conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");