Index: core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (revision 1693001) +++ core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (working copy) @@ -51,8 +51,12 @@ private static final Log LOG = LogFactory .getLog(HamaMessageManagerImpl.class); + private static final int MAX_RETRY = 5; + private Server server; + private static int retry = 0; + private LRUCache> peersLRUCache = null; @SuppressWarnings("serial") @@ -60,6 +64,7 @@ public final void init(TaskAttemptID attemptId, BSPPeer peer, HamaConfiguration conf, InetSocketAddress peerAddress) { super.init(attemptId, peer, conf, peerAddress); + retry = 0; startRPCServer(conf, peerAddress); peersLRUCache = new LRUCache>( maxCachedConnections) { @@ -87,7 +92,6 @@ } private void startServer(String hostName, int port) throws IOException { - int retry = 0; try { this.server = RPC.getServer(this, hostName, port, conf.getInt("hama.default.messenger.handler.threads.num", 5), false, @@ -97,13 +101,13 @@ LOG.info("BSPPeer address:" + server.getListenerAddress().getHostName() + " port:" + server.getListenerAddress().getPort()); } catch (BindException e) { - LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1); - startServer(hostName, port + 1); - retry++; - - if (retry > 5) { + final int nextPort = port + 1; + LOG.warn("Address already in use. Retrying " + hostName + ":" + + nextPort); + if (retry++ >= MAX_RETRY) { throw new RuntimeException("RPC Server could not be launched!"); } + startServer(hostName, nextPort); } }