Index: core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (revision 1692340) +++ core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (working copy) @@ -51,15 +51,20 @@ private static final Log LOG = LogFactory .getLog(HamaAsyncMessageManagerImpl.class); + private static final int MAX_RETRY = 5; + private AsyncServer server; private LRUCache> peersLRUCache = null; + private static int retry = 0; + @SuppressWarnings("serial") @Override public final void init(TaskAttemptID attemptId, BSPPeer peer, HamaConfiguration conf, InetSocketAddress peerAddress) { super.init(attemptId, peer, conf, peerAddress); + retry = 0; startRPCServer(conf, peerAddress); peersLRUCache = new LRUCache>( maxCachedConnections) { @@ -87,7 +92,6 @@ } private void startServer(String hostName, int port) throws IOException { - int retry = 0; try { this.server = AsyncRPC.getServer(this, hostName, port, conf.getInt("hama.default.messenger.handler.threads.num", 5), false, @@ -98,12 +102,10 @@ + " port:" + server.getAddress().getPort()); } catch (BindException e) { LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1); - startServer(hostName, port + 1); - retry++; - - if (retry > 5) { + if (retry++ >= MAX_RETRY) { throw new RuntimeException("RPC Server could not be launched!"); } + startServer(hostName, port + 1); } }