Index: core/src/main/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPMaster.java    (revision 1185529)
+++ core/src/main/java/org/apache/hama/bsp/BSPMaster.java    (working copy)
@@ -495,9 +495,7 @@
         zk.delete(bspRoot + "/" + node, 0);
       }
     } catch (KeeperException e) {
-      e.printStackTrace();
     } catch (InterruptedException e) {
-      e.printStackTrace();
     }
   }
 
Index: core/src/main/java/org/apache/hama/bsp/BSPPeer.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPPeer.java    (revision 1185529)
+++ core/src/main/java/org/apache/hama/bsp/BSPPeer.java    (working copy)
@@ -90,6 +90,12 @@
   public String getPeerName();
 
   /**
+   * @param index the position in the array of all peer names, sorted by name.
+   * @return The name of the n-th peer in that sorted array.
+   */
+  public String getPeerName(int index);
+
+  /**
    * @return The names of all the peers executing tasks from the same job
    *         (including this peer).
    */
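[Review note, not part of the patch] A minimal usage sketch of the new
contract, assuming a connected BSPPeer instance named "peer" (hypothetical):

    // Every peer sorts the same znode children, so a given index resolves
    // to the same peer name everywhere -- that is what makes getPeerName(0)
    // usable for deterministic master election.
    String master = peer.getPeerName(0);                 // identical on all peers
    boolean isMaster = peer.getPeerName().equals(master);
    int numPeers = peer.getAllPeerNames().length;        // valid index range for getPeerName(int)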
Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java    (revision 1185529)
+++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java    (working copy)
@@ -26,6 +26,7 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -82,11 +83,13 @@
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
-  private TaskAttemptID taskid;
+  private TaskAttemptID taskId;
   private BSPPeerProtocol umbilical;
   private final BSPMessageSerializer messageSerializer;
 
+  private String[] allPeers;
+
   public static final class BSPSerializableMessage implements Writable {
     final AtomicReference<String> path = new AtomicReference<String>();
     final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
@@ -206,12 +209,12 @@
    *
    * @param conf is the configuration file containing bsp peer host, port, etc.
    * @param umbilical is the bsp protocol used to contact its parent process.
-   * @param taskid is the id that current process holds.
+   * @param taskId is the id that the current process holds.
    */
-  public BSPPeerImpl(Configuration conf, TaskAttemptID taskid,
+  public BSPPeerImpl(Configuration conf, TaskAttemptID taskId,
       BSPPeerProtocol umbilical) throws IOException {
     this.conf = conf;
-    this.taskid = taskid;
+    this.taskId = taskId;
     this.umbilical = umbilical;
 
     String bindAddress = conf.get(Constants.PEER_HOST,
@@ -221,8 +224,7 @@
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     quorumServers = QuorumPeer.getZKQuorumServersString(conf);
-    if (LOG.isDebugEnabled())
-      LOG.debug("Quorum " + quorumServers);
+    LOG.debug("Quorum " + quorumServers);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     BSPMessageSerializer msgSerializer = null;
     if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
@@ -231,6 +233,8 @@
           .valueOf(CheckpointRunner.DEFAULT_PORT)));
     }
     this.messageSerializer = msgSerializer;
+
+    reinitialize();
   }
 
   public void reinitialize() {
@@ -252,6 +256,17 @@
     } catch (IOException e) {
       LOG.error("Fail while reinitializing zookeeeper!", e);
     }
+
+    try {
+      allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
+          .toArray(new String[0]);
+    } catch (KeeperException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    Arrays.sort(allPeers);
   }
 
   @Override
@@ -292,7 +307,7 @@
   private String checkpointedPath() {
     String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
     String ckptPath = backup + jobConf.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + this.taskid.toString();
+        + getSuperstepCount() + "/" + this.taskId.toString();
     if (LOG.isDebugEnabled())
       LOG.debug("Messages are to be saved to " + ckptPath);
     return ckptPath;
@@ -317,7 +332,7 @@
         try {
           peer = getBSPPeerConnection(entry.getKey());
         } catch (NullPointerException ne) {
-          umbilical.fatalError(taskid, entry.getKey().getHostName()
+          umbilical.fatalError(taskId, entry.getKey().getHostName()
              + " doesn't exists.");
         }
       }
@@ -338,7 +353,7 @@
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
-    umbilical.statusUpdate(taskid, currentTaskStatus);
+    umbilical.statusUpdate(taskId, currentTaskStatus);
 
     // Clear outgoing queues.
     clearOutgoingQueues();
@@ -389,8 +404,6 @@
     public void process(WatchedEvent event) {
       this.complete = true;
       synchronized (mutex) {
-        LOG.debug(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount()
-            + " taskid:" + taskid.toString() + " is notified.");
         mutex.notifyAll();
       }
     }
@@ -405,7 +418,7 @@
     synchronized (zk) {
       createZnode(bspRoot);
       final String pathToJobIdZnode = bspRoot + "/"
-          + taskid.getJobID().toString();
+          + taskId.getJobID().toString();
       createZnode(pathToJobIdZnode);
       final String pathToSuperstepZnode = pathToJobIdZnode + "/"
           + getSuperstepCount();
@@ -431,8 +444,8 @@
             + " is " + znodes.size() + ". Znodes include " + znodes);
         if (size < jobConf.getNumBspTask()) {
-          LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
-              + " which task is waiting? " + taskid.toString()
+          LOG.info("1. At superstep: " + getSuperstepCount()
+              + " which task is waiting? " + taskId.toString()
              + " stat is null? " + readyStat);
           while (!barrierWatcher.isComplete()) {
             if (!hasReady) {
@@ -441,11 +454,11 @@
             }
           }
         }
-        LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
-            + " after waiting ..." + taskid.toString());
+        LOG.debug("2. at superstep: " + getSuperstepCount()
+            + " after waiting ..." + taskId.toString());
       } else {
         LOG.debug("---> at superstep: " + getSuperstepCount()
-            + " task that is creating /ready znode:" + taskid.toString());
+            + " task that is creating /ready znode:" + taskId.toString());
         createEphemeralZnode(pathToSuperstepZnode + "/ready");
       }
     }
@@ -454,7 +467,7 @@
   protected boolean leaveBarrier() throws KeeperException,
       InterruptedException {
     final String pathToSuperstepZnode = bspRoot + "/"
-        + taskid.getJobID().toString() + "/" + getSuperstepCount();
+        + taskId.getJobID().toString() + "/" + getSuperstepCount();
     while (true) {
       List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
       LOG
@@ -464,8 +477,10 @@
         znodes.remove("ready");
       }
       final int size = znodes.size();
-      LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+
+      LOG.debug("leaveBarrier() at superstep:" + getSuperstepCount()
           + " znode size: (" + size + ") znodes:" + znodes);
+
       if (null == znodes || znodes.isEmpty())
         return true;

       if (1 == size) {
@@ -484,7 +499,7 @@
         final String highest = znodes.get(size - 1);
 
         LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
-            + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+            + " taskid:" + taskId.toString() + " lowest: " + lowest + " highest:"
             + highest);
 
         synchronized (mutex) {
@@ -495,7 +510,7 @@
             public void process(WatchedEvent event) {
               synchronized (mutex) {
                 LOG.debug("leaveBarrier() at superstep: "
-                    + getSuperstepCount() + " taskid:" + taskid.toString()
+                    + getSuperstepCount() + " taskid:" + taskId.toString()
                     + " highest notify lowest.");
                 mutex.notifyAll();
               }
@@ -504,7 +519,7 @@
           if (null != s) {
             LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
-                + " taskid:" + taskid.toString() + " wait for higest notify.");
+                + " taskid:" + taskId.toString() + " wait for highest notify.");
             mutex.wait();
           }
         } else {
@@ -512,7 +527,7 @@
           if (null != s1) {
             LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
-                + " taskid:" + taskid.toString() + " exists, so delete it.");
+                + " taskid:" + taskId.toString() + " exists, so delete it.");
             try {
               zk.delete(getNodeName(), 0);
             } catch (KeeperException.NoNodeException nne) {
@@ -526,7 +541,7 @@
               public void process(WatchedEvent event) {
                 synchronized (mutex) {
                   LOG.debug("leaveBarrier() at superstep: "
-                      + getSuperstepCount() + " taskid:" + taskid.toString()
+                      + getSuperstepCount() + " taskid:" + taskId.toString()
                       + " lowest notify other nodes.");
                   mutex.notifyAll();
                 }
@@ -534,7 +549,7 @@
             });
             if (null != s2) {
               LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
-                  + " taskid:" + taskid.toString() + " wait for lowest notify.");
+                  + " taskid:" + taskId.toString() + " wait for lowest notify.");
               mutex.wait();
             }
           }
@@ -543,8 +558,8 @@
   }
 
   private String getNodeName() {
-    return bspRoot + "/" + taskid.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + taskid.toString();
+    return bspRoot + "/" + taskId.getJobID().toString() + "/"
+        + getSuperstepCount() + "/" + taskId.toString();
   }
 
   @Override
@@ -627,19 +642,14 @@
   @Override
   public String[] getAllPeerNames() {
-    String[] result = null;
-    try {
-      result = zk.getChildren("/" + jobConf.getJobID().toString(), this)
-          .toArray(new String[0]);
-    } catch (KeeperException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    return result;
+    return allPeers;
   }
 
+  @Override
+  public String getPeerName(int index) {
+    return allPeers[index];
+  }
+
   /**
    * @return the number of messages
    */
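[Review note, not part of the patch] ZooKeeper's getChildren() gives no
ordering guarantee, so the Arrays.sort() call above is what makes
getPeerName(int) deterministic across peers. Condensed sketch of the
discovery step (zk, taskId and allPeers are BSPPeerImpl fields, as in the
patch):

    List<String> children = zk.getChildren("/" + taskId.getJobID().toString(), this);
    allPeers = children.toArray(new String[0]);
    Arrays.sort(allPeers);  // unordered children -> one agreed-upon index per peer

Note that allPeers is captured once per reinitialize() call; a peer that
registers after that point is not visible until reinitialize() runs again.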
Index: core/src/main/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/GroomServer.java    (revision 1185529)
+++ core/src/main/java/org/apache/hama/bsp/GroomServer.java    (working copy)
@@ -153,23 +153,13 @@
       assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
       int i = 0;
 
-      // add peers to BSPMaster.
+      // add peers to ZooKeeper.
       // TODO find another way to manage all activate peers.
       for (GroomServerAction action : actions) {
         Task t = ((LaunchTaskAction) action).getTask();
 
         int peerPort = (Constants.DEFAULT_PEER_PORT + i);
-
-        try {
-          zk.create("/" + t.getJobID().toString() + "/" + groomHostName + ":"
-              + peerPort, new byte[0], Ids.OPEN_ACL_UNSAFE,
-              CreateMode.EPHEMERAL);
-        } catch (KeeperException e) {
-          e.printStackTrace();
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-
+        registerPeerAddress(t.getJobID(), peerPort);
         assignedPeerNames.put(t.getTaskID(), peerPort);
 
         i++;
@@ -198,6 +188,24 @@
         }
       }
     }
+
+    /**
+     * Register this peer's address in ZooKeeper so that all tasks of a job
+     * can discover each other.
+     *
+     * @param jobID the job whose znode the address is registered under.
+     * @param peerPort the port this peer listens on.
+     */
+    private void registerPeerAddress(BSPJobID jobID, int peerPort) {
+      try {
+        zk.create(
+            "/" + jobID.toString() + "/" + groomHostName + ":" + peerPort,
+            new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+      } catch (KeeperException e) {
+        e.printStackTrace();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
   }
 
   private class Instructor extends Thread {
@@ -841,8 +849,8 @@
     }
 
     public void reportProgress(TaskStatus taskStatus) {
-//      LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% "
-//          + taskStatus.getStateString());
+      // LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% "
+      // + taskStatus.getStateString());
 
       if (this.done) {
         LOG.info(task.getTaskID()
@@ -945,14 +953,11 @@
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
 
       BSPPeerImpl bspPeer = new BSPPeerImpl(defaultConf, taskid, umbilical);
-      bspPeer.reinitialize();
       bspPeer.setJobConf(job);
 
-      TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(),
-          0, TaskStatus.State.RUNNING, "running", host,
-          TaskStatus.Phase.STARTING);
-
-      bspPeer.setCurrentTaskStatus(taskStatus);
+      bspPeer.setCurrentTaskStatus(new TaskStatus(task.getJobID(), task
+          .getTaskID(), 0, TaskStatus.State.RUNNING, "running", host,
+          TaskStatus.Phase.STARTING));
 
       try {
         // use job-specified working directory
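[Review note, not part of the patch] With made-up values (the job ID, host
and port below are hypothetical), the znode written by registerPeerAddress()
looks like this:

    // jobID = "job_201110_0001", groomHostName = "groom1", peerPort = 61000
    String znode = "/" + jobID.toString() + "/" + groomHostName + ":" + peerPort;
    // -> "/job_201110_0001/groom1:61000"
    // CreateMode.EPHEMERAL ties the entry to the groom's ZooKeeper session,
    // so a dead groom's address is cleaned up automatically.

These names are exactly what BSPPeerImpl.reinitialize() reads back and
sorts, so the registration format and the peer name format must stay in
sync.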
===================================================================
--- core/src/main/java/org/apache/hama/bsp/GroomServer.java    (revision 1185529)
+++ core/src/main/java/org/apache/hama/bsp/GroomServer.java    (working copy)

(GroomServer.java hunks shown above.)

Index: examples/src/main/java/org/apache/hama/examples/PiEstimator.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/PiEstimator.java    (revision 1185527)
+++ examples/src/main/java/org/apache/hama/examples/PiEstimator.java    (working copy)
@@ -36,7 +36,6 @@
 import org.apache.zookeeper.KeeperException;
 
 public class PiEstimator {
-  private static String MASTER_TASK = "master.task.";
   private static Path TMP_OUTPUT = new Path("/tmp/pi-example/output");
 
   public static class MyEstimator extends BSP {
@@ -46,7 +45,8 @@
 
     @Override
     public void setup(BSPPeer peer) {
-      this.masterTask = conf.get(MASTER_TASK);
+      // Choose one as a master
+      this.masterTask = peer.getPeerName(0);
     }
 
     @Override
@@ -131,12 +131,6 @@
       bsp.setNumBspTask(cluster.getMaxTasks());
     }
 
-    // Choose one as a master
-    for (String hostName : cluster.getActiveGroomNames().keySet()) {
-      conf.set(MASTER_TASK, hostName);
-      break;
-    }
-
     FileSystem fileSys = FileSystem.get(conf);
     initTempDir(fileSys);
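[Review note, not part of the patch] The example now elects its master
inside setup() rather than receiving a hostname through the job
configuration, which also drops the previous reliance on the iteration
order of cluster.getActiveGroomNames(). Sketch of the resulting pattern
(the isMaster check is an illustrative extension, not in the patch):

    @Override
    public void setup(BSPPeer peer) {
      // all tasks sort the same peer list, so index 0 is the same everywhere
      this.masterTask = peer.getPeerName(0);
    }

    // later, e.g. in bsp():
    boolean isMaster = peer.getPeerName().equals(masterTask);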