Index: core/src/main/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/GroomServer.java (revision 1221622) +++ core/src/main/java/org/apache/hama/bsp/GroomServer.java (working copy) @@ -57,6 +57,7 @@ import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.hama.ipc.GroomProtocol; import org.apache.hama.ipc.MasterProtocol; +import org.apache.hama.util.BSPNetUtils; import org.apache.hama.zookeeper.QuorumPeer; import org.apache.log4j.LogManager; import org.apache.zookeeper.WatchedEvent; @@ -146,19 +147,14 @@ LOG.info("Launch " + actions.length + " tasks."); assignedPeerNames = new HashMap(); - int i = 0; + int prevPort = Constants.DEFAULT_PEER_PORT; - // TODO find another way to manage all activate peers. for (GroomServerAction action : actions) { Task t = ((LaunchTaskAction) action).getTask(); - int peerPort = (Constants.DEFAULT_PEER_PORT + i); - assignedPeerNames.put(t.getTaskID(), peerPort); - - i++; - } - - for (GroomServerAction action : actions) { + prevPort = BSPNetUtils.getNextAvailable(prevPort); + assignedPeerNames.put(t.getTaskID(), prevPort); + if (action instanceof LaunchTaskAction) { startNewTask((LaunchTaskAction) action); } else { Index: core/src/main/java/org/apache/hama/util/BSPNetUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/BSPNetUtils.java (revision 1221622) +++ core/src/main/java/org/apache/hama/util/BSPNetUtils.java (working copy) @@ -17,16 +17,22 @@ */ package org.apache.hama.util; +import java.io.IOException; +import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.UnknownHostException; +import java.util.NoSuchElementException; +import org.apache.hama.Constants; import org.apache.mina.util.AvailablePortFinder; /** * NetUtils for our needs. */ public class BSPNetUtils { + public static final int MAX_PORT_NUMBER = 65535; /** * Gets the canonical hostname of this machine. @@ -78,4 +84,55 @@ Integer.valueOf(peerAddrParts[1])); } + /** + * Checks to see if a specific port is available. + * + * @param port the port to check for availability + */ + public static boolean available(int port) { + if (port < Constants.DEFAULT_PEER_PORT || port > MAX_PORT_NUMBER) { + throw new IllegalArgumentException("Invalid start port: " + port); + } + + ServerSocket ss = null; + DatagramSocket ds = null; + try { + ss = new ServerSocket(port); + ss.setReuseAddress(true); + ds = new DatagramSocket(port); + ds.setReuseAddress(true); + return true; + } catch (IOException e) { + } finally { + if (ds != null) { + ds.close(); + } + + if (ss != null) { + try { + ss.close(); + } catch (IOException e) { + /* should not be thrown */ + } + } + } + + return false; + } + + public static int getNextAvailable(int fromPort) { + if ((fromPort < Constants.DEFAULT_PEER_PORT) + || (fromPort > MAX_PORT_NUMBER)) { + throw new IllegalArgumentException("Invalid start port: " + fromPort); + } + + for (int i = fromPort + 1; i <= MAX_PORT_NUMBER; i++) { + if (available(i)) { + return i; + } + } + + throw new NoSuchElementException("Could not find an available port " + + "above " + fromPort); + } }