Index: src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java	(revision 1084033)
+++ src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -136,6 +137,48 @@
       cluster.shutdown();
     }
   }
+
+  @Test public void testMiniZooKeeper() throws Exception {
+    MiniZooKeeperCluster cluster1 = this.hbt.startMiniZKCluster();
+    try {
+      assertEquals(0, cluster1.getBackupZooKeeperServerNum());
+      assertTrue((cluster1.killCurrentActiveZooKeeperServer() == -1));
+    } finally {
+      cluster1.shutdown();
+    }
+
+    this.hbt.shutdownMiniZKCluster();
+
+    // set up zookeeper cluster with 5 zk servers
+    MiniZooKeeperCluster cluster2 = this.hbt.startMiniZKCluster(5);
+    // ports are probed upwards from the cluster's default client port
+    int defaultClientPort = cluster2.getDefaultClientPort();
+    try {
+      assertEquals(4, cluster2.getBackupZooKeeperServerNum());
+
+      // killing the current active zk server
+      assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
+      assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
+      assertEquals(2, cluster2.getBackupZooKeeperServerNum());
+      assertEquals(3, cluster2.getZooKeeperServerNum());
+
+      // killing the backup zk servers
+      cluster2.killOneBackupZooKeeperServer();
+      cluster2.killOneBackupZooKeeperServer();
+      assertEquals(0, cluster2.getBackupZooKeeperServerNum());
+      assertEquals(1, cluster2.getZooKeeperServerNum());
+
+      // killing the last zk server
+      assertTrue((cluster2.killCurrentActiveZooKeeperServer() == -1));
+      // with no servers left, further kills must be harmless no-ops
+      assertEquals(-1, cluster2.killCurrentActiveZooKeeperServer());
+      cluster2.killOneBackupZooKeeperServer();
+      assertEquals(-1, cluster2.getBackupZooKeeperServerNum());
+      assertEquals(0, cluster2.getZooKeeperServerNum());
+    } finally {
+      cluster2.shutdown();
+    }
+  }
 
   @Test public void testMiniDFSCluster() throws Exception {
     MiniDFSCluster cluster = this.hbt.startMiniDFSCluster(1);
Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(revision 1084033)
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(working copy)
@@ -245,18 +245,37 @@
    * @return zk cluster started.
    */
   public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
-    return startMiniZKCluster(setupClusterTestBuildDir());
+    return startMiniZKCluster(setupClusterTestBuildDir(), 1);
 
   }
+
+  /**
+   * Call this if you only want a zk cluster with a given number of servers.
+   * @param zooKeeperServerNum number of ZooKeeper servers to start
+   * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
+   * @throws Exception
+   * @see #shutdownMiniZKCluster()
+   * @return zk cluster started.
+   */
+  public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum)
+  throws Exception {
+    return startMiniZKCluster(setupClusterTestBuildDir(), zooKeeperServerNum);
+  }
 
   private MiniZooKeeperCluster startMiniZKCluster(final File dir)
+  throws Exception {
+    return startMiniZKCluster(dir, 1);
+  }
+
+  private MiniZooKeeperCluster startMiniZKCluster(final File dir,
+      int zooKeeperServerNum)
   throws Exception {
     this.passedZkCluster = false;
     if (this.zkCluster != null) {
       throw new IOException("Cluster already running at " + dir);
     }
     this.zkCluster = new MiniZooKeeperCluster();
-    int clientPort = this.zkCluster.startup(dir);
+    int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum);
     this.conf.set("hbase.zookeeper.property.clientPort",
       Integer.toString(clientPort));
     return this.zkCluster;
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java	(revision 1084033)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java	(working copy)
@@ -28,6 +28,8 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,27 +50,51 @@
   private static final int CONNECTION_TIMEOUT = 30000;
 
   private boolean started;
-  private int clientPort = 21818; // use non-standard port
 
-  private NIOServerCnxn.Factory standaloneServerFactory;
+  private int defaultClientPort = 21818; // use non-standard port
+  private int clientPort = defaultClientPort;
+
+  private List<NIOServerCnxn.Factory> standaloneServerFactoryList;
+  private List<ZooKeeperServer> zooKeeperServers;
+  private List<Integer> clientPortList;
+
+  // index of the active server in the lists above; -1 when none is active
+  private int activeZKServerIndex;
   private int tickTime = 0;
 
   /** Create mini ZooKeeper cluster. */
   public MiniZooKeeperCluster() {
     this.started = false;
+    activeZKServerIndex = -1;
+    zooKeeperServers = new ArrayList<ZooKeeperServer>();
+    clientPortList = new ArrayList<Integer>();
+    standaloneServerFactoryList = new ArrayList<NIOServerCnxn.Factory>();
   }
 
-  public void setClientPort(int clientPort) {
-    this.clientPort = clientPort;
+  public void setDefaultClientPort(int clientPort) {
+    this.defaultClientPort = clientPort;
   }
 
-  public int getClientPort() {
-    return clientPort;
+  public int getDefaultClientPort() {
+    return defaultClientPort;
   }
 
   public void setTickTime(int tickTime) {
     this.tickTime = tickTime;
   }
+
+  /**
+   * @return number of backup servers, i.e. all tracked servers except the
+   * active one; -1 when no server is tracked at all.
+   */
+  public int getBackupZooKeeperServerNum() {
+    return zooKeeperServers.size()-1;
+  }
+
+  /** @return number of ZooKeeper servers currently tracked by this cluster. */
+  public int getZooKeeperServerNum() {
+    return zooKeeperServers.size();
+  }
 
   // / XXX: From o.a.zk.t.ClientBase
   private static void setupTestEnv() {
@@ -79,50 +105,80 @@
     System.setProperty("zookeeper.preAllocSize", "100");
     FileTxnLog.setPreallocSize(100);
   }
+
+  /**
+   * Starts a cluster with a single ZooKeeper server.
+   * @param baseDir
+   * @return ClientPort server bound to.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public int startup(File baseDir) throws IOException,
+      InterruptedException {
+    return startup(baseDir, 1);
+  }
 
   /**
    * @param baseDir
+   * @param numZooKeeperServers number of servers to start; when this is not
+   * positive nothing is started and -1 is returned
    * @return ClientPort server bound to.
    * @throws IOException
    * @throws InterruptedException
    */
-  public int startup(File baseDir) throws IOException,
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
       InterruptedException {
+    if (numZooKeeperServers <= 0) {
+      return -1;
+    }
 
     setupTestEnv();
 
     shutdown();
 
-    File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
-    recreateDir(dir);
-
-    int tickTimeToUse;
-    if (this.tickTime > 0) {
-      tickTimeToUse = this.tickTime;
-    } else {
-      tickTimeToUse = TICK_TIME;
-    }
-    ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
-    while (true) {
-      try {
-        standaloneServerFactory =
-          new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
-      } catch (BindException e) {
-        LOG.info("Failed binding ZK Server to client port: " + clientPort);
-        //this port is already in use. try to use another
-        clientPort++;
-        continue;
+    // running all the ZK servers
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
+      recreateDir(dir);
+      clientPort = defaultClientPort;
+      int tickTimeToUse;
+      if (this.tickTime > 0) {
+        tickTimeToUse = this.tickTime;
+      } else {
+        tickTimeToUse = TICK_TIME;
       }
-      break;
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+      NIOServerCnxn.Factory standaloneServerFactory;
+      while (true) {
+        try {
+          standaloneServerFactory =
+            new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
+        } catch (BindException e) {
+          LOG.info("Failed binding ZK Server to client port: " + clientPort);
+          //this port is already in use. try to use another
+          clientPort++;
+          continue;
+        }
+        break;
+      }
+
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);
+      if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for startup of standalone server");
+      }
+
+      clientPortList.add(clientPort);
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
     }
-    standaloneServerFactory.startup(server);
-
-    if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
-      throw new IOException("Waiting for startup of standalone server");
-    }
-
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
     started = true;
-    LOG.info("Started MiniZK Server on client port: " + clientPort);
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+      "on client port: " + clientPort);
     return clientPort;
   }
 
@@ -144,14 +200,103 @@
     if (!started) {
       return;
     }
+    // shut down all the zk servers
+    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+      NIOServerCnxn.Factory standaloneServerFactory =
+        standaloneServerFactoryList.get(i);
+      int clientPort = clientPortList.get(i);
+
+      standaloneServerFactory.shutdown();
+      if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for shutdown of standalone server");
+      }
+    }
+    // clear everything
+    started = false;
+    activeZKServerIndex = -1;
+    standaloneServerFactoryList.clear();
+    clientPortList.clear();
+    zooKeeperServers.clear();
+
+    LOG.info("Shutdown MiniZK cluster with all ZK servers");
+  }
+
+  /**
+   * Kills the current active ZooKeeper server; the next server in the list,
+   * if any, becomes the active one.
+   * @return clientPort of the backup server that became active, or -1 if the
+   * cluster is not started or no server is left after the kill.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0) {
+      return -1;
+    }
+
+    // Shutdown the current active one
+    NIOServerCnxn.Factory standaloneServerFactory =
+      standaloneServerFactoryList.get(activeZKServerIndex);
+    int clientPort = clientPortList.get(activeZKServerIndex);
 
     standaloneServerFactory.shutdown();
     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
       throw new IOException("Waiting for shutdown of standalone server");
     }
 
-    started = false;
+    // remove the current active zk server
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+        "on client port: " + clientPort);
+
+    if (standaloneServerFactoryList.size() == 0) {
+      // there is no backup servers; mark the cluster as having no active
+      // server so that a further kill returns -1 instead of indexing into
+      // the now-empty lists
+      activeZKServerIndex = -1;
+      return -1;
+    }
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+        "on client port: " + clientPort);
+    // return the next back zk server's port
+    return clientPort;
   }
+
+  /**
+   * Kill one backup ZK server (the one right after the active server).
+   * Does nothing if the cluster is not started or has no backup server.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0 ||
+        standaloneServerFactoryList.size() <= 1) {
+      return ;
+    }
+
+    int backupZKServerIndex = activeZKServerIndex+1;
+    // Shutdown the chosen backup server
+    NIOServerCnxn.Factory standaloneServerFactory =
+      standaloneServerFactoryList.get(backupZKServerIndex);
+    int clientPort = clientPortList.get(backupZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    // remove this backup zk server
+    standaloneServerFactoryList.remove(backupZKServerIndex);
+    clientPortList.remove(backupZKServerIndex);
+    zooKeeperServers.remove(backupZKServerIndex);
+    LOG.info("Kill one backup ZK servers in the cluster " +
+        "on client port: " + clientPort);
+  }
 
   // XXX: From o.a.zk.t.ClientBase
   private static boolean waitForServerDown(int port, long timeout) {
Index: src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java	(revision 1084033)
+++ src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java	(working copy)
@@ -121,7 +121,7 @@
         if (zkClientPort == 0) {
           throw new IOException("No config value for hbase.zookeeper.property.clientPort");
         }
-        zooKeeperCluster.setClientPort(zkClientPort);
+        zooKeeperCluster.setDefaultClientPort(zkClientPort);
         int clientPort = zooKeeperCluster.startup(zkDataPath);
         if (clientPort != zkClientPort) {
           String errorMsg = "Couldnt start ZK at requested address of " +