Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 933350)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -83,9 +83,11 @@
    * expires, but the server is still running. After the network is healed,
    * and it's server logs are recovered, it will be told to call server startup
    * because by then, its regions have probably been reassigned.
+   *
+   * newSetFromMap gives us a Set with ConcurrentHashMap's thread-safety.
    */
   protected final Set<String> deadServers =
-    Collections.synchronizedSet(new HashSet<String>());
+    Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
   /** SortedMap server load -> Set of server names */
   final SortedMap<HServerLoad, Set<String>> loadToServers =
@@ -150,52 +152,35 @@
   /**
    * Let the server manager know a new regionserver has come online
    * @param serverInfo
-   * @throws Leases.LeaseStillHeldException
+   * @throws IOException a Leases.LeaseStillHeldException if start is rejected
    */
   public void regionServerStartup(final HServerInfo serverInfo)
-  throws Leases.LeaseStillHeldException {
+  throws IOException {
+    // Test for case where we get a region startup message from a regionserver
+    // that has been quickly restarted but whose znode expiration handler has
+    // not yet run, or from a server whose failure we are currently processing.
     HServerInfo info = new HServerInfo(serverInfo);
-    String serverName = info.getServerName();
-    if (serversToServerInfo.containsKey(serverName) ||
-        deadServers.contains(serverName)) {
-      LOG.debug("Server start was rejected: " + serverInfo);
-      LOG.debug("serversToServerInfo.containsKey: " + serversToServerInfo.containsKey(serverName));
-      LOG.debug("deadServers.contains: " + deadServers.contains(serverName));
-      throw new Leases.LeaseStillHeldException(serverName);
-    }
-
-    LOG.info("Received start message from: " + serverName);
-    // Go on to process the regionserver registration.
-    HServerLoad load = serversToLoad.remove(serverName);
-    if (load != null) {
-      // The startup message was from a known server.
-      // Remove stale information about the server's load.
-      synchronized (loadToServers) {
-        Set<String> servers = loadToServers.get(load);
-        if (servers != null) {
-          servers.remove(serverName);
-          if (servers.size() > 0)
-            loadToServers.put(load, servers);
-          else
-            loadToServers.remove(load);
-        }
+    String hostAndPort = info.getServerAddress().toString();
+    HServerInfo existingServer =
+      this.serverAddressToServerInfo.get(info.getServerAddress());
+    if (existingServer != null) {
+      LOG.info("Server start rejected; we already have " + hostAndPort +
+        " registered; existingServer=" + existingServer + ", newServer=" + info);
+      if (existingServer.getStartCode() < info.getStartCode()) {
+        LOG.info("Triggering server recovery; existingServer looks stale");
+        expireServer(existingServer);
       }
+      throw new Leases.LeaseStillHeldException(hostAndPort);
     }
-    HServerInfo storedInfo = serversToServerInfo.remove(serverName);
-    if (storedInfo != null && !master.closed.get()) {
-      // The startup message was from a known server with the same name.
-      // Timeout the old one right away.
-      master.getRootRegionLocation();
-      try {
-        master.toDoQueue.put(new ProcessServerShutdown(master, storedInfo));
-      } catch (InterruptedException e) {
-        LOG.error("Insertion into toDoQueue was interrupted", e);
-      }
+    if (isDead(hostAndPort, true)) {
+      LOG.debug("Server start rejected; currently processing " + hostAndPort +
+        " failure");
+      throw new Leases.LeaseStillHeldException(hostAndPort);
     }
+    LOG.info("Received start message from: " + info.getServerName());
     recordNewServer(info);
   }
 
-
   /**
    * Adds the HSI to the RS list and creates an empty load
    * @param info The region server informations
@@ -216,7 +201,7 @@
     info.setLoad(load);
     // We must set this watcher here because it can be set on a fresh start
     // or on a failover
-    Watcher watcher = new ServerExpirer(serverName, info.getServerAddress());
+    Watcher watcher = new ServerExpirer(new HServerInfo(info));
     master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
     serversToServerInfo.put(serverName, info);
     serverAddressToServerInfo.put(info.getServerAddress(), info);
@@ -313,7 +298,6 @@
 
     synchronized (serversToServerInfo) {
       removeServerInfo(info.getServerName(), info.getServerAddress());
-      serversToServerInfo.notifyAll();
     }
 
     return new HMsg[] {REGIONSERVER_STOP};
@@ -325,42 +309,36 @@
   /** Region server is exiting */
   private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
     synchronized (serversToServerInfo) {
-      try {
-        // This method removes ROOT/META from the list and marks them to be reassigned
-        // in addition to other housework.
-        if (removeServerInfo(serverInfo.getServerName(),
-            serverInfo.getServerAddress())) {
-          // Only process the exit message if the server still has registered info.
-          // Otherwise we could end up processing the server exit twice.
-          LOG.info("Region server " + serverInfo.getServerName() +
-          ": MSG_REPORT_EXITING");
-          // Get all the regions the server was serving reassigned
-          // (if we are not shutting down).
-          if (!master.closed.get()) {
-            for (int i = 1; i < msgs.length; i++) {
-              LOG.info("Processing " + msgs[i] + " from " +
-                  serverInfo.getServerName());
-              HRegionInfo info = msgs[i].getRegionInfo();
-              // Meta/root region offlining is handed in removeServerInfo above.
-              if (!info.isMetaRegion()) {
-                synchronized (master.regionManager) {
-                  if (!master.regionManager.isOfflined(
-                      info.getRegionNameAsString())) {
-                    master.regionManager.setUnassigned(info, true);
-                  } else {
-                    master.regionManager.removeRegion(info);
-                  }
+      // This method removes ROOT/META from the list and marks them to be reassigned
+      // in addition to other housework.
+      if (removeServerInfo(serverInfo.getServerName(),
+          serverInfo.getServerAddress())) {
+        // Only process the exit message if the server still has registered info.
+        // Otherwise we could end up processing the server exit twice.
+        LOG.info("Region server " + serverInfo.getServerName() +
+        ": MSG_REPORT_EXITING");
+        // Get all the regions the server was serving reassigned
+        // (if we are not shutting down).
+        if (!master.closed.get()) {
+          for (int i = 1; i < msgs.length; i++) {
+            LOG.info("Processing " + msgs[i] + " from " +
+                serverInfo.getServerName());
+            HRegionInfo info = msgs[i].getRegionInfo();
+            // Meta/root region offlining is handled in removeServerInfo above.
+            if (!info.isMetaRegion()) {
+              synchronized (master.regionManager) {
+                if (!master.regionManager.isOfflined(
+                    info.getRegionNameAsString())) {
+                  master.regionManager.setUnassigned(info, true);
+                } else {
+                  master.regionManager.removeRegion(info);
                 }
               }
             }
           }
         }
-        // We don't need to return anything to the server because it isn't
-        // going to do any more work.
-      } finally {
-        serversToServerInfo.notifyAll();
       }
-    }
+    }
   }
 
   /**
@@ -818,49 +796,64 @@
 
   /** Watcher triggered when a RS znode is deleted */
   private class ServerExpirer implements Watcher {
-    private String server;
-    private HServerAddress serverAddress;
+    private HServerInfo server;
 
-    ServerExpirer(String server, HServerAddress serverAddress) {
-      this.server = server;
-      this.serverAddress = serverAddress;
+    ServerExpirer(final HServerInfo hsi) {
+      this.server = hsi;
     }
 
     public void process(WatchedEvent event) {
-      if(event.getType().equals(EventType.NodeDeleted)) {
-        LOG.info(server + " znode expired");
-        // Remove the server from the known servers list and update load info
-        serverAddressToServerInfo.remove(serverAddress);
-        HServerInfo info = serversToServerInfo.remove(server);
-        if (info != null) {
-          String serverName = info.getServerName();
-          HServerLoad load = serversToLoad.remove(serverName);
-          if (load != null) {
-            synchronized (loadToServers) {
-              Set<String> servers = loadToServers.get(load);
-              if (servers != null) {
-                servers.remove(serverName);
-                if(servers.size() > 0)
-                  loadToServers.put(load, servers);
-                else
-                  loadToServers.remove(load);
-              }
-            }
-          }
-          deadServers.add(server);
-          try {
-            master.toDoQueue.put(new ProcessServerShutdown(master, info));
-          } catch (InterruptedException e) {
-            LOG.error("insert into toDoQueue was interrupted", e);
-          }
+      if (!event.getType().equals(EventType.NodeDeleted)) return;
+      LOG.info(this.server.getServerName() + " znode expired");
+      expireServer(this.server);
+    }
+  }
+
+  /*
+   * Expire the passed server.  Add it to list of deadservers and queue a
+   * shutdown processing.
+   * @param hsi Server to expire; ignored if it is no longer registered or is
+   * already in deadServers.
+   */
+  private synchronized void expireServer(final HServerInfo hsi) {
+    // First check there is a server to expire.
+    String serverName = hsi.getServerName();
+    HServerInfo info = this.serversToServerInfo.get(serverName);
+    if (info == null) {
+      LOG.warn("No HServerInfo for " + serverName);
+      return;
+    }
+    if (this.deadServers.contains(serverName)) {
+      LOG.warn("Already processing shutdown of " + serverName);
+      return;
+    }
+    // Remove the server from the known servers lists and update load info.
+    // serverAddressToServerInfo is keyed by HServerAddress, not server name.
+    this.serverAddressToServerInfo.remove(info.getServerAddress());
+    this.serversToServerInfo.remove(serverName);
+    HServerLoad load = this.serversToLoad.remove(serverName);
+    if (load != null) {
+      synchronized (this.loadToServers) {
+        Set<String> servers = this.loadToServers.get(load);
+        if (servers != null) {
+          servers.remove(serverName);
+          if (servers.size() > 0)
+            this.loadToServers.put(load, servers);
+          else
+            this.loadToServers.remove(load);
         }
-        synchronized (serversToServerInfo) {
-          serversToServerInfo.notifyAll();
-        }
       }
     }
+    // Add to dead servers and queue a shutdown processing.
+    this.deadServers.add(serverName);
+    try {
+      this.master.toDoQueue.put(new ProcessServerShutdown(master, info));
+    } catch (InterruptedException e) {
+      LOG.error("insert into toDoQueue was interrupted", e);
+      Thread.currentThread().interrupt();
+    }
   }
-  
+
   /**
    * @param serverName
    */
@@ -872,10 +865,56 @@
    * @param serverName
    * @return true if server is dead
    */
-  public boolean isDead(String serverName) {
-    return deadServers.contains(serverName);
+  public boolean isDead(final String serverName) {
+    return isDead(serverName, false);
   }
 
+  /**
+   * @param serverName Servername as either host:port or a full server name
+   * (host, port and start code).
+   * @param hostAndPortOnly True if serverName is host and port only
+   * (host:port); if so, we look for a dead server with the same host and
+   * port, ignoring start codes.
+   * @return true if server is dead
+   */
+  boolean isDead(final String serverName, final boolean hostAndPortOnly) {
+    if (!hostAndPortOnly) return deadServers.contains(serverName);
+    int colon = serverName.lastIndexOf(':');
+    if (colon < 0) {
+      // Not in host:port form; fall back to a plain prefix compare.
+      for (String deadName: this.deadServers) {
+        if (deadName.startsWith(serverName)) return true;
+      }
+      return false;
+    }
+    String host = serverName.substring(0, colon);
+    String port = serverName.substring(colon + 1);
+    for (String deadName: this.deadServers) {
+      if (isSameHostAndPort(deadName, host, port)) return true;
+    }
+    return false;
+  }
+
+  /*
+   * Dead server names are host, separator, port, separator, start code, and
+   * the separator is not necessarily ':', so a raw prefix compare against
+   * host:port can miss. It could also wrongly match a longer port (host:6002
+   * vs. a server on 60020). Compare host and port as whole fields instead.
+   * @return True if deadName names a server on host and port.
+   */
+  private static boolean isSameHostAndPort(final String deadName,
+      final String host, final String port) {
+    if (deadName.length() <= host.length() || !deadName.startsWith(host)) {
+      return false;
+    }
+    // Take the separator from the name itself rather than hard-coding it.
+    char sep = deadName.charAt(host.length());
+    if (Character.isLetterOrDigit(sep) || sep == '.' || sep == '-') {
+      return false;
+    }
+    return deadName.startsWith(host + sep + port + sep);
+  }
+
   public boolean canAssignUserRegions() {
     if (minimumServerCount == 0) {
       return true;