Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java	(revision 941146)
+++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java	(working copy)
@@ -114,6 +114,13 @@
         throws IOException {
       super(conf);
     }
+
+    @Override
+    public void kill() {
+      // Don't have hdfs shutdown on the way out.
+      setHDFSShutdownThreadOnExit(null);
+      super.kill();
+    }
   }
 
   private void init(final int nRegionNodes) throws IOException {
Index: src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java	(revision 941146)
+++ src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java	(working copy)
@@ -19,15 +19,19 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HMsg;
@@ -83,6 +87,52 @@
   }
 
   /**
+   * Test adding in a new server before old one on same host+port is dead.
+   * Make the test more onerous by having the server under test carry the meta.
+   * If confusion between old and new, purportedly meta never comes back. Test
+   * that meta gets redeployed.
+   * @throws IOException
+   */
+  @Test public void testAddingServerBeforeOldIsDead2413() throws IOException {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    int count = count();
+    int metaIndex = cluster.getServerWithMeta();
+    MiniHBaseClusterRegionServer metaHRS =
+      (MiniHBaseClusterRegionServer)cluster.getRegionServer(metaIndex);
+    int port = metaHRS.getServerInfo().getServerAddress().getPort();
+    Configuration c = TEST_UTIL.getConfiguration();
+    String oldPort = c.get(HConstants.REGIONSERVER_PORT, "0");
+    try {
+      LOG.info("KILLED=" + metaHRS);
+      metaHRS.kill();
+      c.set(HConstants.REGIONSERVER_PORT, Integer.toString(port));
+      // Try and start new regionserver. It might clash with the old
+      // regionserver port so keep trying to get past the BindException.
+      HRegionServer hrs = null;
+      while (true) {
+        try {
+          hrs = cluster.startRegionServer().getRegionServer();
+          break;
+        } catch (IOException e) {
+          // A clash with the old server's port surfaces as an IOException
+          // caused by an InvocationTargetException caused by a BindException.
+          // Retry only that; rethrow anything else rather than spin forever.
+          Throwable cause = e.getCause();
+          if (!(cause instanceof InvocationTargetException &&
+              cause.getCause() instanceof BindException)) {
+            throw e;
+          }
+          LOG.info("BindException; retrying: " + e.toString());
+        }
+      }
+      LOG.info("STARTED=" + hrs);
+      assertEquals(count, count());
+    } finally {
+      c.set(HConstants.REGIONSERVER_PORT, oldPort);
+    }
+  }
+
+  /**
    * HBase2482 is about outstanding region openings. If any are outstanding
    * when a regionserver goes down, then they'll never deploy. They'll be
    * stuck in the regions-in-transition list for ever.
@@ -474,6 +524,25 @@
   }
 
   /*
+   * @return Count of rows in TABLENAME
+   * @throws IOException
+   */
+  private static int count() throws IOException {
+    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+    ResultScanner s = t.getScanner(new Scan());
+    int rows = 0;
+    try {
+      while (s.next() != null) {
+        rows++;
+      }
+    } finally {
+      // Release the scanner even if next() throws.
+      s.close();
+    }
+    return rows;
+  }
+
+  /*
    * @param hri
    * @return Start key for hri (If start key is '', then return 'aaa'.
    */
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 941146)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -135,6 +135,9 @@
   // debugging and unit tests.
   protected volatile boolean abortRequested;
 
+  // Set by kill(); volatile as kill() may be called from another thread.
+  private volatile boolean killed = false;
+
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
 
@@ -630,7 +633,9 @@
     hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();
 
-    if (abortRequested) {
+    if (killed) {
+      // Just skip out w/o closing regions.
+    } else if (abortRequested) {
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         try {
@@ -682,9 +687,11 @@
       HBaseRPC.stopProxy(this.hbaseMaster);
       this.hbaseMaster = null;
     }
-
-    join();
-    this.zooKeeperWrapper.close();
+
+    if (!killed) {
+      join();
+      this.zooKeeperWrapper.close();
+    }
 
     if (this.shutdownHDFS.get()) {
       runThread(this.hdfsShutdownThread, this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
@@ -1276,6 +1283,16 @@
     stop();
   }
 
+  /*
+   * Simulate a kill -9 of this server.
+   * Exits w/o closing regions or cleaning up logs but it does close its socket
+   * in case we want to bring up a server on the old hostname+port immediately.
+   */
+  protected void kill() {
+    this.killed = true;
+    abort();
+  }
+
   /**
    * Wait on all threads to finish.
    * Presumption is that all closes and stops have already been called.
Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 941146)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -84,9 +84,12 @@
    * expires, but the server is still running. After the network is healed,
    * and it's server logs are recovered, it will be told to call server startup
    * because by then, its regions have probably been reassigned.
+   *
+   * newSetFromMap over a ConcurrentHashMap makes a thread-safe Set that
+   * isDead can iterate without holding a lock.
    */
   protected final Set<String> deadServers =
-    Collections.synchronizedSet(new HashSet<String>());
+    Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
   /** SortedMap server load -> Set of server names */
   final SortedMap<HServerLoad, Set<String>> loadToServers =
@@ -151,45 +154,34 @@
   /**
    * Let the server manager know a new regionserver has come online
    * @param serverInfo
-   * @throws Leases.LeaseStillHeldException
+   * @throws IOException
    */
   public void regionServerStartup(final HServerInfo serverInfo)
-  throws Leases.LeaseStillHeldException {
+  throws IOException {
+    // Test for case where we get a region startup message from a regionserver
+    // that has been quickly restarted but whose znode expiration handler has
+    // not yet run, or from a server whose failure we are currently processing.
     HServerInfo info = new HServerInfo(serverInfo);
-    String serverName = info.getServerName();
-    if (serversToServerInfo.containsKey(serverName) ||
-        deadServers.contains(serverName)) {
-      LOG.debug("Server start was rejected: " + serverInfo);
-      LOG.debug("serversToServerInfo.containsKey: " + serversToServerInfo.containsKey(serverName));
-      LOG.debug("deadServers.contains: " + deadServers.contains(serverName));
-      throw new Leases.LeaseStillHeldException(serverName);
-    }
-
-    LOG.info("Received start message from: " + serverName);
-    // Go on to process the regionserver registration.
-    HServerLoad load = serversToLoad.remove(serverName);
-    if (load != null) {
-      // The startup message was from a known server.
-      // Remove stale information about the server's load.
-      synchronized (loadToServers) {
-        Set<String> servers = loadToServers.get(load);
-        if (servers != null) {
-          servers.remove(serverName);
-          if (servers.size() > 0)
-            loadToServers.put(load, servers);
-          else
-            loadToServers.remove(load);
-        }
+    String hostAndPort = info.getServerAddress().toString();
+    // serverAddressToServerInfo is keyed by HServerAddress, not its String.
+    HServerInfo existingServer =
+      this.serverAddressToServerInfo.get(info.getServerAddress());
+    if (existingServer != null) {
+      LOG.info("Server start rejected; we already have " + hostAndPort +
+        " registered; existingServer=" + existingServer + ", newServer=" + info);
+      if (existingServer.getStartCode() < info.getStartCode()) {
+        LOG.info("Triggering server recovery; existingServer looks stale");
+        expireServer(existingServer);
       }
+      throw new Leases.LeaseStillHeldException(hostAndPort);
     }
-    HServerInfo storedInfo = serversToServerInfo.remove(serverName);
-    if (storedInfo != null && !master.closed.get()) {
-      // The startup message was from a known server with the same name.
-      // Timeout the old one right away.
-      master.getRootRegionLocation();
-      RegionServerOperation op = new ProcessServerShutdown(master, storedInfo);
-      this.master.getRegionServerOperationQueue().put(op);
+    // Ignore start codes: any dead incarnation on this host+port blocks us.
+    if (isDead(getServerNameLessStartCode(info), true)) {
+      LOG.debug("Server start rejected; currently processing " + hostAndPort +
+        " failure");
+      throw new Leases.LeaseStillHeldException(hostAndPort);
     }
+    LOG.info("Received start message from: " + info.getServerName());
     recordNewServer(info);
   }
 
@@ -214,7 +206,7 @@
     info.setLoad(load);
     // We must set this watcher here because it can be set on a fresh start
     // or on a failover
-    Watcher watcher = new ServerExpirer(serverName, info.getServerAddress());
+    Watcher watcher = new ServerExpirer(new HServerInfo(info));
     master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
     serversToServerInfo.put(serverName, info);
     serverAddressToServerInfo.put(info.getServerAddress(), info);
@@ -313,7 +305,6 @@
 
     synchronized (serversToServerInfo) {
       removeServerInfo(info.getServerName(), info.getServerAddress());
-      serversToServerInfo.notifyAll();
     }
 
     return new HMsg[] {REGIONSERVER_STOP};
@@ -332,32 +323,29 @@
     assert msgs[0].getType() == Type.MSG_REPORT_EXITING;
 
     synchronized (serversToServerInfo) {
-      try {
-        // This method removes ROOT/META from the list and marks them to be reassigned
-        // in addition to other housework.
-        if (removeServerInfo(serverInfo.getServerName(),
-            serverInfo.getServerAddress())) {
-          // Only process the exit message if the server still has registered info.
-          // Otherwise we could end up processing the server exit twice.
-          LOG.info("Region server " + serverInfo.getServerName() +
-            ": MSG_REPORT_EXITING");
-          // Get all the regions the server was serving reassigned
-          // (if we are not shutting down).
-          if (!master.closed.get()) {
-            for (int i = 1; i < msgs.length; i++) {
-              LOG.info("Processing " + msgs[i] + " from " +
-                serverInfo.getServerName());
-              assert msgs[i].getType() == Type.MSG_REGION_CLOSE;
-              HRegionInfo info = msgs[i].getRegionInfo();
-              // Meta/root region offlining is handed in removeServerInfo above.
-              if (!info.isMetaRegion()) {
-                synchronized (master.regionManager) {
-                  if (!master.regionManager.isOfflined(
-                      info.getRegionNameAsString())) {
-                    master.regionManager.setUnassigned(info, true);
-                  } else {
-                    master.regionManager.removeRegion(info);
-                  }
+      // This method removes ROOT/META from the list and marks them to be reassigned
+      // in addition to other housework.
+      if (removeServerInfo(serverInfo.getServerName(),
+          serverInfo.getServerAddress())) {
+        // Only process the exit message if the server still has registered info.
+        // Otherwise we could end up processing the server exit twice.
+        LOG.info("Region server " + serverInfo.getServerName() +
+          ": MSG_REPORT_EXITING");
+        // Get all the regions the server was serving reassigned
+        // (if we are not shutting down).
+        if (!master.closed.get()) {
+          for (int i = 1; i < msgs.length; i++) {
+            LOG.info("Processing " + msgs[i] + " from " +
+              serverInfo.getServerName());
+            HRegionInfo info = msgs[i].getRegionInfo();
+            // Meta/root region offlining is handled in removeServerInfo above.
+            if (!info.isMetaRegion()) {
+              synchronized (master.regionManager) {
+                if (!master.regionManager.isOfflined(
+                    info.getRegionNameAsString())) {
+                  master.regionManager.setUnassigned(info, true);
+                } else {
+                  master.regionManager.removeRegion(info);
                 }
               }
             }
@@ -375,10 +363,6 @@
             master.regionManager.setUnassigned(entry.getValue().getRegionInfo(), true);
           }
         }
-        // We don't need to return anything to the server because it isn't
-        // going to do any more work.
-      } finally {
-        serversToServerInfo.notifyAll();
       }
     }
   }
@@ -824,44 +808,58 @@
 
   /** Watcher triggered when a RS znode is deleted */
   private class ServerExpirer implements Watcher {
-    private String server;
-    private HServerAddress serverAddress;
+    private final HServerInfo server;
 
-    ServerExpirer(String server, HServerAddress serverAddress) {
-      this.server = server;
-      this.serverAddress = serverAddress;
+    ServerExpirer(final HServerInfo hsi) {
+      this.server = hsi;
     }
 
     public void process(WatchedEvent event) {
-      if (event.getType().equals(EventType.NodeDeleted)) {
-        LOG.info(server + " znode expired");
-        // Remove the server from the known servers list and update load info
-        serverAddressToServerInfo.remove(serverAddress);
-        HServerInfo info = serversToServerInfo.remove(server);
-        if (info != null) {
-          String serverName = info.getServerName();
-          HServerLoad load = serversToLoad.remove(serverName);
-          if (load != null) {
-            synchronized (loadToServers) {
-              Set<String> servers = loadToServers.get(load);
-              if (servers != null) {
-                servers.remove(serverName);
-                if(servers.size() > 0)
-                  loadToServers.put(load, servers);
-                else
-                  loadToServers.remove(load);
-              }
-            }
-          }
-          deadServers.add(server);
-          RegionServerOperation op = new ProcessServerShutdown(master, info);
-          master.getRegionServerOperationQueue().put(op);
+      if (!event.getType().equals(EventType.NodeDeleted)) return;
+      LOG.info(this.server.getServerName() + " znode expired");
+      expireServer(this.server);
+    }
+  }
+
+  /*
+   * Expire the passed server.  Add it to list of deadservers and queue a
+   * shutdown processing.  Does nothing if the server is not registered or if
+   * its shutdown is already being processed.
+   * @param hsi Server to expire; looked up by its server name
+   */
+  private synchronized void expireServer(final HServerInfo hsi) {
+    // First check a server to expire.
+    String serverName = hsi.getServerName();
+    HServerInfo info = this.serversToServerInfo.get(serverName);
+    if (info == null) {
+      LOG.warn("No HServerInfo for " + serverName);
+      return;
+    }
+    if (this.deadServers.contains(serverName)) {
+      LOG.warn("Already processing shutdown of " + serverName);
+      return;
+    }
+    // Remove the server from the known servers lists and update load info.
+    // serverAddressToServerInfo is keyed by HServerAddress, not server name.
+    this.serverAddressToServerInfo.remove(info.getServerAddress());
+    this.serversToServerInfo.remove(serverName);
+    HServerLoad load = this.serversToLoad.remove(serverName);
+    if (load != null) {
+      synchronized (this.loadToServers) {
+        Set<String> servers = this.loadToServers.get(load);
+        if (servers != null) {
+          servers.remove(serverName);
+          if (servers.size() > 0)
+            this.loadToServers.put(load, servers);
+          else
+            this.loadToServers.remove(load);
         }
-        synchronized (serversToServerInfo) {
-          serversToServerInfo.notifyAll();
-        }
       }
     }
+    // Add to dead servers and queue a shutdown processing.
+    this.deadServers.add(serverName);
+    this.master.getRegionServerOperationQueue().
+      put(new ProcessServerShutdown(master, info));
   }
 
   /**
@@ -875,10 +873,41 @@
    * @param serverName
    * @return true if server is dead
    */
-  public boolean isDead(String serverName) {
-    return deadServers.contains(serverName);
+  public boolean isDead(final String serverName) {
+    return isDead(serverName, false);
   }
 
+  /**
+   * @param serverName Servername (host_port_startcode) or, when
+   * hostAndPortOnly is true, a servername with its start code stripped as
+   * returned by getServerNameLessStartCode.
+   * @param hostAndPortOnly True if serverName holds host and port only; if
+   * so, then we do a prefix compare (ignoring start codes) looking for a dead
+   * server on the same host and port.
+   * @return true if server is dead
+   */
+  boolean isDead(final String serverName, final boolean hostAndPortOnly) {
+    if (!hostAndPortOnly) return this.deadServers.contains(serverName);
+    for (String hostPortStartCode: this.deadServers) {
+      if (hostPortStartCode.startsWith(serverName)) return true;
+    }
+    return false;
+  }
+
+  /*
+   * @param hsi Server whose name we want
+   * @return Servername of hsi less its trailing start code: the prefix shared
+   * by every incarnation of a server on the same host and port.  Returns the
+   * whole servername if it does not end in the start code.
+   */
+  private static String getServerNameLessStartCode(final HServerInfo hsi) {
+    String serverName = hsi.getServerName();
+    String startCode = Long.toString(hsi.getStartCode());
+    return serverName.endsWith(startCode)?
+      serverName.substring(0, serverName.length() - startCode.length()):
+      serverName;
+  }
+
   public boolean canAssignUserRegions() {
     if (minimumServerCount == 0) {
       return true;
@@ -889,4 +918,4 @@
   public void setMinimumServerCount(int minimumServerCount) {
     this.minimumServerCount = minimumServerCount;
   }
-}
\ No newline at end of file
+}