Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (revision 1135388) +++ src/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hama.Constants; +import org.apache.hama.zookeeper.QuorumPeer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -81,10 +82,9 @@ .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); bspRoot = conf.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT); - zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM) - + ":" - + conf.getInt(Constants.ZOOKEPER_CLIENT_PORT, - Constants.DEFAULT_ZOOKEPER_CLIENT_PORT); + + zookeeperAddr = QuorumPeer.getZKQuorumServersString(conf); + // TODO: may require to dynamic reflect the underlying // network e.g. ip address, port. peerAddress = new InetSocketAddress(bindAddress, bindPort); Index: src/java/org/apache/hama/zookeeper/QuorumPeer.java =================================================================== --- src/java/org/apache/hama/zookeeper/QuorumPeer.java (revision 1135388) +++ src/java/org/apache/hama/zookeeper/QuorumPeer.java (working copy) @@ -279,4 +279,75 @@ } return properties; } + + /** + * Return the ZK Quorum servers string given zk properties returned by + * makeZKProps + * @param properties + * @return Quorum servers String + */ + public static String getZKQuorumServersString(Properties properties) { + String clientPort = null; + List servers = new ArrayList(); + + // The clientPort option may come after the server.X hosts, so we need to + // grab everything and then create the final host:port comma separated list. + boolean anyValid = false; + for (Entry property : properties.entrySet()) { + String key = property.getKey().toString().trim(); + String value = property.getValue().toString().trim(); + if (key.equals("clientPort")) { + clientPort = value; + } + else if (key.startsWith("server.")) { + String host = value.substring(0, value.indexOf(':')); + servers.add(host); + try { + //noinspection ResultOfMethodCallIgnored + InetAddress.getByName(host); + anyValid = true; + } catch (UnknownHostException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + } + + if (!anyValid) { + LOG.error("no valid quorum servers found in " + Constants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (clientPort == null) { + LOG.error("no clientPort found in " + Constants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (servers.isEmpty()) { + LOG.fatal("No server.X lines found in conf/zoo.cfg. Hama must have a " + + "ZooKeeper cluster configured for its operation."); + return null; + } + + StringBuilder hostPortBuilder = new StringBuilder(); + for (int i = 0; i < servers.size(); ++i) { + String host = servers.get(i); + if (i > 0) { + hostPortBuilder.append(','); + } + hostPortBuilder.append(host); + hostPortBuilder.append(':'); + hostPortBuilder.append(clientPort); + } + + return hostPortBuilder.toString(); + } + + /** + * Return the ZK Quorum servers string given the specified configuration. + * @param conf + * @return Quorum servers + */ + public static String getZKQuorumServersString(Configuration conf) { + return getZKQuorumServersString(makeZKProps(conf)); + } }