Index: src/test/java/org/apache/hadoop/hbase/ServerManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ServerManager.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ServerManager.java (revision 0) @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.StringTokenizer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; + + +/** + * Utility to start/kill servers in a distributed environment. Assumes Unix-like + * commands are available like 'ps', 'kill', etc. 
Also assumes the user running + * the test has enough "power" to start & stop servers on the remote machines + * (for example, the test user could be the same user as the user the daemon is + * running as) + */ +public abstract class ServerManager { + private static final Log LOG = LogFactory.getLog(ServerManager.class); + + enum Operation {STOP, RESUME, START, KILL}; + public enum ServerTypeName {REGIONSERVER, MASTER}; //hardcoded for now + + final static String REGIONSERVER = "regionserver"; + final static String MASTER = "master"; + final String scratchSpaceDir; + private final String slavesFile; + private Random rand = new Random(System.currentTimeMillis()); + + public ServerManager(String scratchSpaceDir) { + this.scratchSpaceDir = scratchSpaceDir; + this.slavesFile = scratchSpaceDir + Path.SEPARATOR + + "slave-" + rand.nextInt(); + } + + public static ServerManager getInstanceOfManager(ServerTypeName type, + String scratchSpaceDir) { + if (type == ServerTypeName.REGIONSERVER) { //this could be replaced by config key/val mappings + return new RegionServerManager(scratchSpaceDir); + } + return null; + } + + /** + * The following public methods are for controlling servers using an instance + * of this class. Currently these are not thread-safe. 
+ */ + public void stopServer(Collection servers) throws IOException { + runOperationOnServers(Operation.STOP, servers); + } + public void resumeServer(Collection servers) throws IOException { + runOperationOnServers(Operation.RESUME, servers); + } + public void startServer(Collection servers) throws IOException { + runOperationOnServers(Operation.START, servers); + } + public void killServer(Collection servers) throws IOException { + runOperationOnServers(Operation.KILL, servers); + } + public void stopServer(String server) throws IOException { + ArrayList arr = new ArrayList(); + arr.add(server); + runOperationOnServers(Operation.STOP, arr); + } + public void resumeServer(String server) throws IOException { + ArrayList servers = new ArrayList(); + servers.add(server); + runOperationOnServers(Operation.RESUME, servers); + } + public void startServer(String server) throws IOException { + ArrayList servers = new ArrayList(); + servers.add(server); + runOperationOnServers(Operation.START, servers); + } + public void killServer(String server) throws IOException { + ArrayList servers = new ArrayList(); + servers.add(server); + runOperationOnServers(Operation.KILL, servers); + } + + private void runOperationOnServers(Operation operation, + Collection servers) throws IOException { + try { + writeServersToAffectFile(servers); + runOperation(operation); + } finally { + cleanupScratchSpace(); + } + } + + private void writeServersToAffectFile(Collection servers) + throws IOException { + //not threadsafe + FileOutputStream fos = new FileOutputStream(new File(slavesFile)); + for (String server : servers) { + LOG.info(new Date() + " Marking server on host: " + server); + fos.write((server + "\n").getBytes()); + } + fos.close(); + } + + private void cleanupScratchSpace() { + new File(slavesFile).delete(); + } + + /** + * The following protected methods are expected to be implemented by the + * respective server extensions/implementations of ServerManager + */ + /** + * Get the 
env name that the server scripts use to point to the slaves file + * (like HBASE_REGIONSERVERS file in the case of hbase). + * IMPORTANT NOTE: MAKE SURE that the scripts on the machine where this test + * is run from doesn't set the env internally (that would interfere with the + * tests) + * @return + */ + protected abstract String getSlavesFileEnvName(); + /** + * Get the env name that the server scripts use to point to the HOME of + * the installation (like HBASE_HOME in the case of hbase) + * @return + */ + protected abstract String getHomeEnvName(); + /** + * The Ssh command path that ssh'es out to the remote nodes and runs commands + * (like bin/regionservers.sh in the case of hbase). If the path doesn't + * begin with a slash, the assumption is that the path is relative to + * the return value of {@link #getHomeEnvName()}, and the path is normalized + * by this class before using it + * @return + */ + protected abstract String getSshCommandPath(); + /** + * The command for 'stopping' the server + * @return + */ + protected abstract String getStopCommand(); + /** + * The command for 'resuming' a 'stopped' server + * @return + */ + protected abstract String getResumeCommand(); + /** + * The command for 'starting' the server + * @return + */ + protected abstract String getStartCommand(); + /** + * The command for 'killing' the server + * @return + */ + protected abstract String getKillCommand(); + + protected abstract ServerTypeName getType(); + + private void runOperation(Operation operation) throws IOException { + Map hMap = new HashMap(); + hMap.put(getSlavesFileEnvName(), slavesFile); + + StringTokenizer strToken; + if (operation == Operation.STOP) { + strToken = new StringTokenizer(getStopCommand(), " "); + } else if(operation == Operation.RESUME) { + strToken = new StringTokenizer(getResumeCommand(), " "); + } else if(operation == Operation.START) { + strToken = new StringTokenizer(getStartCommand(), " "); + } else if(operation == Operation.KILL) { + 
strToken = new StringTokenizer(getKillCommand(), " "); + } else { + throw new RuntimeException(operation + " currently not supported"); + } + + String commandArgs[] = new String[strToken.countTokens() + 1]; + int i = 0; + commandArgs[i++] = normalizeSshCommandPath(getSshCommandPath()); + while (strToken.hasMoreTokens()) { + commandArgs[i++] = strToken.nextToken(); + } + String output = Shell.execCommand(hMap, commandArgs); + if (output != null && !output.equals("")) { + LOG.info(output); + } + } + private String normalizeSshCommandPath(String command) { + if (command.startsWith("/")) { + return command; + } + final String home; + if ((home = System.getenv(getHomeEnvName())) != null) { + command = home + Path.SEPARATOR + command; + } + return command; + } +} + +class RegionServerManager extends ServerManager { + final private String STOP_COMMAND = "ps uwwx | grep java | grep " + + "proc_regionserver"+ " |" + + " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP"; + final private String RESUME_COMMAND = "ps uwwx | grep java | grep " + + "proc_regionserver"+ " |" + + " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT"; + final private String START_COMMAND = System.getenv("HBASE_HOME") + + "/bin/hbase-daemon.sh start regionserver"; + final private String KILL_COMMAND = "ps uwwx | grep java | grep " + + "proc_regionserver"+ " |" + + " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -9"; + RegionServerManager(String scratchSpaceDir) { + super(scratchSpaceDir); + } + + @Override + protected String getSlavesFileEnvName() { + return "HBASE_REGIONSERVERS"; + } + @Override + protected String getStopCommand() { + return STOP_COMMAND; + } + @Override + protected String getResumeCommand() { + return RESUME_COMMAND; + } + @Override + protected String getStartCommand() { + return START_COMMAND; + } + @Override + protected String getKillCommand() { + return KILL_COMMAND; + } + @Override + protected String getHomeEnvName() { + return "HBASE_HOME"; + 
} + @Override + protected String getSshCommandPath() { + return "bin/regionservers.sh"; + } + @Override + protected ServerTypeName getType() { + return ServerTypeName.REGIONSERVER; + } +} \ No newline at end of file Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1350658) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -102,7 +102,7 @@ private boolean passedZkCluster = false; private MiniDFSCluster dfsCluster = null; - private MiniHBaseCluster hbaseCluster = null; + private HBaseClusterShim hbaseCluster = null; private MiniMRCluster mrCluster = null; // Directory where we put the data for this instance of HBaseTestingUtility @@ -126,6 +126,16 @@ */ public static final String BASE_TEST_DIRECTORY_KEY = "test.build.data.basedirectory"; + + /** + * Is this test using a real cluster + */ + public boolean realCluster; + + /** + * Configuration that controls whether this utility assumes a running cluster + */ + public static final String IS_REAL_CLUSTER = "cluster.real"; /** * Default base directory for test output. @@ -150,6 +160,7 @@ public HBaseTestingUtility(Configuration conf) { this.conf = conf; + this.realCluster = conf.getBoolean(IS_REAL_CLUSTER, false); } /** @@ -539,9 +550,31 @@ t.close(); LOG.info("Minicluster is up"); //getHBaseAdmin(); - return this.hbaseCluster; + //TODO: this is explicitly typecasting. 
Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + return (MiniHBaseCluster)this.hbaseCluster; } + public HBaseClusterShim startHBaseCluster(Configuration conf, + int nMasters, int nRegionServers) throws Exception { + Configuration c = new Configuration(conf); + HBaseClusterShim cluster = HBaseClusterShim.getHBaseClusterInstance(c, + nMasters, nRegionServers); + HTable t = new HTable(c, HConstants.META_TABLE_NAME); + ResultScanner s = t.getScanner(new Scan()); + while (s.next() != null) { + continue; + } + s.close(); + t.close(); + return cluster; + } + public HBaseClusterShim startHBaseCluster(Configuration conf, + int nRegionServers) throws Exception { + return startHBaseCluster(conf,1,nRegionServers); + } + /** * Starts the hbase cluster up again after shutting it down previously in a * test. Use this if you want to keep dfs/zk up and just stop/start hbase. @@ -567,7 +600,14 @@ * @see #startMiniCluster() */ public MiniHBaseCluster getMiniHBaseCluster() { - return this.hbaseCluster; + if (this.hbaseCluster instanceof MiniHBaseCluster) { + //TODO: this is explicitly typecasting. Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + return (MiniHBaseCluster)this.hbaseCluster; + } + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } /** @@ -603,7 +643,7 @@ if (this.hbaseCluster != null) { this.hbaseCluster.shutdown(); // Wait till hbase is down before going on to shutdown zk. 
- this.hbaseCluster.join(); + this.hbaseCluster.waitUntilShutDown(); this.hbaseCluster = null; } } @@ -641,7 +681,14 @@ * @throws IOException */ public void flush() throws IOException { - this.hbaseCluster.flushcache(); + if (this.hbaseCluster instanceof MiniHBaseCluster) { + //TODO: this is explicitly typecasting. Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + ((MiniHBaseCluster)this.hbaseCluster).flushcache(); + } else + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } /** @@ -649,7 +696,14 @@ * @throws IOException */ public void flush(byte [] tableName) throws IOException { - this.hbaseCluster.flushcache(tableName); + if (this.hbaseCluster instanceof MiniHBaseCluster) { + //TODO: this is explicitly typecasting. Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + ((MiniHBaseCluster)this.hbaseCluster).flushcache(tableName); + } else + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } @@ -963,6 +1017,18 @@ Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; + + public static final byte[][] NON_EMPTY_KEYS = { + Bytes.toBytes("bbb"), + Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), + Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), + Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"), + Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), + Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), + Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), + Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), + Bytes.toBytes("xxx"), Bytes.toBytes("yyy") + }; /** * Creates many 
regions names "aaa" to "zzz". @@ -1045,7 +1111,12 @@ HBaseAdmin admin = getHBaseAdmin(); if (admin.isTableEnabled(table.getTableName())) { for(HRegionInfo hri : newRegions) { - hbaseCluster.getMaster().assignRegion(hri); + if (hbaseCluster instanceof MiniHBaseCluster) { + ((MiniHBaseCluster)hbaseCluster).getMaster().assignRegion(hri); + } else { + ((RealHBaseCluster)hbaseCluster).getMaster() + .assign(hri.getRegionName()); + } } } @@ -1158,7 +1229,12 @@ byte [] firstrow = metaRows.get(0); LOG.debug("FirstRow=" + Bytes.toString(firstrow)); int index = hbaseCluster.getServerWith(firstrow); - return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); + if (hbaseCluster instanceof MiniHBaseCluster) { + return ((MiniHBaseCluster)hbaseCluster).getRegionServerThreads() + .get(index).getRegionServer(); + } else { + throw new RuntimeException("Not implemented yet"); + } } /** @@ -1225,8 +1301,11 @@ * @throws Exception */ public void expireMasterSession() throws Exception { - HMaster master = hbaseCluster.getMaster(); - expireSession(master.getZooKeeper(), master); + if (hbaseCluster instanceof MiniHBaseCluster) { + HMaster master = ((MiniHBaseCluster)hbaseCluster).getMaster(); + expireSession(master.getZooKeeper(), master); + } else + throw new RuntimeException("Not implemented yet"); } /** @@ -1235,8 +1314,11 @@ * @throws Exception */ public void expireRegionServerSession(int index) throws Exception { - HRegionServer rs = hbaseCluster.getRegionServer(index); - expireSession(rs.getZooKeeper(), rs); + if (hbaseCluster instanceof MiniHBaseCluster) { + HRegionServer rs = ((MiniHBaseCluster)hbaseCluster).getRegionServer(index); + expireSession(rs.getZooKeeper(), rs); + } else + throw new RuntimeException("Not implemented yet"); } public void expireSession(ZooKeeperWatcher nodeZK, Server server) @@ -1274,6 +1356,10 @@ * @return hbase cluster */ public MiniHBaseCluster getHBaseCluster() { + return getMiniHBaseCluster(); + } + + public HBaseClusterShim getHBaseClusterShim() 
{ return hbaseCluster; } @@ -1411,12 +1497,22 @@ public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { boolean startedServer = false; - - for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i getRegionServerThreads(); + + /** + * Cause a region server to exit doing basic clean up only on its way out. + * @param serverNumber Used as index into a list. + */ + public abstract String abortRegionServer(int serverNumber); + + /** + * Wait for the specified region server to stop. Removes this thread from list + * of running threads. + * @param serverNumber + * @return Name of region server that just went down. + */ + public abstract String waitOnRegionServer(final int serverNumber); + + /** + * Get the master instance + */ + public abstract HMaster getMaster(); + + /** + * Get the master instance + * @throws MasterNotRunningException + * @throws ZooKeeperConnectionException + */ + public abstract HMasterInterface getMasterInterface() + throws ZooKeeperConnectionException, MasterNotRunningException; + + /** + * Cause a master to exit without shutting down entire cluster. + * @param serverNumber Used as index into a list. + */ + public abstract String abortMaster(int serverNumber); + + /** + * Wait for the specified master to stop. Removes this thread from list + * of running threads. + * @param serverNumber + * @return Name of master that just went down. + */ + public abstract String waitOnMaster(final int serverNumber); + + /** + * Blocks until there is an active master and that master has completed + * initialization. + * + * @return true if an active master becomes available. false if there are no + * masters left. + * @throws InterruptedException + */ + public abstract boolean waitForActiveAndReadyMaster() + throws InterruptedException; + + /** + * Wait for Mini HBase Cluster to shut down. 
+ */ + public abstract void waitUntilShutDown(); + + /** + * Shut down the mini HBase cluster + * @throws IOException + */ + public abstract void shutdown() throws IOException; + + /** + * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} + * of HRS carrying regionName. Returns -1 if none found. + */ + public int getServerWithMeta() { + return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + } + + /** + * Get the location of the specified region + * @param regionName Name of the region in bytes + * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} + * of HRS carrying .META.. Returns -1 if none found. + */ + public abstract int getServerWith(byte[] regionName); + + /** + * Counts the total numbers of regions being served by the currently online + * region servers by asking each how many regions they have. Does not look + * at META at all. Count includes catalog tables. + * @return number of regions being served by all region servers + */ + public abstract long countServedRegions(); +} \ No newline at end of file Index: src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 1350658) +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -44,7 +45,7 @@ * if we are running on DistributedFilesystem, create a FileSystem instance * each and will close down their instance on the way out. 
*/ -public class MiniHBaseCluster { +public class MiniHBaseCluster extends HBaseClusterShim { static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName()); private Configuration conf; public LocalHBaseCluster hbaseCluster; @@ -71,9 +72,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) throws IOException, InterruptedException { - this.conf = conf; - conf.set(HConstants.MASTER_PORT, "0"); - init(numMasters, numRegionServers); + super(conf, numMasters, numRegionServers); } public Configuration getConfiguration() { @@ -179,7 +178,7 @@ } private void init(final int nMasterNodes, final int nRegionNodes) - throws IOException, InterruptedException { + throws IOException { try { // start up a LocalHBaseCluster hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, @@ -299,6 +298,13 @@ } return t; } + + /** + * Return the master interface + */ + public HMasterInterface getMasterInterface() { + return this.hbaseCluster.getActiveMaster(); + } /** * Returns the current active master, if available. 
@@ -526,4 +532,15 @@ } return count; } + + @Override + void init(Configuration conf, int nMasterNodes, int nRegionNodes) + throws IOException { + init(nMasterNodes, nRegionNodes); + } + + @Override + public void waitUntilShutDown() { + this.hbaseCluster.join(); + } } Index: src/test/java/org/apache/hadoop/hbase/RealHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/RealHBaseCluster.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/RealHBaseCluster.java (revision 0) @@ -0,0 +1,334 @@ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerManager.ServerTypeName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; + +public class RealHBaseCluster extends HBaseClusterShim { + private ServerManager regionServerManager; + private ServerManager masterServerManager; + private static String TMP_SPACE = "/tmp"; + private static int DEFAULT_PORT = 60020; //fix this to look at config + List regionServers = new ArrayList(); + List masterServers = new ArrayList(); + //simple bitmap indexes + int runningRegionServers[]; + int runningMasterServers[]; + Configuration conf; + static int index = 0; + + public RealHBaseCluster(Configuration conf, int numMasters, + int numRegionServers) throws IOException, InterruptedException { + super(conf, numMasters, numRegionServers); + } + + @Override + void init(Configuration conf, int nMasterNodes, int nRegionNodes) + throws IOException { + for (ServerTypeName t : 
ServerTypeName.values()) { + ServerManager s = ServerManager.getInstanceOfManager(t, TMP_SPACE); + String[] servers = conf.getStrings(t.toString()); + ArrayList serverList = new ArrayList(); + for (String str : servers) { + serverList.add(str); + } + if (t.toString().equals("REGIONSERVER")) { + regionServerManager = s; + regionServers.addAll(serverList); + } else if (t.toString().equals("MASTER")) { + masterServerManager = s; + masterServers.addAll(serverList); + } + } + + for (String server : regionServers) { + //kill all servers to begin with + regionServerManager.killServer(server); + } + for (String server : masterServers) { + //kill all servers to begin with + masterServerManager.killServer(server); + } + runningMasterServers = new int[masterServers.size()]; + runningRegionServers = new int[regionServers.size()]; + //TODO: cleanup filesystem and ZooKeeper + + //start the number of master nodes and region servers as asked for + for (int i = 0; i < nMasterNodes; i++) { + masterServerManager.startServer(masterServers.get(i)); + //index into the list of masters + runningMasterServers[i] = 1; + } + for (int i = 0; i < nRegionNodes; i++) { + regionServerManager.startServer(regionServers.get(i)); + //index into the list of regionservers + runningRegionServers[i] = 1; + } + this.conf = HBaseConfiguration.create(conf); + } + + public JVMClusterUtil.RealRegionServerThread startRegionServer() + throws IOException { + //start a new region server + int idx = -1; + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 0) { + idx = i; + break; + } + } + if (idx == -1) { + throw new IOException("Don't have any more RS host!"); + } + + String server = regionServers.get(idx); + LOG.info("Starting RS " + server); + regionServerManager.startServer(server); + //manipulate the indexes to the global list + runningRegionServers[idx] = 1; + HRegionInterface regionServer = + HConnectionManager.getConnection(getConfiguration()) + 
.getHRegionConnection(server, DEFAULT_PORT); + JVMClusterUtil.RealRegionServerThread serverThread = + new JVMClusterUtil.RealRegionServerThread(regionServer); + return serverThread; + } + + @Override + public String abortRegionServer(int serverNumber) { + try { + String server = regionServers.get(serverNumber); + LOG.info("Aborting RS " + server); + regionServerManager.killServer(server); + } catch (IOException e) { + LOG.warn("Failed to abort region server " + e); + throw new RuntimeException(e); + } + runningRegionServers[serverNumber] = 0; + return regionServers.get(serverNumber); + } + + @Override + public String waitOnRegionServer(int serverNumber) { + throw new RuntimeException("Not implemented yet"); + } + + public void startMaster() throws IOException { + //start a new master + int idx = -1; + for (int i = 0; i < runningMasterServers.length; i++) { + if (runningMasterServers[i] == 0) { + idx = i; + break; + } + } + if (idx == -1) { + throw new IOException("Don't have any more master host!"); + } + String server = masterServers.get(idx); + LOG.info("Starting Master " + server); + masterServerManager.startServer(server); + //manipulate the indexes to the global list + runningMasterServers[idx] = 1; + } + + public HMasterInterface getMasterInterface() + throws ZooKeeperConnectionException, MasterNotRunningException { + HConnection conn = HConnectionManager.getConnection(conf); + return conn.getMaster(); + } + + public HMasterInterface getMasterInterface(int serverNumber) { + throw new RuntimeException("Not implemented yet"); + } + + @Override + public String abortMaster(int serverNumber) { + if (runningMasterServers[serverNumber] == 0) { + throw new RuntimeException("Master " + serverNumber + " not running!"); + } + String server = masterServers.get(serverNumber); + try { + masterServerManager.killServer(server); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.warn("Failed to kill Master " + server); + throw new RuntimeException(e); + } + runningMasterServers[serverNumber] = 0; + 
return server; + } + + public String stopMaster(int serverNumber) { + if (runningMasterServers[serverNumber] == 0) { + throw new RuntimeException("Master " + serverNumber + " not running!"); + } + String server = masterServers.get(serverNumber); + try { + masterServerManager.stopServer(server); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.warn("Failed to stop Master " + server); + throw new RuntimeException(e); + } + return server; + } + + @Override + public String waitOnMaster(int serverNumber) { + throw new RuntimeException("Not implemented yet"); + } + + @Override + public boolean waitForActiveAndReadyMaster() throws InterruptedException { + while (true) { + try { + getMasterInterface(); + return true; + } catch (MasterNotRunningException m) { + LOG.warn("Master not started yet " + m); + } catch (ZooKeeperConnectionException e) { + // TODO Auto-generated catch block + LOG.warn("Failed to connect to ZK " + e); + } + Thread.sleep(1000); + } + } + + public List getLiveMasters() { + List masters = new ArrayList(); + for (int i = 0; i < runningMasterServers.length; i++) { + if (runningMasterServers[i] == 1) { + masters.add(masterServers.get(i)); + } + } + return masters; + } + + @Override + public void waitUntilShutDown() { + //Simply wait for a few seconds for now (after issuing serverManager.kill + throw new RuntimeException("Not implemented yet"); + } + + @Override + public void shutdown() throws IOException { + // TODO Auto-generated method stub + for (int i = 0; i< runningMasterServers.length; i++) { + if (runningMasterServers[i] == 1) { + masterServerManager.killServer(masterServers.get(i)); + } + } + for (int i = 0; i< runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + regionServerManager.killServer(regionServers.get(i)); + } + } + } + + public List getRegionServers() { + return regionServers; + } + + public List getLiveRegionServers() { + List servers = new ArrayList(); + for (int i = 0; i < 
runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + servers.add(regionServers.get(i)); + } + } + return servers; + } + + public HRegionInterface getRegionServer(int serverNumber) { + String server = regionServers.get(serverNumber); + try { + return HConnectionManager.getConnection(conf).getHRegionConnection(server, + DEFAULT_PORT); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Failed to connect to ZK " + e); + throw new RuntimeException("Failed to connect to ZK " + e); + } catch (IOException e) { + LOG.warn("Failed to get regionserver " + serverNumber + " " + e); + throw new RuntimeException("Failed to get regionserver " + + serverNumber + " " + e); + } + } + + @Override + public int getServerWith(byte[] regionName) { + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + try { + HRegionInterface hrs = + HConnectionManager.getConnection(conf) + .getHRegionConnection(regionServers.get(i), + DEFAULT_PORT); + List regions = hrs.getOnlineRegions(); + String regionNameStr = new String(regionName); + for (HRegionInfo info : regions) { + if (info.getRegionNameAsString().equals(regionNameStr)) { + return i; + } + } + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException("Failed to connect to ZK " + e); + } catch (IOException e) { + throw new RuntimeException("Failed to get server with " + + regionName + " " + e); + } + } + } + return -1; + } + + @Override + public long countServedRegions() { + int num = 0; + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + try { + HRegionInterface hrs = + HConnectionManager.getConnection(conf) + .getHRegionConnection(regionServers.get(i), + DEFAULT_PORT); + List regions = hrs.getOnlineRegions(); + num += regions.size(); + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException("Failed to connect to ZK " + e); + } catch (IOException e) { + throw new RuntimeException("Failed to get 
regions from " + + regionServers.get(i)); + } + } + } + return num; + } + + @Override + public RegionServerThread stopRegionServer(int serverNumber, + boolean shutdownFS) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getRegionServerThreads() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HMaster getMaster() { + throw new RuntimeException("Not implemented in this class"); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 1350658) +++ src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (working copy) @@ -27,6 +27,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.ShutdownHook; @@ -43,6 +46,9 @@ public static class RegionServerThread extends Thread { private final HRegionServer regionServer; + public RegionServerThread() { + this.regionServer = null; + } public RegionServerThread(final HRegionServer r, final int index) { super(r, "RegionServer:" + index + ";" + r.getServerName()); this.regionServer = r; @@ -52,6 +58,10 @@ public HRegionServer getRegionServer() { return this.regionServer; } + + public HRegionInterface getRegionServerInterface() { + return this.regionServer; + } /** * Block until the region server has come online, indicating it is ready @@ -72,7 +82,35 @@ } } } + + /** + * The wrapper for real regionserver + * + */ + public static class RealRegionServerThread extends 
RegionServerThread { + private final HRegionInterface regionServerInterface; + public RealRegionServerThread(final HRegionInterface r) { + this.regionServerInterface = r; + } + + /** @return the region server */ + public HRegionInterface getRegionServerInterface() { + return this.regionServerInterface; + } + + /** + * Block until the region server has come online, indicating it is ready + * to be used. + */ + public void waitForServerOnline() { + return; //TODO: make this look at the RS state via RPC etc. + } + public HRegionServer getRegionServer() { + throw new RuntimeException("Not implemented in this class"); + } + } + + /** * Creates a {@link RegionServerThread}. * Call 'start' on the returned thread to make it run. @@ -101,6 +139,23 @@ } return new JVMClusterUtil.RegionServerThread(server, index); } + /** + * Creates a {@link RegionServerThread}. + * Call 'start' on the returned thread to make it run. + * @param c Configuration to use. + * @param hrsc Class to create. + * @param index Used distinguishing the object returned. + * @throws IOException + * @return Region server added.
+ */ + public static JVMClusterUtil.RealRegionServerThread createRegionServerInterface( + final Configuration c, String hostname, int port) + throws IOException { + HRegionInterface server; + server = + HConnectionManager.getConnection(c).getHRegionConnection(hostname,port); + return new JVMClusterUtil.RealRegionServerThread(server); + } /** @@ -109,6 +164,9 @@ public static class MasterThread extends Thread { private final HMaster master; + public MasterThread() { + this.master = null; + } public MasterThread(final HMaster m, final int index) { super(m, "Master:" + index + ";" + m.getServerName()); this.master = m; @@ -134,8 +192,44 @@ } } } + public HMasterInterface getMasterInterface() { + return master; + } } + + public static class RealMasterThread extends MasterThread { + private final HMasterInterface masterInterface; + public RealMasterThread(final HMasterInterface m) { + this.masterInterface = m; + } + /** @return the master */ + public HMaster getMaster() { + throw new RuntimeException("Not implemented in this class"); + } + + /** + * Block until the master has come online, indicating it is ready + * to be used. + */ + public void waitForServerOnline() { + // The server is marked online after init begins but before race to become + // the active master. + while (!this.masterInterface.isMasterRunning()) { + //TODO: do we need to check for more conditions + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // continue waiting + } + } + } + + public HMasterInterface getMasterInterface() { + return masterInterface; + } + } + /** * Creates a {@link MasterThread}. * Call 'start' on the returned thread to make it run. Index: bin/hbase =================================================================== --- bin/hbase (revision 1350658) +++ bin/hbase (working copy) @@ -305,7 +305,7 @@ # Exec unless HBASE_NOEXEC is set.
if [ "${HBASE_NOEXEC}" != "" ]; then - "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" + "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" else - exec "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" + exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" fi