Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java	(revision 941538)
+++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java	(working copy)
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -110,10 +111,32 @@
    * Subclass so can get at protected methods (none at moment).
    */
   public static class MiniHBaseClusterRegionServer extends HRegionServer {
+    private FileSystem fs = null;
+
     public MiniHBaseClusterRegionServer(HBaseConfiguration conf)
       throws IOException {
       super(conf);
     }
+
+    @Override
+    protected FileSystem getFileSystem() {
+      // Return the alternate filesystem if one has been set; else defer to super.
+      return this.fs != null? this.fs: super.getFileSystem();
+    }
+
+    /**
+     * Set alternate filesystem.  Must be called before the regionserver is started.
+     * @param fs
+     */
+    public void setAlternateFileSystemInstance(final FileSystem fs) {
+      this.fs = fs;
+    }
+
+    public void kill() {
+      // Don't have hdfs shutdown on the way out.
+      setHDFSShutdownThreadOnExit(null);
+      super.kill();
+    }
   }
 
   private void init(final int nRegionNodes) throws IOException {
Index: src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java	(revision 941538)
+++ src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java	(working copy)
@@ -19,15 +19,19 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HMsg;
@@ -68,6 +72,7 @@
    * @throws Exception
    */
   @BeforeClass public static void beforeAllTests() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // Start a cluster of two regionservers.
     TEST_UTIL.startMiniCluster(2);
     // Create a table of three families. This will assign a region.
@@ -83,6 +88,55 @@
   }
 
   /**
+   * Test adding in a new server before the old one on the same host+port is
+   * dead.  Make the test more onerous by having the server under test carry
+   * the meta region.  If the master confuses the old and new servers, meta
+   * never comes back online.  Test that meta gets redeployed.
+   * @throws IOException
+   */
+  @Test public void testAddingServerBeforeOldIsDead2413() throws IOException {
+    LOG.info("Running testAddingServerBeforeOldIsDead2413");
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    int count = count();
+    int metaIndex = cluster.getServerWithMeta();
+    MiniHBaseClusterRegionServer metaHRS =
+      (MiniHBaseClusterRegionServer)cluster.getRegionServer(metaIndex);
+    int port = metaHRS.getServerInfo().getServerAddress().getPort();
+    Configuration c = TEST_UTIL.getConfiguration();
+    String oldPort = c.get(HConstants.REGIONSERVER_PORT, "0");
+    try {
+      LOG.info("KILLED=" + metaHRS);
+      metaHRS.kill();
+      c.set(HConstants.REGIONSERVER_PORT, Integer.toString(port));
+      // Try and start a new regionserver.  It might clash with the old
+      // regionserver's port so keep trying until we get past the BindException.
+      HRegionServer hrs = null;
+      while (true) {
+        try {
+          hrs = cluster.startRegionServer().getRegionServer();
+          break;
+        } catch (IOException e) {
+          if (e.getCause() != null && e.getCause() instanceof InvocationTargetException) {
+            InvocationTargetException ee = (InvocationTargetException)e.getCause();
+            if (ee.getCause() != null && ee.getCause() instanceof BindException) {
+              LOG.info("BindException; retrying: " + e.toString());
+            }
+          }
+        }
+      }
+      LOG.info("STARTED=" + hrs);
+      // Wait until it has been given at least 3 regions before we go on to
+      // try and count rows in the table.
+      while (hrs.getOnlineRegions().size() < 3) Threads.sleep(100);
+      LOG.info(hrs.toString() + " has " + hrs.getOnlineRegions().size() +
+        " regions");
+      assertEquals(count, count());
+    } finally {
+      c.set(HConstants.REGIONSERVER_PORT, oldPort);
+    }
+  }
+
+  /**
    * HBase2482 is about outstanding region openings. If any are outstanding
    * when a regionserver goes down, then they'll never deploy. They'll be
    * stuck in the regions-in-transition list for ever. This listener looks
@@ -165,6 +219,7 @@
    * @see HBASE-2482
    */
   @Test public void testKillRSWithOpeningRegion2482() throws Exception {
+    LOG.info("Running testKillRSWithOpeningRegion2482");
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
     // Count how many regions are online. They need to be all back online for
     // this test to succeed.
@@ -336,6 +391,7 @@
    * @see HBASE-2428
    */
   @Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception {
+    LOG.info("Running testRegionCloseWhenNoMetaHBase2428");
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
     final HMaster master = cluster.getMaster();
     int metaIndex = cluster.getServerWithMeta();
@@ -474,6 +530,23 @@
   }
 
   /*
+   * @return Count of rows in TABLENAME
+   * @throws IOException
+   */
+  private static int count() throws IOException {
+    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+    int rows = 0;
+    Scan scan = new Scan();
+    ResultScanner s = t.getScanner(scan);
+    for (Result r = null; (r = s.next()) != null;) {
+      rows++;
+    }
+    s.close();
+    LOG.info("Counted=" + rows);
+    return rows;
+  }
+
+  /*
    * @param hri
    * @return Start key for hri (If start key is '', then return 'aaa'.
    */
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 941538)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -135,6 +135,8 @@
   // debugging and unit tests.
   protected volatile boolean abortRequested;
 
+  private boolean killed = false;
+
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
 
@@ -630,7 +632,9 @@
     hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();
 
-    if (abortRequested) {
+    if (killed) {
+      // Just skip out w/o closing regions.
+    } else if (abortRequested) {
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         try {
@@ -682,9 +686,11 @@
       HBaseRPC.stopProxy(this.hbaseMaster);
       this.hbaseMaster = null;
     }
-
-    join();
-    this.zooKeeperWrapper.close();
+
+    if (!killed) {
+      join();
+      this.zooKeeperWrapper.close();
+    }
     if (this.shutdownHDFS.get()) {
       runThread(this.hdfsShutdownThread,
         this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
@@ -789,7 +795,7 @@
     // accessors will be going against wrong filesystem (unless all is set
     // to defaults).
     this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
-    this.fs = FileSystem.get(this.conf);
+    this.fs = getFileSystem(this.conf);
 
     // Register shutdown hook for HRegionServer, runs an orderly shutdown
     // when a kill signal is recieved
@@ -812,6 +818,17 @@
   }
 
   /*
+   * Method so a subclass can put in place an alternate filesystem instance.
+   * @param c
+   * @return FileSystem to use.
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(final HBaseConfiguration c)
+  throws IOException {
+    return FileSystem.get(c);
+  }
+
+  /*
    * @param r Region to get RegionLoad for.
    * @return RegionLoad instance.
    * @throws IOException
@@ -1276,6 +1293,16 @@
     stop();
   }
 
+  /*
+   * Simulate a kill -9 of this server.
+   * Exits w/o closing regions or cleaning up logs, but it does close the
+   * socket so a new server can come up on the old hostname+port immediately.
+   */
+  protected void kill() {
+    this.killed = true;
+    abort();
+  }
+
   /**
    * Wait on all threads to finish.
    * Presumption is that all closes and stops have already been called.
Index: src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java	(revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java	(working copy)
@@ -241,6 +243,8 @@
 
     // Get regions reassigned
     for (HRegionInfo info: regions) {
+      LOG.debug("Setting " + info.getRegionNameAsString() +
+        " to unassigned in shutdown processor");
       master.regionManager.setUnassigned(info, true);
     }
   }
@@ -272,13 +276,12 @@
     public Boolean call() throws IOException {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("process server shutdown scanning " +
+        LOG.debug("Process server shutdown scanning " +
           Bytes.toString(m.getRegionName()) + " on " + m.getServer());
       }
       Scan scan = new Scan();
       scan.addFamily(CATALOG_FAMILY);
-      long scannerId = server.openScanner(
-        m.getRegionName(), scan);
+      long scannerId = server.openScanner(m.getRegionName(), scan);
       scanMetaRegion(server, scannerId, m.getRegionName());
       return true;
     }
@@ -383,4 +386,4 @@
   protected int getPriority() {
     return 2;  // high but not highest priority
   }
-}
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -84,9 +84,11 @@
    * expires, but the server is still running.  After the network is healed,
   * and it's server logs are recovered, it will be told to call server startup
   * because by then, its regions have probably been reassigned.
+   *
+   * newSetFromMap is a means of making a Set out of a Map.
    */
   protected final Set<String> deadServers =
-    Collections.synchronizedSet(new HashSet<String>());
+    Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
   /** SortedMap server load -> Set of server names */
   final SortedMap<HServerLoad, Set<String>> loadToServers =
@@ -151,45 +153,32 @@
   /**
    * Let the server manager know a new regionserver has come online
    * @param serverInfo
-   * @throws Leases.LeaseStillHeldException
+   * @throws IOException
    */
   public void regionServerStartup(final HServerInfo serverInfo)
-  throws Leases.LeaseStillHeldException {
+  throws IOException {
+    // Test for the case where we get a startup message from a regionserver
+    // that has been quickly restarted but whose znode expiration handler has
+    // not yet run, or from a server whose failure we are currently processing.
     HServerInfo info = new HServerInfo(serverInfo);
-    String serverName = info.getServerName();
-    if (serversToServerInfo.containsKey(serverName) ||
-        deadServers.contains(serverName)) {
-      LOG.debug("Server start was rejected: " + serverInfo);
-      LOG.debug("serversToServerInfo.containsKey: " + serversToServerInfo.containsKey(serverName));
-      LOG.debug("deadServers.contains: " + deadServers.contains(serverName));
-      throw new Leases.LeaseStillHeldException(serverName);
-    }
-
-    LOG.info("Received start message from: " + serverName);
-    // Go on to process the regionserver registration.
-    HServerLoad load = serversToLoad.remove(serverName);
-    if (load != null) {
-      // The startup message was from a known server.
-      // Remove stale information about the server's load.
-      synchronized (loadToServers) {
-        Set<String> servers = loadToServers.get(load);
-        if (servers != null) {
-          servers.remove(serverName);
-          if (servers.size() > 0)
-            loadToServers.put(load, servers);
-          else
-            loadToServers.remove(load);
-        }
+    String hostAndPort = info.getServerAddress().toString();
+    HServerInfo existingServer =
+      this.serverAddressToServerInfo.get(info.getServerAddress());
+    if (existingServer != null) {
+      LOG.info("Server start rejected; we already have " + hostAndPort +
+        " registered; existingServer=" + existingServer + ", newServer=" + info);
+      if (existingServer.getStartCode() < info.getStartCode()) {
+        LOG.info("Triggering server recovery; existingServer looks stale");
+        expireServer(existingServer);
       }
+      throw new Leases.LeaseStillHeldException(hostAndPort);
     }
-    HServerInfo storedInfo = serversToServerInfo.remove(serverName);
-    if (storedInfo != null && !master.closed.get()) {
-      // The startup message was from a known server with the same name.
-      // Timeout the old one right away.
-      master.getRootRegionLocation();
-      RegionServerOperation op = new ProcessServerShutdown(master, storedInfo);
-      this.master.getRegionServerOperationQueue().put(op);
+    if (isDead(hostAndPort, true)) {
+      LOG.debug("Server start rejected; currently processing " + hostAndPort +
+        " failure");
+      throw new Leases.LeaseStillHeldException(hostAndPort);
     }
+    LOG.info("Received start message from: " + info.getServerName());
     recordNewServer(info);
   }
@@ -214,7 +203,7 @@
     info.setLoad(load);
     // We must set this watcher here because it can be set on a fresh start
     // or on a failover
-    Watcher watcher = new ServerExpirer(serverName, info.getServerAddress());
+    Watcher watcher = new ServerExpirer(new HServerInfo(info));
     master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
     serversToServerInfo.put(serverName, info);
     serverAddressToServerInfo.put(info.getServerAddress(), info);
@@ -313,7 +302,6 @@
     synchronized (serversToServerInfo) {
       removeServerInfo(info.getServerName(), info.getServerAddress());
-      serversToServerInfo.notifyAll();
     }
 
     return new HMsg[] {REGIONSERVER_STOP};
@@ -330,57 +318,47 @@
    */
   private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
     assert msgs[0].getType() == Type.MSG_REPORT_EXITING;
-    synchronized (serversToServerInfo) {
-      try {
-        // This method removes ROOT/META from the list and marks them to be reassigned
-        // in addition to other housework.
-        if (removeServerInfo(serverInfo.getServerName(),
-            serverInfo.getServerAddress())) {
-          // Only process the exit message if the server still has registered info.
-          // Otherwise we could end up processing the server exit twice.
-          LOG.info("Region server " + serverInfo.getServerName() +
-            ": MSG_REPORT_EXITING");
-          // Get all the regions the server was serving reassigned
-          // (if we are not shutting down).
-          if (!master.closed.get()) {
-            for (int i = 1; i < msgs.length; i++) {
-              LOG.info("Processing " + msgs[i] + " from " +
-                serverInfo.getServerName());
-              assert msgs[i].getType() == Type.MSG_REGION_CLOSE;
-              HRegionInfo info = msgs[i].getRegionInfo();
-              // Meta/root region offlining is handed in removeServerInfo above.
-              if (!info.isMetaRegion()) {
-                synchronized (master.regionManager) {
-                  if (!master.regionManager.isOfflined(
-                      info.getRegionNameAsString())) {
-                    master.regionManager.setUnassigned(info, true);
-                  } else {
-                    master.regionManager.removeRegion(info);
-                  }
+    // This method removes ROOT/META from the list and marks them to be
+    // reassigned in addition to other housework.
+    if (removeServerInfo(serverInfo.getServerName(), serverInfo.getServerAddress())) {
+      // Only process the exit message if the server still has registered info.
+      // Otherwise we could end up processing the server exit twice.
+      LOG.info("Region server " + serverInfo.getServerName() +
+        ": MSG_REPORT_EXITING");
+      // Get all the regions the server was serving reassigned
+      // (if we are not shutting down).
+      if (!master.closed.get()) {
+        for (int i = 1; i < msgs.length; i++) {
+          LOG.info("Processing " + msgs[i] + " from " +
            serverInfo.getServerName());
+          assert msgs[i].getType() == Type.MSG_REGION_CLOSE;
+          HRegionInfo info = msgs[i].getRegionInfo();
+          // Meta/root region offlining is handled in removeServerInfo above.
+          if (!info.isMetaRegion()) {
+            synchronized (master.regionManager) {
+              if (!master.regionManager.isOfflined(info.getRegionNameAsString())) {
+                master.regionManager.setUnassigned(info, true);
+              } else {
+                master.regionManager.removeRegion(info);
                 }
               }
             }
           }
-
-      // There should not be any regions in transition for this server - the
-      // server should finish transitions itself before closing
-      Map inTransition =
-        master.regionManager.getRegionsInTransitionOnServer(
-          serverInfo.getServerName());
-      for (Map.Entry entry : inTransition.entrySet()) {
-        LOG.warn("Region server " + serverInfo.getServerName() +
-          " shut down with region " + entry.getKey() + " in transition " +
-          "state " + entry.getValue());
-        master.regionManager.setUnassigned(entry.getValue().getRegionInfo(), true);
-      }
       }
-      // We don't need to return anything to the server because it isn't
-      // going to do any more work.
-    } finally {
-      serversToServerInfo.notifyAll();
+      // There should not be any regions in transition for this server - the
+      // server should finish transitions itself before closing
+      Map inTransition = master.regionManager
+        .getRegionsInTransitionOnServer(serverInfo.getServerName());
+      for (Map.Entry entry : inTransition.entrySet()) {
+        LOG.warn("Region server " + serverInfo.getServerName() +
+          " shut down with region " + entry.getKey() + " in transition " +
+          "state " + entry.getValue());
+        master.regionManager.setUnassigned(entry.getValue().getRegionInfo(),
+          true);
+      }
     }
-    }
+  }
   }
 
   /**
@@ -824,44 +802,59 @@
   /** Watcher triggered when a RS znode is deleted */
   private class ServerExpirer implements Watcher {
-    private String server;
-    private HServerAddress serverAddress;
+    private HServerInfo server;
 
-    ServerExpirer(String server, HServerAddress serverAddress) {
-      this.server = server;
-      this.serverAddress = serverAddress;
+    ServerExpirer(final HServerInfo hsi) {
+      this.server = hsi;
     }
 
     public void process(WatchedEvent event) {
-      if (event.getType().equals(EventType.NodeDeleted)) {
-        LOG.info(server + " znode expired");
-        // Remove the server from the known servers list and update load info
-        serverAddressToServerInfo.remove(serverAddress);
-        HServerInfo info = serversToServerInfo.remove(server);
-        if (info != null) {
-          String serverName = info.getServerName();
-          HServerLoad load = serversToLoad.remove(serverName);
-          if (load != null) {
-            synchronized (loadToServers) {
-              Set<String> servers = loadToServers.get(load);
-              if (servers != null) {
-                servers.remove(serverName);
-                if(servers.size() > 0)
-                  loadToServers.put(load, servers);
-                else
-                  loadToServers.remove(load);
-              }
-            }
-          }
-          deadServers.add(server);
-          RegionServerOperation op = new ProcessServerShutdown(master, info);
-          master.getRegionServerOperationQueue().put(op);
+      if (!event.getType().equals(EventType.NodeDeleted)) return;
+      LOG.info(this.server.getServerName() + " znode expired");
+      expireServer(this.server);
+    }
+  }
+
+  /*
+   * Expire the passed server.  Add it to the list of dead servers and queue
+   * a shutdown processing operation.
+   * @param hsi HServerInfo for the server to expire
+   */
+  private synchronized void expireServer(final HServerInfo hsi) {
+    // First check that this is a server we know about and should expire.
+    String serverName = hsi.getServerName();
+    HServerInfo info = this.serversToServerInfo.get(serverName);
+    if (info == null) {
+      LOG.warn("No HServerInfo for " + serverName);
+      return;
+    }
+    if (this.deadServers.contains(serverName)) {
+      LOG.warn("Already processing shutdown of " + serverName);
+      return;
+    }
+    // Remove the server from the known servers lists and update load info
+    this.serverAddressToServerInfo.remove(info.getServerAddress());
+    this.serversToServerInfo.remove(serverName);
+    HServerLoad load = this.serversToLoad.remove(serverName);
+    if (load != null) {
+      synchronized (this.loadToServers) {
+        Set<String> servers = this.loadToServers.get(load);
+        if (servers != null) {
+          servers.remove(serverName);
+          if (servers.size() > 0)
+            this.loadToServers.put(load, servers);
+          else
+            this.loadToServers.remove(load);
         }
-        synchronized (serversToServerInfo) {
-          serversToServerInfo.notifyAll();
-        }
       }
     }
+    // Add to dead servers and queue a shutdown processing.
+    LOG.debug("Added=" + serverName +
+      " to dead servers, added shutdown processing operation");
+    this.deadServers.add(serverName);
+    this.master.getRegionServerOperationQueue().
+      put(new ProcessServerShutdown(master, info));
   }
 
   /**
@@ -875,10 +868,25 @@
    * @param serverName
    * @return true if server is dead
    */
-  public boolean isDead(String serverName) {
-    return deadServers.contains(serverName);
+  public boolean isDead(final String serverName) {
+    return isDead(serverName, false);
   }
 
+  /**
+   * @param serverName Servername as either host:port or host_port_startcode.
+   * @param hostAndPortOnly True if serverName is host and
+   * port only (host:port) and if so, then we do a prefix compare (ignoring
+   * start codes) looking for a dead server.
+   * @return true if server is dead
+   */
+  boolean isDead(final String serverName, final boolean hostAndPortOnly) {
+    if (!hostAndPortOnly) return deadServers.contains(serverName);
+    for (String hostPortStartCode: this.deadServers) {
+      if (hostPortStartCode.startsWith(serverName)) return true;
+    }
+    return false;
+  }
+
   public boolean canAssignUserRegions() {
     if (minimumServerCount == 0) {
       return true;
@@ -889,4 +897,4 @@
   public void setMinimumServerCount(int minimumServerCount) {
     this.minimumServerCount = minimumServerCount;
   }
-}
\ No newline at end of file
+}
Index: src/java/org/apache/hadoop/hbase/master/RegionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/RegionManager.java	(revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/RegionManager.java	(working copy)
@@ -1434,7 +1434,7 @@
     }
 
     // check if current server is overloaded
-    int numRegionsToClose = balanceFromOverloaded(servLoad, avg);
+    int numRegionsToClose = balanceFromOverloaded(info, servLoad, avg);
 
     // check if we can unload server by low loaded servers
     if(numRegionsToClose <= 0) {
@@ -1456,13 +1456,14 @@
    * Check if server load is not overloaded (with load > avgLoadPlusSlop).
    * @return number of regions to unassign.
    */
-  private int balanceFromOverloaded(HServerLoad srvLoad, double avgLoad) {
+  private int balanceFromOverloaded(final HServerInfo info,
+      final HServerLoad srvLoad, final double avgLoad) {
     int avgLoadPlusSlop = (int)Math.ceil(avgLoad * (1 + this.slop));
     int numSrvRegs = srvLoad.getNumberOfRegions();
     if (numSrvRegs > avgLoadPlusSlop) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Server is overloaded: load=" + numSrvRegs +
-          ", avg=" + avgLoad + ", slop=" + this.slop);
+        LOG.debug("Server " + info.getServerName() + " is overloaded: load=" +
+          numSrvRegs + ", avg=" + avgLoad + ", slop=" + this.slop);
       }
       return numSrvRegs - (int)Math.ceil(avgLoad);
     }
Index: src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java	(revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java	(working copy)
@@ -39,7 +39,9 @@
   protected RegionServerOperation(HMaster master) {
     this.master = master;
     this.numRetries = master.numRetries;
-    this.expirationDuration = this.master.leaseTimeout/2;
+    // this.master.leaseTimeout defaults to 120 seconds; a tenth of that is a
+    // long time to wait for something to come around, so cap the wait at 10s.
+    this.expirationDuration = Math.min(this.master.leaseTimeout/10, 10 * 1000);
     resetExpiration();
   }
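
Note (illustrative only, not part of the patch): the heart of the ServerManager change above is the start-code comparison in regionServerStartup -- a second registration from the same host:port with a newer start code means the earlier registration is stale and should be expired before the newcomer is admitted. A stripped-down sketch of that decision, using simplified stand-in types rather than the patch's HServerInfo/ServerManager classes:

  // Illustrative stand-ins only; the patch itself works with HServerInfo,
  // serverAddressToServerInfo and expireServer() in ServerManager.
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  public class StartCodeCheckSketch {
    static class ServerInfo {
      final String hostAndPort;   // e.g. "example-host:60020"
      final long startCode;       // stamped at process start; a restart gets a larger value
      ServerInfo(String hostAndPort, long startCode) {
        this.hostAndPort = hostAndPort;
        this.startCode = startCode;
      }
    }

    private final Map<String, ServerInfo> byAddress =
      new ConcurrentHashMap<String, ServerInfo>();

    /**
     * @return true if the newcomer was admitted, false if it was rejected and
     * should retry its registration after the stale entry has been cleaned up.
     */
    boolean registerStartup(ServerInfo newcomer) {
      ServerInfo existing = byAddress.get(newcomer.hostAndPort);
      if (existing != null) {
        if (existing.startCode < newcomer.startCode) {
          // Same host:port but an older start code: the old registration is
          // stale, so kick off its expiry/shutdown processing first.
          expire(existing);
        }
        return false;  // reject for now; the newcomer re-registers later
      }
      byAddress.put(newcomer.hostAndPort, newcomer);
      return true;
    }

    private void expire(ServerInfo stale) {
      byAddress.remove(stale.hostAndPort);
      // ...queue shutdown processing so the stale server's regions get reassigned...
    }
  }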
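
Also illustrative only: deadServers moves from Collections.synchronizedSet to Collections.newSetFromMap backed by a ConcurrentHashMap so that the new prefix-scanning isDead(host:port) can walk the set while other threads add to it, without an external lock (iteration over a ConcurrentHashMap-backed set is weakly consistent and never throws ConcurrentModificationException). A minimal standalone sketch of the idiom; the server-name format shown is made up for the example:

  import java.util.Collections;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;

  public class DeadServerSetSketch {
    // A concurrent Set built from a ConcurrentHashMap: adds, removes and
    // iteration are all safe without an external synchronized block.
    private final Set<String> deadServers =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    void markDead(String serverName) {
      deadServers.add(serverName);
    }

    // Exact-match check against a full server name.
    boolean isDead(String serverName) {
      return deadServers.contains(serverName);
    }

    // Prefix check: a bare host:port is considered dead if any recorded server
    // name starts with it, ignoring whatever start-code suffix follows.
    boolean isDeadHostAndPort(String hostAndPort) {
      for (String name : deadServers) {   // weakly consistent iteration, no locking
        if (name.startsWith(hostAndPort)) return true;
      }
      return false;
    }

    public static void main(String[] args) {
      DeadServerSetSketch s = new DeadServerSetSketch();
      s.markDead("example-host:60020,1273000000000");  // made-up name format
      System.out.println(s.isDeadHostAndPort("example-host:60020"));  // true
    }
  }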
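
Finally, the new test reclaims the killed regionserver's port by retrying startup until the dying process releases the socket. The same retry-on-BindException pattern in plain JDK form, detached from the MiniHBaseCluster plumbing (the port number and retry bound are arbitrary for the example):

  import java.io.IOException;
  import java.net.BindException;
  import java.net.ServerSocket;

  public class BindRetrySketch {
    // Keep trying to bind the port until the previous owner has released it,
    // or until we run out of attempts.
    static ServerSocket bindWithRetry(int port, int maxAttempts) throws IOException {
      BindException last = null;
      for (int i = 0; i < maxAttempts; i++) {
        try {
          return new ServerSocket(port);
        } catch (BindException e) {
          last = e;  // port still held by the dying process; pause and retry
          try {
            Thread.sleep(100);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting to bind port " + port);
          }
        }
      }
      if (last != null) throw last;
      throw new IOException("Never attempted to bind port " + port);
    }

    public static void main(String[] args) throws IOException {
      ServerSocket ss = bindWithRetry(60020, 50);  // 60020 is just an example port
      System.out.println("Bound " + ss.getLocalSocketAddress());
      ss.close();
    }
  }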