Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1508502) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -59,10 +59,7 @@ private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class); public static enum PeerCounter { - COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, - TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, - MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, - COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS + COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS } private final Configuration conf; @@ -169,6 +166,15 @@ .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); peerAddress = new InetSocketAddress(bindAddress, bindPort); + // This function call may change the current peer address + initializeMessaging(); + + conf.set(Constants.PEER_HOST, peerAddress.getHostName()); + conf.setInt(Constants.PEER_PORT, peerAddress.getPort()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Initialized Messaging service."); + } initializeIO(); initializeSyncService(superstep, state); @@ -182,11 +188,6 @@ setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state, stateString, peerAddress.getHostName(), phase, counters)); - initilizeMessaging(); - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized Messaging service."); - } - final String combinerName = conf.get(Constants.COMBINER_CLASS); if (combinerName != null) { combiner = (Combiner) ReflectionUtils.newInstance( @@ -290,9 +291,10 @@ return in.getPos(); } - public final void initilizeMessaging() throws ClassNotFoundException { + public final void 
initializeMessaging() throws ClassNotFoundException { messenger = MessageManagerFactory.getMessageManager(conf); messenger.init(taskId, this, conf, peerAddress); + peerAddress = messenger.getListenerAddress(); } public final void initializeSyncService(long superstep, TaskStatus.State state) Index: core/src/main/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/GroomServer.java (revision 1508502) +++ core/src/main/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -127,7 +127,6 @@ /** Map from taskId -> TaskInProgress. */ Map runningTasks = null; Map finishedTasks = null; - Map assignedPeerNames = null; Map runningJobs = null; // new nexus between GroomServer and BSPMaster @@ -163,17 +162,9 @@ } if (actions != null) { - // assignedPeerNames = new HashMap(); - int prevPort = Constants.DEFAULT_PEER_PORT; for (GroomServerAction action : actions) { if (action instanceof LaunchTaskAction) { - Task t = ((LaunchTaskAction) action).getTask(); - - synchronized (assignedPeerNames) { - prevPort = BSPNetUtils.getNextAvailable(prevPort); - assignedPeerNames.put(t.getTaskID(), prevPort); - } LOG.info("Launch " + actions.length + " tasks."); startNewTask((LaunchTaskAction) action); } else if (action instanceof KillTaskAction) { @@ -200,10 +191,6 @@ RecoverTaskAction recoverAction = (RecoverTaskAction) action; Task t = recoverAction.getTask(); LOG.info("Recovery action task." 
+ t.getTaskID()); - synchronized (assignedPeerNames) { - prevPort = BSPNetUtils.getNextAvailable(prevPort); - assignedPeerNames.put(t.getTaskID(), prevPort); - } try { startRecoveryTask(recoverAction); } catch (IOException e) { @@ -337,8 +324,6 @@ this.conf.set(Constants.PEER_HOST, localHostname); this.conf.set(Constants.GROOM_RPC_HOST, localHostname); this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3); - this.assignedPeerNames = new HashMap( - 2 * this.maxCurrentTasks); int rpcPort = -1; String rpcAddr = null; @@ -1233,7 +1218,8 @@ defaultConf); final BSPTask task = (BSPTask) umbilical.getTask(taskid); - int peerPort = umbilical.getAssignedPortNum(taskid); + int peerPort = Constants.DEFAULT_PEER_PORT; + peerPort = BSPNetUtils.getNextAvailable(peerPort); defaultConf.addResource(new Path(task.getJobFile())); BSPJob job = new BSPJob(task.getJobID(), task.getJobFile()); @@ -1368,11 +1354,6 @@ } @Override - public int getAssignedPortNum(TaskAttemptID taskid) { - return assignedPeerNames.get(taskid); - } - - @Override public void process(WatchedEvent event) { } Index: core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1508502) +++ core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy) @@ -335,12 +335,14 @@ @SuppressWarnings("rawtypes") private static final ConcurrentHashMap MANAGER_MAP = new ConcurrentHashMap(); + private InetSocketAddress selfAddress; @Override public void init(TaskAttemptID attemptId, BSPPeer peer, Configuration conf, InetSocketAddress peerAddress) { super.init(attemptId, peer, conf, peerAddress); MANAGER_MAP.put(peerAddress, this); + selfAddress = peerAddress; } @SuppressWarnings("unchecked") @@ -353,6 +355,11 @@ 1L); } } + + @Override + public InetSocketAddress getListenerAddress() { + return selfAddress; + } } public static class LocalUmbilical implements 
BSPPeerProtocol { @@ -400,12 +407,6 @@ throws IOException, InterruptedException { return true; } - - @Override - public int getAssignedPortNum(TaskAttemptID taskid) { - return 0; - } - } public static class LocalSyncClient extends BSPPeerSyncClient { Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1508502) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -65,8 +65,7 @@ protected SynchronizedQueue localQueueForNextIteration; // this peer object is just used for counter incrementation protected BSPPeer peer; - // the peer address of this peer - protected InetSocketAddress peerAddress; + // the task attempt id protected TaskAttemptID attemptId; @@ -89,7 +88,6 @@ this.attemptId = attemptId; this.peer = peer; this.conf = conf; - this.peerAddress = peerAddress; this.localQueue = getReceiverQueue(); this.localQueueForNextIteration = getSynchronizedReceiverQueue(); this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100); Index: core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (revision 1508502) +++ core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (working copy) @@ -27,12 +27,14 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hama.Constants; import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BSPPeerImpl; import org.apache.hama.bsp.TaskAttemptID; import org.apache.hama.bsp.message.compress.BSPCompressedBundle; import org.apache.hama.ipc.HamaRPCProtocolVersion; +import 
org.apache.hama.util.BSPNetUtils; import org.apache.hama.util.LRUCache; /** @@ -45,7 +47,7 @@ private static final Log LOG = LogFactory .getLog(HadoopMessageManagerImpl.class); - private Server server = null; + private Server server; private LRUCache> peersLRUCache = null; @@ -74,11 +76,16 @@ private final void startRPCServer(Configuration conf, InetSocketAddress peerAddress) { try { - this.server = RPC.getServer(this, peerAddress.getHostName(), - peerAddress.getPort(), conf); + String bindAddress = conf.get(Constants.PEER_HOST, + Constants.DEFAULT_PEER_HOST); + InetSocketAddress selfAddress = new InetSocketAddress(bindAddress, 0); + + this.server = RPC.getServer(this, selfAddress.getHostName(), + selfAddress.getPort(), conf); server.start(); - LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:" - + peerAddress.getPort()); + + LOG.info(" BSPPeer address:" + server.getListenerAddress().getHostName() + + " port:" + server.getListenerAddress().getPort()); } catch (IOException e) { LOG.error("Fail to start RPC server!", e); throw new RuntimeException("RPC Server could not be launched!"); @@ -104,7 +111,7 @@ if (compressor != null && (bundle.getApproximateSize() > conf.getLong( "hama.messenger.compression.threshold", 1048576))) { - + BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle); bspPeerConnection.put(compMsgBundle); peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L); @@ -115,7 +122,7 @@ } /** - * @param addr, socket address to which BSP Peer Connection will be + * @param addr , socket address to which BSP Peer Connection will be * established * @return BSP Peer Connection, tried to return cached connection, else * returns a new connection and caches it @@ -158,4 +165,12 @@ return versionID; } + @Override + public InetSocketAddress getListenerAddress() { + if (this.server != null) { + return this.server.getListenerAddress(); + } + return null; + } + } Index: 
core/src/main/java/org/apache/hama/bsp/message/MessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (revision 1508502) +++ core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (working copy) @@ -45,7 +45,12 @@ /** * Init can be used to start servers and initialize internal state. If you are - * implementing a subclass, please call the super version of this method. + * implementing a subclass, please call the super version of this method. The + * socket address provided may be used for initializing the server connection. + * Whether or not it is used, the message manager should provide a unique + * InetSocketAddress that identifies the server for the peer listening on the + * socket. This socket address should be returned by + * {@link MessageManager#getListenerAddress()}. * */ public void init(TaskAttemptID attemptId, BSPPeer peer, @@ -123,4 +128,9 @@ public void registerListener(MessageEventListener listener) throws IOException; + /** + * Returns the server address on which this message manager listens for + * incoming connections. 
+ */ + public InetSocketAddress getListenerAddress(); } Index: core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (revision 1508502) +++ core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (working copy) @@ -67,10 +67,4 @@ boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) throws IOException, InterruptedException; - /** - * @param taskid - * @return assigned port number - */ - int getAssignedPortNum(TaskAttemptID taskid); - } Index: core/src/main/java/org/apache/hama/util/BSPNetUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/BSPNetUtils.java (revision 1508502) +++ core/src/main/java/org/apache/hama/util/BSPNetUtils.java (working copy) @@ -75,14 +75,15 @@ * @return the InetSocketAddress of the given BSP peer */ public static InetSocketAddress getAddress(String peerName) { - String[] peerAddrParts = peerName.split(":"); - if (peerAddrParts.length != 2) { + int index = peerName.lastIndexOf(':'); + if (index <= 0 || index == peerName.length() - 1) { throw new ArrayIndexOutOfBoundsException( - "Peername must consist of exactly ONE \":\"! Given peername was: " - + peerName); + "Invalid host and port information. " + + "Peername must consist of atleast ONE \":\"! 
" + + "Given peername was: " + peerName); } - return new InetSocketAddress(peerAddrParts[0], - Integer.valueOf(peerAddrParts[1])); + return new InetSocketAddress(peerName.substring(0, index), + Integer.valueOf(peerName.substring(index + 1))); } /** Index: core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (revision 1508502) +++ core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (working copy) @@ -148,11 +148,6 @@ return true; } - @Override - public int getAssignedPortNum(TaskAttemptID taskid) { - return 0; - } - public synchronized int getPingCount() { return pingCount; } Index: core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (revision 1508502) +++ core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (working copy) @@ -150,6 +150,12 @@ this.listener = listener; } + @Override + public InetSocketAddress getListenerAddress() { + // TODO Auto-generated method stub + return null; + } + } public static class TestBSPPeer implements Index: core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (revision 1508502) +++ core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (working copy) @@ -41,13 +41,13 @@ bsp.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH); conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600); - bsp.setNumBspTask(2); + bsp.setNumBspTask(3); bsp.setInputFormat(NullInputFormat.class); FileSystem fileSys = FileSystem.get(conf); if (bsp.waitForCompletion(true)) { - TestBSPMasterGroomServer.checkOutput(fileSys, conf, 2); + TestBSPMasterGroomServer.checkOutput(fileSys, conf, 3); } } Index: 
core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (revision 1508502) +++ core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hama.Constants; import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BSPPeerImpl; @@ -73,12 +74,16 @@ InetSocketAddress peer = new InetSocketAddress( BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort() + (increment++)); + conf.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + conf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + BSPPeer dummyPeer = new BSPPeerImpl( conf, FileSystem.get(conf), new Counters()); TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1); messageManager.init(id, dummyPeer, conf, peer); + peer = messageManager.getListenerAddress(); String peerName = peer.getHostName() + ":" + peer.getPort(); - + System.out.println("Peer is " + peerName); messageManager.send(peerName, new IntWritable(1337)); Iterator>> messageIterator = messageManager Index: yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (revision 1508502) +++ yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (working copy) @@ -405,9 +405,4 @@ } - @Override - public int getAssignedPortNum(TaskAttemptID taskid) { - return 0; - } - }