Index: src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java (revision 1083993) +++ src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java (working copy) @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; @@ -136,6 +137,28 @@ cluster.shutdown(); } } + + @Test public void testMiniZooKeeper() throws Exception { + MiniZooKeeperCluster cluster1 = this.hbt.startMiniZKCluster(); + try { + assertEquals(0, cluster1.getZooKeeperCandidateNum()); + assertTrue((cluster1.killCurrentZooKeeper() == -1)); + } finally { + cluster1.shutdown(); + } + + this.hbt.shutdownMiniZKCluster(); + + MiniZooKeeperCluster cluster2 = this.hbt.startMiniZKCluster(5); + try { + assertEquals(4, cluster2.getZooKeeperCandidateNum()); + assertTrue((cluster2.killCurrentZooKeeper() > 0)); + assertTrue((cluster2.killCurrentZooKeeper() > 0)); + assertEquals(2, cluster2.getZooKeeperCandidateNum()); + } finally { + cluster2.shutdown(); + } + } @Test public void testMiniDFSCluster() throws Exception { MiniDFSCluster cluster = this.hbt.startMiniDFSCluster(1); Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1083993) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -245,18 +245,38 @@ * @return zk cluster started. */ public MiniZooKeeperCluster startMiniZKCluster() throws Exception { - return startMiniZKCluster(setupClusterTestBuildDir()); + return startMiniZKCluster(setupClusterTestBuildDir(),1); } + + /** + * Call this if you only want a zk cluster. + * @param zooKeeperServerNum + * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster. + * @throws Exception + * @see #shutdownMiniZKCluster() + * @return zk cluster started. + */ + public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) + throws Exception { + return startMiniZKCluster(setupClusterTestBuildDir(), zooKeeperServerNum); + } + private MiniZooKeeperCluster startMiniZKCluster(final File dir) + throws Exception { + return startMiniZKCluster(dir,1); + } + + private MiniZooKeeperCluster startMiniZKCluster(final File dir, + int zooKeeperServerNum) throws Exception { this.passedZkCluster = false; if (this.zkCluster != null) { throw new IOException("Cluster already running at " + dir); } this.zkCluster = new MiniZooKeeperCluster(); - int clientPort = this.zkCluster.startup(dir); + int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum); this.conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort)); return this.zkCluster; Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java (revision 1083993) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java (working copy) @@ -28,6 +28,8 @@ import java.net.BindException; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,14 +50,21 @@ private static final int CONNECTION_TIMEOUT = 30000; private boolean started; - private int clientPort = 21818; // use non-standard port + private int defaultClientPort = 21818; // use non-standard port + private int clientPort = defaultClientPort; + private int zooKeeperCandidateNum = 0; + private NIOServerCnxn.Factory standaloneServerFactory; + private int currentZooKeeper; + private List zooKeeperServers; private int tickTime = 0; /** Create mini ZooKeeper cluster. */ public MiniZooKeeperCluster() { this.started = false; + currentZooKeeper = -1; + zooKeeperServers = new ArrayList(); } public void setClientPort(int clientPort) { @@ -69,6 +78,10 @@ public void setTickTime(int tickTime) { this.tickTime = tickTime; } + + public int getZooKeeperCandidateNum() { + return zooKeeperCandidateNum; + } // / XXX: From o.a.zk.t.ClientBase private static void setupTestEnv() { @@ -79,30 +92,41 @@ System.setProperty("zookeeper.preAllocSize", "100"); FileTxnLog.setPreallocSize(100); } + + public int startup(File baseDir) throws IOException, + InterruptedException { + return startup(baseDir,1); + } /** * @param baseDir + * @param numZooKeeperServers * @return ClientPort server bound to. * @throws IOException * @throws InterruptedException */ - public int startup(File baseDir) throws IOException, + public int startup(File baseDir, int numZooKeeperServers) throws IOException, InterruptedException { + if (numZooKeeperServers <= 0) + return -1; setupTestEnv(); shutdown(); - - File dir = new File(baseDir, "zookeeper").getAbsoluteFile(); - recreateDir(dir); - - int tickTimeToUse; - if (this.tickTime > 0) { - tickTimeToUse = this.tickTime; - } else { - tickTimeToUse = TICK_TIME; + + for (int i = 0; i < numZooKeeperServers; i++) { + File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile(); + recreateDir(dir); + + int tickTimeToUse; + if (this.tickTime > 0) { + tickTimeToUse = this.tickTime; + } else { + tickTimeToUse = TICK_TIME; + } + ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); + zooKeeperServers.add(server); } - ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); while (true) { try { standaloneServerFactory = @@ -115,13 +139,14 @@ } break; } - standaloneServerFactory.startup(server); - + currentZooKeeper = 0; + standaloneServerFactory.startup(zooKeeperServers.get(currentZooKeeper)); if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) { throw new IOException("Waiting for startup of standalone server"); } started = true; + zooKeeperCandidateNum = numZooKeeperServers-1; LOG.info("Started MiniZK Server on client port: " + clientPort); return clientPort; } @@ -151,8 +176,58 @@ } started = false; + zooKeeperCandidateNum = 0; + LOG.info("Shutdown MiniZK Server on client port: " + clientPort); } + + /**@return clientPort return clientPort if there is another ZooKeeper Candidate can run; return + * -1, if there is no candidates. + * @throws IOException + * @throws InterruptedException + */ + public int killCurrentZooKeeper() throws IOException, + InterruptedException { + if (!started) { + return -1; + } + // Shutdown the current one + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + LOG.info("Kill the current MiniZK Server on client port: " + + clientPort); + + if (zooKeeperCandidateNum == 0) { + return -1; + } + // Start another ZooKeeper Server + clientPort = defaultClientPort; + while (true) { + try { + standaloneServerFactory = + new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)); + } catch (BindException e) { + LOG.info("Failed binding ZK Server to client port: " + clientPort); + //this port is already in use. try to use another + clientPort++; + continue; + } + break; + } + currentZooKeeper = (++currentZooKeeper) % zooKeeperServers.size() ; + standaloneServerFactory.startup(zooKeeperServers.get(currentZooKeeper)); + if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for startup of standalone server"); + } + + started = true; + zooKeeperCandidateNum--; + LOG.info("Started another candidate MiniZK Server on client port: " + clientPort); + return clientPort; + } + // XXX: From o.a.zk.t.ClientBase private static boolean waitForServerDown(int port, long timeout) { long start = System.currentTimeMillis();