Index: src/test/java/org/apache/hadoop/hbase/ServerManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ServerManager.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ServerManager.java (revision 0) @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; + + +/** + * Utility to start/kill servers in a distributed environment. Assumes Unix-like + * commands are available like 'ps', 'kill', etc. 
Also assumes the user running + * the test has enough "power" to start & stop servers on the remote machines + * (for example, the test user could be the same user as the user the daemon is + * running as) + */ +public abstract class ServerManager { + private static final Log LOG = LogFactory.getLog(ServerManager.class); + + enum Operation {STOP, RESUME, START, KILL}; + public enum ServerTypeName {REGIONSERVER, MASTER}; //hardcoded for now + + final static String REGIONSERVER = "regionserver"; + final static String MASTER = "master"; + final String scratchSpaceDir; + private final String slavesFile; + private Random rand = new Random(System.currentTimeMillis()); + private static String TEST_CONF_FILE = "test-conf.xml"; + + public ServerManager(String scratchSpaceDir) { + this.scratchSpaceDir = scratchSpaceDir; + this.slavesFile = scratchSpaceDir + Path.SEPARATOR + + "slave-" + rand.nextInt(); + } + + public static ServerManager getInstanceOfManager(ServerTypeName type, + String scratchSpaceDir) { + if (type == ServerTypeName.REGIONSERVER) { //this could be replaced by config key/val mappings + return new RegionServerManager(scratchSpaceDir); + } + return null; + } + + public static RandomKiller getRandomKillerInstance( + Collection types, String scratchSpaceDir) { + return new RandomKiller(types, scratchSpaceDir); + } + + /** + * The following public methods are for controlling servers using an instance + * of this class. Currently these are not thread-safe. 
+ */ + public void stopServer(Collection servers) throws IOException { + runOperationOnServers(Operation.STOP, servers); + } + public void resumeServer(Collection servers) throws IOException { + runOperationOnServers(Operation.RESUME, servers); + } + public void startServer(Collection servers) throws IOException { + runOperationOnServers(Operation.START, servers); + } + public void killServer(Collection servers) throws IOException { + runOperationOnServers(Operation.KILL, servers); + } + public void stopServer(String server) throws IOException { + ArrayList arr = new ArrayList(); + arr.add(server); + runOperationOnServers(Operation.STOP, arr); + } + public void resumeServer(String server) throws IOException { + ArrayList servers = new ArrayList(); + servers.add(server); + runOperationOnServers(Operation.RESUME, servers); + } + public void startServer(String server) throws IOException { + ArrayList servers = new ArrayList(); + servers.add(server); + runOperationOnServers(Operation.START, servers); + } + public void killServer(String server) throws IOException { + ArrayList servers = new ArrayList(); + servers.add(server); + runOperationOnServers(Operation.KILL, servers); + } + + private void runOperationOnServers(Operation operation, + Collection servers) throws IOException { + try { + writeServersToAffectFile(servers); + runOperation(operation); + } finally { + cleanupScratchSpace(); + } + } + + private void writeServersToAffectFile(Collection servers) + throws IOException { + //not threadsafe + FileOutputStream fos = new FileOutputStream(new File(slavesFile)); + for (String server : servers) { + LOG.info(new Date() + " Marking server on host: " + server); + fos.write((server + "\n").getBytes()); + } + fos.close(); + } + + private void cleanupScratchSpace() { + new File(slavesFile).delete(); + } + + /** + * The following protected methods are expected to be implemented by the + * respective server extensions/implementations of ServerManager + */ + /** + * Get the 
env name that the server scripts use to point to the slaves file + * (like HBASE_REGIONSERVERS file in the case of hbase). + * IMPORTANT NOTE: MAKE SURE that the scripts on the machine where this test + * is run from doesn't set the env internally (that would interfere with the + * tests) + * @return + */ + protected abstract String getSlavesFileEnvName(); + /** + * Get the env name that the server scripts use to point to the HOME of + * the installation (like HBASE_HOME in the case of hbase) + * @return + */ + protected abstract String getHomeEnvName(); + /** + * The Ssh command path that ssh'es out to the remote nodes and runs commands + * (like bin/regionservers.sh in the case of hbase). If the path doesn't + * begin with a slash, the assumption is that the path is relative to + * the return value of {@link #getHomeEnvName()}, and the path is normalized + * by this class before using it + * @return + */ + protected abstract String getSshCommandPath(); + /** + * The command for 'stopping' the server + * @return + */ + protected abstract String getStopCommand(); + /** + * The command for 'resuming' a 'stopped' server + * @return + */ + protected abstract String getResumeCommand(); + /** + * The command for 'starting' the server + * @return + */ + protected abstract String getStartCommand(); + /** + * The command for 'killing' the server + * @return + */ + protected abstract String getKillCommand(); + + protected abstract ServerTypeName getType(); + + private void runOperation(Operation operation) throws IOException { + Map hMap = new HashMap(); + hMap.put(getSlavesFileEnvName(), slavesFile); + + StringTokenizer strToken; + if (operation == Operation.STOP) { + strToken = new StringTokenizer(getStopCommand(), " "); + } else if(operation == Operation.RESUME) { + strToken = new StringTokenizer(getResumeCommand(), " "); + } else if(operation == Operation.START) { + strToken = new StringTokenizer(getStartCommand(), " "); + } else if(operation == Operation.KILL) { + 
strToken = new StringTokenizer(getKillCommand(), " "); + } else { + throw new RuntimeException(operation + " currently not supported"); + } + + String commandArgs[] = new String[strToken.countTokens() + 1]; + int i = 0; + commandArgs[i++] = normalizeSshCommandPath(getSshCommandPath()); + while (strToken.hasMoreTokens()) { + commandArgs[i++] = strToken.nextToken(); + } + String output = Shell.execCommand(hMap, commandArgs); + if (output != null && !output.equals("")) { + LOG.info(output); + } + } + private String normalizeSshCommandPath(String command) { + if (command.startsWith("/")) { + return command; + } + final String home; + if ((home = System.getenv(getHomeEnvName())) != null) { + command = home + Path.SEPARATOR + command; + } + return command; + } + + static class RandomKiller extends Thread { + Map> runningServersMap + = new TreeMap>(); + Map> nonRunningServersMap + = new TreeMap>(); + List managers; + String hostFile; + boolean pause = false; + final long SLEEP_BETWEEN_OPS = 5000; + Object pauseMonitor = new Object(); + + public RandomKiller(Collection types, + String scratchSpaceDir) { + Configuration conf = new Configuration(false); + conf.addResource(TEST_CONF_FILE); + for (ServerTypeName t : types) { + ServerManager s = getInstanceOfManager(t, scratchSpaceDir); + String[] servers = conf.getStrings(t.toString()); + ArrayList serverList = new ArrayList(); + for (String str : servers) { + serverList.add(str); + } + runningServersMap.put(s, serverList); + managers.add(s); + } + this.setDaemon(true); + this.setName("RandomStartStopServers"); + } + + public void pauseInjector() { + this.pause = true; + } + + public void resumeInjector() { + this.pause = false; + pauseMonitor.notifyAll(); + } + + private void mightpause() throws InterruptedException { + synchronized (pauseMonitor) { + while (pause) { + pauseMonitor.wait(); + } + } + } + + @Override + public void run() { + Random rand = new Random(); + int size = runningServersMap.size(); + while (true) { + 
//stop/start in a loop (very simple for now) + try { + int idx = rand.nextInt(size); + ServerManager manager = managers.get(idx); + //there should be at least one server running (refine this later) + List running = runningServersMap.get(manager); + //try to kill some server + if (running.size() > 1) { + int idx1 = rand.nextInt(running.size()); + String serverToStop = running.remove(idx1); + manager.killServer(serverToStop); + List nonRunning = nonRunningServersMap.get(manager); + if (nonRunning == null) { + nonRunning = new ArrayList(); + nonRunningServersMap.put(manager, nonRunning); + } + nonRunning.add(serverToStop); + Thread.sleep(SLEEP_BETWEEN_OPS); + mightpause(); + } else { + Thread.sleep(1000); //don't want this to be a tight loop + } + + idx = rand.nextInt(size); + manager = managers.get(idx); + List nonRunning = nonRunningServersMap.get(manager); + //try to start some server + if (nonRunning.size() >= 1) { + int idx1 = rand.nextInt(nonRunning.size()); + String serverToStart = nonRunning.remove(idx1); + manager.startServer(serverToStart); + running = runningServersMap.get(manager); + running.add(serverToStart); + Thread.sleep(SLEEP_BETWEEN_OPS); + mightpause(); + } else { + Thread.sleep(1000); //don't want this to be a tight loop + } + } catch (Exception ex) {//ignore + } + } + } + } +} + +class RegionServerManager extends ServerManager { + final private String STOP_COMMAND = "ps uwwx | grep java | grep " + + "proc_regionserver"+ " |" + + " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP"; + final private String RESUME_COMMAND = "ps uwwx | grep java | grep " + + "proc_regionserver"+ " |" + + " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT"; + final private String START_COMMAND = System.getenv("HBASE_HOME") + + "/bin/hbase-daemon.sh start regionserver"; + final private String KILL_COMMAND = "ps uwwx | grep java | grep " + + "proc_regionserver"+ " |" + + " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -9"; + 
RegionServerManager(String scratchSpaceDir) { + super(scratchSpaceDir); + } + + @Override + protected String getSlavesFileEnvName() { + return "HBASE_REGIONSERVERS"; + } + @Override + protected String getStopCommand() { + return STOP_COMMAND; + } + @Override + protected String getResumeCommand() { + return RESUME_COMMAND; + } + @Override + protected String getStartCommand() { + return START_COMMAND; + } + @Override + protected String getKillCommand() { + return KILL_COMMAND; + } + @Override + protected String getHomeEnvName() { + return "HBASE_HOME"; + } + @Override + protected String getSshCommandPath() { + return "bin/regionservers.sh"; + } + @Override + protected ServerTypeName getType() { + return ServerTypeName.REGIONSERVER; + } +} + Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1344575) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -102,7 +102,7 @@ private boolean passedZkCluster = false; private MiniDFSCluster dfsCluster = null; - private MiniHBaseCluster hbaseCluster = null; + private HBaseCluster hbaseCluster = null; private MiniMRCluster mrCluster = null; // Directory where we put the data for this instance of HBaseTestingUtility @@ -126,6 +126,16 @@ */ public static final String BASE_TEST_DIRECTORY_KEY = "test.build.data.basedirectory"; + + /** + * Is this test using a real cluster + */ + public boolean realCluster; + + /** + * Configuration that controls whether this utility assumes a running cluster + */ + public static final String IS_REAL_CLUSTER = "cluster.real"; /** * Default base directory for test output. 
@@ -150,6 +160,7 @@ public HBaseTestingUtility(Configuration conf) { this.conf = conf; + this.realCluster = conf.getBoolean(IS_REAL_CLUSTER, false); } /** @@ -539,7 +550,10 @@ t.close(); LOG.info("Minicluster is up"); //getHBaseAdmin(); - return this.hbaseCluster; + //TODO: this is explicitly typecasting. Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + return (MiniHBaseCluster)this.hbaseCluster; } /** @@ -567,7 +581,14 @@ * @see #startMiniCluster() */ public MiniHBaseCluster getMiniHBaseCluster() { - return this.hbaseCluster; + if (this.hbaseCluster instanceof MiniHBaseCluster) { + //TODO: this is explicitly typecasting. Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + return (MiniHBaseCluster)this.hbaseCluster; + } + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } /** @@ -603,7 +624,7 @@ if (this.hbaseCluster != null) { this.hbaseCluster.shutdown(); // Wait till hbase is down before going on to shutdown zk. - this.hbaseCluster.join(); + this.hbaseCluster.waitUntilShutDown(); this.hbaseCluster = null; } } @@ -641,7 +662,14 @@ * @throws IOException */ public void flush() throws IOException { - this.hbaseCluster.flushcache(); + if (this.hbaseCluster instanceof MiniHBaseCluster) { + //TODO: this is explicitly typecasting. 
Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + ((MiniHBaseCluster)this.hbaseCluster).flushcache(); + } + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } /** @@ -649,7 +677,14 @@ * @throws IOException */ public void flush(byte [] tableName) throws IOException { - this.hbaseCluster.flushcache(tableName); + if (this.hbaseCluster instanceof MiniHBaseCluster) { + //TODO: this is explicitly typecasting. Once we have all the callers of the + //API switch over to accepting HBaseCluster (as opposed to MiniHBaseCluster), + //as the return value, we can remove this typecast + ((MiniHBaseCluster)this.hbaseCluster).flushcache(tableName); + } + throw new RuntimeException(hbaseCluster + " not an instance of " + + MiniHBaseCluster.class.getName()); } @@ -963,6 +998,18 @@ Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; + + public static final byte[][] NON_EMPTY_KEYS = { + Bytes.toBytes("bbb"), + Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), + Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), + Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"), + Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), + Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), + Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), + Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), + Bytes.toBytes("xxx"), Bytes.toBytes("yyy") + }; /** * Creates many regions names "aaa" to "zzz". 
@@ -1045,7 +1092,12 @@ HBaseAdmin admin = getHBaseAdmin(); if (admin.isTableEnabled(table.getTableName())) { for(HRegionInfo hri : newRegions) { - hbaseCluster.getMaster().assignRegion(hri); + if (hbaseCluster instanceof MiniHBaseCluster) { + ((MiniHBaseCluster)hbaseCluster).getMaster().assignRegion(hri); + } else { + ((RealHBaseCluster)hbaseCluster).getMaster() + .assign(hri.getRegionName()); + } } } @@ -1158,7 +1210,12 @@ byte [] firstrow = metaRows.get(0); LOG.debug("FirstRow=" + Bytes.toString(firstrow)); int index = hbaseCluster.getServerWith(firstrow); - return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); + if (hbaseCluster instanceof MiniHBaseCluster) { + return ((MiniHBaseCluster)hbaseCluster).getRegionServerThreads() + .get(index).getRegionServer(); + } else { + throw new RuntimeException("Not implemented yet"); + } } /** @@ -1225,8 +1282,11 @@ * @throws Exception */ public void expireMasterSession() throws Exception { - HMaster master = hbaseCluster.getMaster(); - expireSession(master.getZooKeeper(), master); + if (hbaseCluster instanceof MiniHBaseCluster) { + HMaster master = ((MiniHBaseCluster)hbaseCluster).getMaster(); + expireSession(master.getZooKeeper(), master); + } + throw new RuntimeException("Not implemented yet"); } /** @@ -1235,8 +1295,11 @@ * @throws Exception */ public void expireRegionServerSession(int index) throws Exception { - HRegionServer rs = hbaseCluster.getRegionServer(index); - expireSession(rs.getZooKeeper(), rs); + if (hbaseCluster instanceof MiniHBaseCluster) { + HRegionServer rs = ((MiniHBaseCluster)hbaseCluster).getRegionServer(index); + expireSession(rs.getZooKeeper(), rs); + } + throw new RuntimeException("Not implemented yet"); } public void expireSession(ZooKeeperWatcher nodeZK, Server server) @@ -1274,7 +1337,7 @@ * @return hbase cluster */ public MiniHBaseCluster getHBaseCluster() { - return hbaseCluster; + return getMiniHBaseCluster(); } /** @@ -1411,12 +1474,22 @@ public boolean 
ensureSomeRegionServersAvailable(final int num) throws IOException { boolean startedServer = false; - - for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i= serverNumber) { + int prevClusterSize = admin.getClusterStatus().getServersSize(); + serverManager.startServer(realHosts.get(serverNumber-1)); + try { + Thread.sleep(5000); //give it a few + } catch(Exception e){} + if (admin.getClusterStatus().getServersSize() > prevClusterSize) { + LOG.info(log +" "+realHosts.get(serverNumber-1)); + } else { + LOG.error("Couldn't start up " + realHosts.get(serverNumber-1)); + } + } + } + /** figure out how many regions are currently being served. */ private int getRegionCount() throws IOException { int total = 0; - for (HRegionServer server : getOnlineRegionServers()) { - total += server.getOnlineRegions().size(); + for (HRegionInterfaceWithServerName server : getOnlineRegionServers()) { + total += server.hRegion.getOnlineRegions().size(); } + if (realCluster) { + + } return total; } @@ -153,20 +240,28 @@ waitForAllRegionsAssigned(); int regionCount = getRegionCount(); - List servers = getOnlineRegionServers(); - double avg = UTIL.getHBaseCluster().getMaster().getAverageLoad(); + List servers = getOnlineRegionServers(); + double avg; + if (realCluster) { + HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); + ClusterStatus clusterStatus = admin.getClusterStatus(); + avg = clusterStatus.getAverageLoad(); + + } else { + avg = UTIL.getHBaseCluster().getMaster().getAverageLoad(); + } int avgLoadPlusSlop = (int)Math.ceil(avg * (1 + slop)); int avgLoadMinusSlop = (int)Math.floor(avg * (1 - slop)) - 1; LOG.debug("There are " + servers.size() + " servers and " + regionCount + " regions. 
Load Average: " + avg + " low border: " + avgLoadMinusSlop + ", up border: " + avgLoadPlusSlop + "; attempt: " + i); - for (HRegionServer server : servers) { - int serverLoad = server.getOnlineRegions().size(); - LOG.debug(server.getServerName() + " Avg: " + avg + " actual: " + serverLoad); + for (HRegionInterfaceWithServerName server : servers) { + int serverLoad = server.hRegion.getOnlineRegions().size(); + LOG.debug(server.serverName + " Avg: " + avg + " actual: " + serverLoad); if (!(avg > 2.0 && serverLoad <= avgLoadPlusSlop && serverLoad >= avgLoadMinusSlop)) { - LOG.debug(server.getServerName() + " Isn't balanced!!! Avg: " + avg + + LOG.debug(server.serverName + " Isn't balanced!!! Avg: " + avg + " actual: " + serverLoad + " slop: " + slop); success = false; } @@ -179,7 +274,12 @@ Thread.sleep(10000); } catch (InterruptedException e) {} - UTIL.getHBaseCluster().getMaster().balance(); + if (realCluster) { + HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); + admin.getMaster().balance(); + } else { + UTIL.getHBaseCluster().getMaster().balance(); + } continue; } @@ -191,13 +291,32 @@ fail("After 5 attempts, region assignments were not balanced."); } - private List getOnlineRegionServers() { - List list = new ArrayList(); - for (JVMClusterUtil.RegionServerThread rst : - UTIL.getHBaseCluster().getRegionServerThreads()) { - if (rst.getRegionServer().isOnline()) { - list.add(rst.getRegionServer()); + private List getOnlineRegionServers() { + List list = + new ArrayList(); + if (realCluster) { + try { + HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); + ClusterStatus clusterStatus = admin.getClusterStatus(); + Collectionservers = clusterStatus.getServers(); + HConnection connection = admin.getConnection(); + for (ServerName hsi : servers) { + HRegionInterface server = + connection.getHRegionConnection(hsi.getHostname(), hsi.getPort()); + list.add(new HRegionInterfaceWithServerName(hsi, + server)); + } + } catch (Exception e) { + throw new 
RuntimeException(e); } + } else { + for (JVMClusterUtil.RegionServerThread rst : + UTIL.getHBaseCluster().getRegionServerThreads()) { + if (rst.getRegionServer().isOnline()) { + list.add(new HRegionInterfaceWithServerName( + rst.getRegionServer().getServerName(),rst.getRegionServer())); + } + } } return list; } @@ -206,12 +325,96 @@ * Wait until all the regions are assigned. */ private void waitForAllRegionsAssigned() throws IOException { - while (getRegionCount() < 22) { - // while (!cluster.getMaster().allRegionsAssigned()) { - LOG.debug("Waiting for there to be 22 regions, but there are " + getRegionCount() + " right now."); + if (realCluster) { + //check that there are no regions in transition + HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); + int count; + while ((count = admin.getClusterStatus().getRegionsInTransition().size()) + != 0) { + LOG.debug("Waiting for "+ count + " regions to settle down"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + } else { + while (getRegionCount() < 22) { + // while (!cluster.getMaster().allRegionsAssigned()) { + LOG.debug("Waiting for there to be 22 regions, but there are " + getRegionCount() + " right now."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + } + } + private static class HRegionInterfaceWithServerName { + ServerName serverName; + HRegionInterface hRegion; + HRegionInterfaceWithServerName(ServerName serverName, + HRegionInterface hRegion) { + this.serverName = serverName; + this.hRegion = hRegion; + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration c) { + this.conf = c; + } + + @Override + public int run(String[] args) throws Exception { + if (args.length != 1) { + System.err.println("Usage: java "); + return -1; + } + List hosts = new ArrayList(); + String filename = args[0]; + BufferedReader b = new BufferedReader(new FileReader(filename)); + String str; + while 
((str=b.readLine()) != null) { + hosts.add(str); + } + if (hosts.size() == 0) { + System.err.println("The hostlist file is empty!"); + return -1; + } + this.realCluster = true; + this.realHosts = hosts; + this.serverManager = ServerManager.getInstanceOfManager( + ServerManager.ServerTypeName.REGIONSERVER, "/tmp"); + //try for a name that is random enough so that probability + //that it already exists is really low + String tablename = "test" + + new Random(System.currentTimeMillis()).nextInt(Integer.MAX_VALUE); + this.createTableDesc(tablename); + try { + //Killall servers + serverManager.killServer(realHosts); + //start the first server + serverManager.startServer(realHosts.get(0)); + this.testRebalanceOnRegionServerNumberChange(); + } finally { try { - Thread.sleep(1000); - } catch (InterruptedException e) {} + this.deleteTable(tablename); + } catch (Exception ex){ex.printStackTrace();}//ignore } + return 0; } + + public static void main(String args[]) throws Exception { + int status = -1; + try { + TestRegionRebalancing testClass = new TestRegionRebalancing(); + status = testClass.run(args); + } catch (Exception ex) { + ex.printStackTrace(); + LOG.error("Exiting due to " + ex); + } + System.exit(status); + } } Index: src/test/java/org/apache/hadoop/hbase/HBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseCluster.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/HBaseCluster.java (revision 0) @@ -0,0 +1,153 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * This class defines methods that can help with managing HBase clusters + * from unit tests and system tests. + */ +public abstract class HBaseCluster { + static final Log LOG = LogFactory.getLog(HBaseCluster.class.getName()); + private Configuration conf; + + private static String IS_LOCAL = "hbase.cluster.islocal"; + + /** + * Start a MiniHBaseCluster. + * @param conf Configuration to be used for cluster + * @param numRegionServers initial number of region servers to start. + * @throws IOException + */ + public HBaseCluster(Configuration conf, int numRegionServers) + throws IOException, InterruptedException { + this(conf, 1, numRegionServers); + } + + /** + * Start a MiniHBaseCluster. + * @param conf Configuration to be used for cluster + * @param numMasters initial number of masters to start. + * @param numRegionServers initial number of region servers to start. 
+ * @throws IOException + */ + public HBaseCluster(Configuration conf, int numMasters, + int numRegionServers) + throws IOException, InterruptedException { + this.conf = conf; + conf.set(HConstants.MASTER_PORT, "0"); + init(conf, numMasters, numRegionServers); + } + + public static HBaseCluster getHBaseClusterInstance(Configuration conf, + int numMasters, int numRegionServers) throws IOException, InterruptedException { + if (conf.getBoolean(IS_LOCAL, true)) { + return new MiniHBaseCluster(conf, numMasters, numRegionServers); + } else { + return new RealHBaseCluster(conf, numMasters, numRegionServers); + } + } + + public Configuration getConfiguration() { + return this.conf; + } + + abstract void init(final Configuration conf, final int nMasterNodes, + final int nRegionNodes) throws IOException; + + /** + * Cause a region server to exit doing basic clean up only on its way out. + * @param serverNumber Used as index into a list. + */ + public abstract String abortRegionServer(int serverNumber); + + /** + * Wait for the specified region server to stop. Removes this thread from list + * of running threads. + * @param serverNumber + * @return Name of region server that just went down. + */ + public abstract String waitOnRegionServer(final int serverNumber); + + /** + * Cause a master to exit without shutting down entire cluster. + * @param serverNumber Used as index into a list. + */ + public abstract String abortMaster(int serverNumber); + + /** + * Wait for the specified master to stop. Removes this thread from list + * of running threads. + * @param serverNumber + * @return Name of master that just went down. + */ + public abstract String waitOnMaster(final int serverNumber); + + /** + * Blocks until there is an active master and that master has completed + * initialization. + * + * @return true if an active master becomes available. false if there are no + * masters left. 
+ * @throws InterruptedException + */ + public abstract boolean waitForActiveAndReadyMaster() + throws InterruptedException; + + /** + * Wait for Mini HBase Cluster to shut down. + */ + public abstract void waitUntilShutDown(); + + /** + * Shut down the mini HBase cluster + * @throws IOException + */ + public abstract void shutdown() throws IOException; + + /** + * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} + * of HRS carrying regionName. Returns -1 if none found. + */ + public int getServerWithMeta() { + return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + } + + /** + * Get the location of the specified region + * @param regionName Name of the region in bytes + * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} + * of HRS carrying .META.. Returns -1 if none found. + */ + public abstract int getServerWith(byte[] regionName); + + /** + * Counts the total numbers of regions being served by the currently online + * region servers by asking each how many regions they have. Does not look + * at META at all. Count includes catalog tables. + * @return number of regions being served by all region servers + */ + public abstract long countServedRegions(); +} Index: src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 1344575) +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -44,7 +44,7 @@ * if we are running on DistributedFilesystem, create a FileSystem instance * each and will close down their instance on the way out. 
*/ -public class MiniHBaseCluster { +public class MiniHBaseCluster extends HBaseCluster { static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName()); private Configuration conf; public LocalHBaseCluster hbaseCluster; @@ -71,9 +71,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) throws IOException, InterruptedException { - this.conf = conf; - conf.set(HConstants.MASTER_PORT, "0"); - init(numMasters, numRegionServers); + super(conf, numMasters, numRegionServers); } public Configuration getConfiguration() { @@ -179,7 +177,7 @@ } private void init(final int nMasterNodes, final int nRegionNodes) - throws IOException, InterruptedException { + throws IOException { try { // start up a LocalHBaseCluster hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, @@ -526,4 +524,15 @@ } return count; } + + @Override + void init(Configuration conf, int nMasterNodes, int nRegionNodes) + throws IOException { + init(nMasterNodes, nRegionNodes); + } + + @Override + public void waitUntilShutDown() { + this.hbaseCluster.join(); + } } Index: src/test/java/org/apache/hadoop/hbase/RealHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/RealHBaseCluster.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/RealHBaseCluster.java (revision 0) @@ -0,0 +1,322 @@ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerManager.ServerTypeName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import 
org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; + +public class RealHBaseCluster extends HBaseCluster { + private ServerManager regionServerManager; + private ServerManager masterServerManager; + private static String TMP_SPACE = "/tmp"; + private String HOST_NAMES = "host-names.xml"; + private static int DEFAULT_PORT = 60020; //fix this to look at config + List regionServers = new ArrayList(); + List masterServers = new ArrayList(); + //simple bitmap indexes + int runningRegionServers[]; + int runningMasterServers[]; + Configuration conf; + static int index = 0; + + public RealHBaseCluster(Configuration conf, int numMasters, + int numRegionServers) throws IOException, InterruptedException { + super(conf, numMasters, numRegionServers); + regionServerManager = + ServerManager.getInstanceOfManager(ServerTypeName.REGIONSERVER, + TMP_SPACE); + masterServerManager = + ServerManager.getInstanceOfManager(ServerTypeName.MASTER, + TMP_SPACE); + } + + @Override + void init(Configuration conf, int nMasterNodes, int nRegionNodes) + throws IOException { + for (ServerTypeName t : ServerTypeName.values()) { + ServerManager s = ServerManager.getInstanceOfManager(t, TMP_SPACE); + String[] servers = conf.getStrings(t.toString()); + ArrayList serverList = new ArrayList(); + for (String str : servers) { + serverList.add(str); + } + if (t.toString().equals("REGIONSERVER")) { + regionServerManager = s; + regionServers.addAll(serverList); + } else if (t.toString().equals("MASTER")) { + masterServerManager = s; + masterServers.addAll(serverList); + } + } + + for (String server : regionServers) { + //kill all servers to begin with + regionServerManager.killServer(server); + } + for (String server : masterServers) { + //kill all servers to begin with + masterServerManager.killServer(server); + } + runningMasterServers = new int[masterServers.size()]; + runningRegionServers = new 
int[regionServers.size()]; + //TODO: cleanup filesystem and ZooKeeper + + //start the number of master nodes and region servers as asked for + for (int i = 0; i < nMasterNodes; i++) { + masterServerManager.startServer(masterServers.get(i)); + //index into the list of masters + runningMasterServers[i] = 1; + } + for (int i = 0; i < nRegionNodes; i++) { + regionServerManager.startServer(regionServers.get(i)); + //index into the list of regionservers + runningRegionServers[i] = 1; + } + this.conf = HBaseConfiguration.create(conf); + } + + public void startRegionServer() throws IOException { + //start a new region server + int idx = -1; + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 0) { + idx = i; + break; + } + } + if (idx == -1) { + throw new IOException("Don't have any more RS host!"); + } + + String server = regionServers.get(idx); + LOG.info("Starting RS " + server); + regionServerManager.startServer(server); + //manipulate the indexes to the global list + runningRegionServers[idx] = 1; + } + + @Override + public String abortRegionServer(int serverNumber) { + try { + String server = regionServers.get(serverNumber); + LOG.info("Aborting RS " + server); + regionServerManager.killServer(server); + } catch (IOException e) { + LOG.warn("Failed to abort region server " + e); + throw new RuntimeException(e); + } + runningRegionServers[serverNumber] = 0; + return regionServers.get(serverNumber); + } + + @Override + public String waitOnRegionServer(int serverNumber) { + throw new RuntimeException("Not implemented yet"); + } + + public void startMaster() throws IOException { + //start a new master + int idx = -1; + for (int i = 0; i < runningMasterServers.length; i++) { + if (runningMasterServers[i] == 0) { + idx = i; + break; + } + } + if (idx == -1) { + throw new IOException("Don't have any more master host!"); + } + String server = masterServers.get(idx); + LOG.info("Starting Master " + server); + 
masterServerManager.startServer(server); + //manipulate the indexes to the global list + runningMasterServers[idx] = 1; + } + + public HMasterInterface getMaster() + throws ZooKeeperConnectionException, MasterNotRunningException { + HConnection conn = HConnectionManager.getConnection(conf); + return conn.getMaster(); + } + + public HMasterInterface getMasterInterface(int serverNumber) { + throw new RuntimeException("Not implemented yet"); + } + + @Override + public String abortMaster(int serverNumber) { + if (runningMasterServers[serverNumber] == 0) { + throw new RuntimeException("Master " + serverNumber + " not running!"); + } + String server = masterServers.get(serverNumber); + try { + masterServerManager.killServer(server); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.warn("Failed to kill Master " + server); + throw new RuntimeException(e); + } + return server; + } + + public String stopMaster(int serverNumber) { + if (runningMasterServers[serverNumber] == 0) { + throw new RuntimeException("Master " + serverNumber + " not running!"); + } + String server = masterServers.get(serverNumber); + try { + masterServerManager.stopServer(server); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.warn("Failed to kill Master " + server); + throw new RuntimeException(e); + } + return server; + } + + @Override + public String waitOnMaster(int serverNumber) { + throw new RuntimeException("Not implemented yet"); + } + + @Override + public boolean waitForActiveAndReadyMaster() throws InterruptedException { + while (true) { + try { + getMaster(); + return true; + } catch (MasterNotRunningException m) { + LOG.warn("Master not started yet " + m); + } catch (ZooKeeperConnectionException e) { + // TODO Auto-generated catch block + LOG.warn("Failed to connect to ZK " + e); + } + Thread.sleep(1000); + } + } + + public List getLiveMasters() { + List masters = new ArrayList(); + for (int i = 0; i < runningMasterServers.length; i++) { + if 
(runningMasterServers[i] == 1) { + masters.add(masterServers.get(i)); + } + } + return masters; + } + + @Override + public void waitUntilShutDown() { + //Simply wait for a few seconds for now (after issuing serverManager.kill + throw new RuntimeException("Not implemented yet"); + } + + @Override + public void shutdown() throws IOException { + // TODO Auto-generated method stub + for (int i = 0; i< runningMasterServers.length; i++) { + if (runningMasterServers[i] == 1) { + masterServerManager.killServer(masterServers.get(i)); + } + } + for (int i = 0; i< runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + regionServerManager.killServer(regionServers.get(i)); + } + } + } + + public List getRegionServers() { + return regionServers; + } + + public List getLiveRegionServers() { + List servers = new ArrayList(); + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + servers.add(regionServers.get(i)); + } + } + return servers; + } + + public HRegionInterface getRegionServer(int serverNumber) { + String server = regionServers.get(serverNumber); + try { + return HConnectionManager.getConnection(conf).getHRegionConnection(server, + DEFAULT_PORT); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Failed to connect to ZK " + e); + throw new RuntimeException("Failed to connect to ZK " + e); + } catch (IOException e) { + LOG.warn("Failed to get regionserver " + serverNumber + " " + e); + throw new RuntimeException("Failed to get regionserver " + + serverNumber + " " + e); + } + } + + @Override + public int getServerWith(byte[] regionName) { + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + try { + HRegionInterface hrs = + HConnectionManager.getConnection(conf) + .getHRegionConnection(regionServers.get(i), + DEFAULT_PORT); + List regions = hrs.getOnlineRegions(); + String regionNameStr = new String(regionName); + for (HRegionInfo info : regions) { + if 
(info.getRegionNameAsString().equals(regionNameStr)) { + return i; + } + } + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException("Failed to connect to ZK " + e); + } catch (IOException e) { + throw new RuntimeException("Failed to get server with " + + regionName + " " + e); + } + } + } + return -1; + } + + @Override + public long countServedRegions() { + int num = 0; + for (int i = 0; i < runningRegionServers.length; i++) { + if (runningRegionServers[i] == 1) { + try { + HRegionInterface hrs = + HConnectionManager.getConnection(conf) + .getHRegionConnection(regionServers.get(i), + DEFAULT_PORT); + List regions = hrs.getOnlineRegions(); + num += regions.size(); + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException("Failed to connect to ZK " + e); + } catch (IOException e) { + throw new RuntimeException("Failed to get regions from " + + regionServers.get(i)); + } + } + } + return num; + } +} Index: bin/hbase =================================================================== --- bin/hbase (revision 1344575) +++ bin/hbase (working copy) @@ -305,7 +305,7 @@ # Exec unless HBASE_NOEXEC is set. if [ "${HBASE_NOEXEC}" != "" ]; then - "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" + "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" else - exec "$JAVA" -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" + exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $JAVA_HEAP_MAX $HBASE_OPTS -classpath "$CLASSPATH" $CLASS "$@" fi