Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 933477) +++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -193,7 +193,7 @@ /** * @throws IOException - * @see {@link #startMiniCluster(boolean, int)} + * @see #startMiniCluster(int) */ public void shutdownMiniCluster() throws IOException { LOG.info("Shutting down minicluster"); Index: src/test/org/apache/hadoop/hbase/master/DirectMaster.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/DirectMaster.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/DirectMaster.java (revision 0) @@ -0,0 +1,36 @@ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.ServerConnection; +import org.apache.hadoop.hbase.ipc.HBaseServer; + +import java.io.IOException; + +/** + * A Master that communicates with RegionServers by invoking their methods + * rather than via RPC. + * + * The RPC hooks of HMaster (getRPCServer, getServerAddress and + * getServerConnection) are overridden here to return null.
+ */ +public class DirectMaster extends HMaster { + public DirectMaster(final HBaseConfiguration c) throws IOException { + super(c); + } + + @Override + protected HBaseServer getRPCServer() throws IOException { + return null; + } + + @Override + protected HServerAddress getServerAddress(final HBaseServer s) { + return null; + } + + @Override + protected ServerConnection getServerConnection(final HBaseConfiguration c) { + return null; + } +} Index: src/test/org/apache/hadoop/hbase/DirectCluster.java =================================================================== --- src/test/org/apache/hadoop/hbase/DirectCluster.java (revision 933477) +++ src/test/org/apache/hadoop/hbase/DirectCluster.java (working copy) @@ -1,5 +1,5 @@ /** - * Copyright 2007 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,61 +19,50 @@ */ package org.apache.hadoop.hbase; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.client.HBaseAdmin; - +import org.apache.hadoop.hbase.master.DirectMaster; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.ReflectionUtils; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * This class creates a single process HBase cluster. One thread is created for - * a master and one per region server. 
- * + * a master and one per region server. This cluster is like + * {@link org.apache.hadoop.hbase.LocalHBaseCluster} except that it runs + * without an RPC layer mediating messages; it invokes methods directly. + * * Call {@link #startup()} to start the cluster running and {@link #shutdown()} - * to close it all down. {@link #join} the cluster is you want to wait on + * to close it all down. {@link #join} the cluster if you want to wait on * shutdown completion. - * - *

Runs master on port 60000 by default. Because we can't just kill the - * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to - * be able to find the master with a remote client to run shutdown. To use a - * port other than 60000, set the hbase.master to a value of 'local:PORT': - * that is 'local', not 'localhost', and the port number the master should use - * instead of 60000. - * - *

To make 'local' mode more responsive, make values such as - * hbase.regionserver.msginterval, - * hbase.master.meta.thread.rescanfrequency, and - * hbase.server.thread.wakefrequency a second or less. + * + *

TODO: Purge zk. */ -public class LocalHBaseCluster implements HConstants { - static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class); +public class DirectCluster { + private static final Log LOG = LogFactory.getLog(DirectCluster.class); private final HMaster master; private final List regionThreads; private final static int DEFAULT_NO = 1; - /** local mode */ - public static final String LOCAL = "local"; - /** 'local:' */ - public static final String LOCAL_COLON = LOCAL + ":"; + private static final String LOCAL = "local"; /* NOTE(review): no use of LOCAL or LOCAL_COLON is visible in this patch; confirm they are still needed. */ + private static final String LOCAL_COLON = LOCAL + ":"; private final HBaseConfiguration conf; - private final Class regionServerClass; /** * Constructor. * @param conf - * @throws IOException + * @throws java.io.IOException */ - public LocalHBaseCluster(final HBaseConfiguration conf) + public DirectCluster(final HBaseConfiguration conf) throws IOException { this(conf, DEFAULT_NO); } @@ -83,21 +72,20 @@ * @param conf Configuration to use. Post construction has the master's * address. * @param noRegionServers Count of regionservers to start. - * @throws IOException + * @throws java.io.IOException */ @SuppressWarnings("unchecked") - public LocalHBaseCluster(final HBaseConfiguration conf, + public DirectCluster(final HBaseConfiguration conf, final int noRegionServers) throws IOException { this.conf = conf; // Create the master - this.master = new HMaster(conf); + this.master = new DirectMaster(conf); /* master whose RPC hooks return null; see DirectMaster */ // Start the HRegionServers. Always have region servers come up on // port '0' so there won't be clashes over default port as unit tests // start/stop ports at different times during the life of the test. - conf.set(REGIONSERVER_PORT, "0"); + conf.set(HConstants.REGIONSERVER_PORT, "0"); this.regionThreads = new ArrayList(); - regionServerClass = (Class) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); for (int i = 0; i < noRegionServers; i++) { addRegionServer(); } @@ -107,14 +95,14 @@ * Creates a region server.
* Call 'start' on the returned thread to make it run. * - * @throws IOException + * @throws java.io.IOException * @return Region server added. */ public RegionServerThread addRegionServer() throws IOException { synchronized (regionThreads) { - HRegionServer server; + HRegionServer server; try { - server = regionServerClass.getConstructor(HBaseConfiguration.class). + server = HRegionServer.class.getConstructor(HBaseConfiguration.class). /* NOTE(review): the class is fixed here, so new HRegionServer(conf) would avoid reflection */ newInstance(conf); } catch (Exception e) { IOException ioe = new IOException(); @@ -141,7 +129,7 @@ /** runs region servers */ public static class RegionServerThread extends Thread { private final HRegionServer regionServer; - + RegionServerThread(final HRegionServer r, final int index) { super(r, "RegionServer:" + index); this.regionServer = r; @@ -151,7 +139,7 @@ public HRegionServer getRegionServer() { return this.regionServer; } - + /** * Block until the region server has come online, indicating it is ready * to be used. @@ -232,7 +220,7 @@ } } } - + /** * Start the cluster. * @return Address to use contacting master. @@ -322,18 +310,18 @@ * @return True if a 'local' address in hbase.master value. */ public static boolean isLocal(final Configuration c) { - String mode = c.get(CLUSTER_DISTRIBUTED); - return mode == null || mode.equals(CLUSTER_IS_LOCAL); + String mode = c.get(HConstants.CLUSTER_DISTRIBUTED); + return mode == null || mode.equals(HConstants.CLUSTER_IS_LOCAL); } /** * Test things basically work.
* @param args - * @throws IOException + * @throws java.io.IOException */ public static void main(String[] args) throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); - LocalHBaseCluster cluster = new LocalHBaseCluster(conf); + DirectCluster cluster = new DirectCluster(conf); cluster.startup(); HBaseAdmin admin = new HBaseAdmin(conf); HTableDescriptor htd = Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 933477) +++ src/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -83,9 +83,11 @@ * expires, but the server is still running. After the network is healed, * and it's server logs are recovered, it will be told to call server startup * because by then, its regions have probably been reassigned. + * + * Backed by a ConcurrentHashMap (via newSetFromMap) so it can be iterated, as isDead does, without holding a lock. */ protected final Set deadServers = - Collections.synchronizedSet(new HashSet()); + Collections.newSetFromMap(new ConcurrentHashMap()); /** SortedMap server load -> Set of server names */ final SortedMap> loadToServers = @@ -150,52 +152,35 @@ /** * Let the server manager know a new regionserver has come online * @param serverInfo - * @throws Leases.LeaseStillHeldException + * @throws IOException */ public void regionServerStartup(final HServerInfo serverInfo) - throws Leases.LeaseStillHeldException { + throws IOException { + // Test for case where we get a region startup message from a regionserver + // that has been quickly restarted but whose znode expiration handler has + // not yet run, or from a server whose failure we are currently processing.
HServerInfo info = new HServerInfo(serverInfo); - String serverName = info.getServerName(); - if (serversToServerInfo.containsKey(serverName) || - deadServers.contains(serverName)) { - LOG.debug("Server start was rejected: " + serverInfo); - LOG.debug("serversToServerInfo.containsKey: " + serversToServerInfo.containsKey(serverName)); - LOG.debug("deadServers.contains: " + deadServers.contains(serverName)); - throw new Leases.LeaseStillHeldException(serverName); - } - - LOG.info("Received start message from: " + serverName); - // Go on to process the regionserver registration. - HServerLoad load = serversToLoad.remove(serverName); - if (load != null) { - // The startup message was from a known server. - // Remove stale information about the server's load. - synchronized (loadToServers) { - Set servers = loadToServers.get(load); - if (servers != null) { - servers.remove(serverName); - if (servers.size() > 0) - loadToServers.put(load, servers); - else - loadToServers.remove(load); - } + String hostAndPort = info.getServerAddress().toString(); + HServerInfo existingServer = + this.serverAddressToServerInfo.get(info.getServerAddress()); /* keyed by HServerAddress, not the host:port String */ + if (existingServer != null) { + LOG.info("Server start rejected; we already have " + hostAndPort + + " registered; existingServer=" + existingServer + ", newServer=" + info); + if (existingServer.getStartCode() < info.getStartCode()) { + LOG.info("Triggering server recovery; existingServer looks stale"); + expireServer(existingServer); } + throw new Leases.LeaseStillHeldException(hostAndPort); } - HServerInfo storedInfo = serversToServerInfo.remove(serverName); - if (storedInfo != null && !master.closed.get()) { - // The startup message was from a known server with the same name. - // Timeout the old one right away.
- master.getRootRegionLocation(); - try { - master.toDoQueue.put(new ProcessServerShutdown(master, storedInfo)); - } catch (InterruptedException e) { - LOG.error("Insertion into toDoQueue was interrupted", e); - } + if (isDead(hostAndPort, true)) { + LOG.debug("Server start rejected; currently processing " + hostAndPort + + " failure"); + throw new Leases.LeaseStillHeldException(hostAndPort); } + LOG.info("Received start message from: " + info.getServerName()); recordNewServer(info); } - /** * Adds the HSI to the RS list and creates an empty load * @param info The region server informations @@ -216,7 +201,7 @@ info.setLoad(load); // We must set this watcher here because it can be set on a fresh start // or on a failover - Watcher watcher = new ServerExpirer(serverName, info.getServerAddress()); + Watcher watcher = new ServerExpirer(new HServerInfo(info)); master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher); serversToServerInfo.put(serverName, info); serverAddressToServerInfo.put(info.getServerAddress(), info); @@ -313,7 +298,6 @@ synchronized (serversToServerInfo) { removeServerInfo(info.getServerName(), info.getServerAddress()); - serversToServerInfo.notifyAll(); } return new HMsg[] {REGIONSERVER_STOP}; @@ -325,42 +309,36 @@ /** Region server is exiting */ private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) { synchronized (serversToServerInfo) { - try { - // This method removes ROOT/META from the list and marks them to be reassigned - // in addition to other housework. - if (removeServerInfo(serverInfo.getServerName(), - serverInfo.getServerAddress())) { - // Only process the exit message if the server still has registered info. - // Otherwise we could end up processing the server exit twice. - LOG.info("Region server " + serverInfo.getServerName() + - ": MSG_REPORT_EXITING"); - // Get all the regions the server was serving reassigned - // (if we are not shutting down).
- if (!master.closed.get()) { - for (int i = 1; i < msgs.length; i++) { - LOG.info("Processing " + msgs[i] + " from " + - serverInfo.getServerName()); - HRegionInfo info = msgs[i].getRegionInfo(); - // Meta/root region offlining is handed in removeServerInfo above. - if (!info.isMetaRegion()) { - synchronized (master.regionManager) { - if (!master.regionManager.isOfflined( - info.getRegionNameAsString())) { - master.regionManager.setUnassigned(info, true); - } else { - master.regionManager.removeRegion(info); - } + // This method removes ROOT/META from the list and marks them to be reassigned + // in addition to other housework. + if (removeServerInfo(serverInfo.getServerName(), + serverInfo.getServerAddress())) { + // Only process the exit message if the server still has registered info. + // Otherwise we could end up processing the server exit twice. + LOG.info("Region server " + serverInfo.getServerName() + + ": MSG_REPORT_EXITING"); + // Get all the regions the server was serving reassigned + // (if we are not shutting down). + if (!master.closed.get()) { + for (int i = 1; i < msgs.length; i++) { + LOG.info("Processing " + msgs[i] + " from " + + serverInfo.getServerName()); + HRegionInfo info = msgs[i].getRegionInfo(); + // Meta/root region offlining is handled in removeServerInfo above. + if (!info.isMetaRegion()) { + synchronized (master.regionManager) { + if (!master.regionManager.isOfflined( + info.getRegionNameAsString())) { + master.regionManager.setUnassigned(info, true); + } else { + master.regionManager.removeRegion(info); } } } } } - // We don't need to return anything to the server because it isn't - // going to do any more work.
- } finally { - serversToServerInfo.notifyAll(); } - } + } } /** @@ -818,49 +796,62 @@ /** Watcher triggered when a RS znode is deleted */ private class ServerExpirer implements Watcher { - private String server; - private HServerAddress serverAddress; + private HServerInfo server; - ServerExpirer(String server, HServerAddress serverAddress) { - this.server = server; - this.serverAddress = serverAddress; + ServerExpirer(final HServerInfo hsi) { + this.server = hsi; } public void process(WatchedEvent event) { - if(event.getType().equals(EventType.NodeDeleted)) { - LOG.info(server + " znode expired"); - // Remove the server from the known servers list and update load info - serverAddressToServerInfo.remove(serverAddress); - HServerInfo info = serversToServerInfo.remove(server); - if (info != null) { - String serverName = info.getServerName(); - HServerLoad load = serversToLoad.remove(serverName); - if (load != null) { - synchronized (loadToServers) { - Set servers = loadToServers.get(load); - if (servers != null) { - servers.remove(serverName); - if(servers.size() > 0) - loadToServers.put(load, servers); - else - loadToServers.remove(load); - } - } - } - deadServers.add(server); - try { - master.toDoQueue.put(new ProcessServerShutdown(master, info)); - } catch (InterruptedException e) { - LOG.error("insert into toDoQueue was interrupted", e); - } + if (!event.getType().equals(EventType.NodeDeleted)) return; + LOG.info(this.server.getServerName() + " znode expired"); + expireServer(this.server); + } + } + + /* + * Expire the passed server. Add it to list of deadservers and queue a + * shutdown processing. + * @param hsi Server to expire; its server name (format is + * host_port_startcode) keys serversToServerInfo and deadServers. + */ + private synchronized void expireServer(final HServerInfo hsi) { + // First check the server is still registered and not already being processed.
+ String serverName = hsi.getServerName(); + HServerInfo info = this.serversToServerInfo.get(serverName); + if (info == null) { + LOG.warn("No HServerInfo for " + serverName); + return; + } + if (this.deadServers.contains(serverName)) { + LOG.warn("Already processing shutdown of " + serverName); + return; + } + // Remove the server from the known servers lists and update load info + this.serverAddressToServerInfo.remove(info.getServerAddress()); + this.serversToServerInfo.remove(serverName); + HServerLoad load = this.serversToLoad.remove(serverName); + if (load != null) { + synchronized (this.loadToServers) { + Set servers = this.loadToServers.get(load); + if (servers != null) { + servers.remove(serverName); + if (servers.size() > 0) + this.loadToServers.put(load, servers); + else + this.loadToServers.remove(load); } - synchronized (serversToServerInfo) { - serversToServerInfo.notifyAll(); - } } } + // Add to dead servers and queue a shutdown processing. + this.deadServers.add(serverName); + try { + this.master.toDoQueue.put(new ProcessServerShutdown(master, info)); + } catch (InterruptedException e) { + LOG.error("insert into toDoQueue was interrupted", e); + } } - + /** * @param serverName */ @@ -872,10 +863,35 @@ * @param serverName * @return true if server is dead */ - public boolean isDead(String serverName) { - return deadServers.contains(serverName); + public boolean isDead(final String serverName) { + return isDead(serverName, false); } + /** + * @param serverName Servername as either host:port or host_port_startcode. + * @param hostAndPortOnly True if serverName is host and + * port only (host:port); if so, we strip the start code from each dead + * server name and compare the remaining host and port exactly, so a + * server restarted at the same address is still found to be dead.
+ * @return true if server is dead + */ + boolean isDead(final String serverName, final boolean hostAndPortOnly) { + if (!hostAndPortOnly) return deadServers.contains(serverName); + int colon = serverName.lastIndexOf(':'); + for (String hostPortStartCode: this.deadServers) { + // Dead names end in a separator followed by the numeric start code; strip both. + int end = hostPortStartCode.length(); + while (end > 0 && Character.isDigit(hostPortStartCode.charAt(end - 1))) end--; + if (end == 0 || end == hostPortStartCode.length()) continue; + char separator = hostPortStartCode.charAt(end - 1); + // Rewrite host:port with the dead name's separator before comparing. + String candidate = colon < 0? serverName: + serverName.substring(0, colon) + separator + serverName.substring(colon + 1); + if (hostPortStartCode.substring(0, end - 1).equals(candidate)) return true; + } + return false; + } + public boolean canAssignUserRegions() { if (minimumServerCount == 0) { return true; Index: src/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 933477) +++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -197,7 +197,6 @@ // If FS is in safe mode wait till out of it. FSUtils.waitOnSafeMode(this.conf, this.threadWakeFrequency); - try { // Make sure the hbase root directory exists! if (!fs.exists(rootdir)) { @@ -220,16 +219,16 @@ this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000); this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000); - this.server = HBaseRPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); - + + + + this.server = getRPCServer(); // The rpc-server port can be ephemeral...
ensure we have the correct info - this.address = new HServerAddress(server.getListenerAddress()); + this.address = this.server == null? address: getServerAddress(this.server); /* no RPC server (e.g. DirectMaster): keep the configured address */ - // dont retry too much + // Dont retry too much conf.setInt("hbase.client.retries.number", 3); - this.connection = ServerConnectionManager.getConnection(conf); + this.connection = getServerConnection(this.conf); this.metaRescanInterval = conf.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000); @@ -238,6 +237,7 @@ zooKeeperWrapper = new ZooKeeperWrapper(conf, this); zkMasterAddressWatcher = new ZKMasterAddressWatcher(this); + serverManager = new ServerManager(this); regionManager = new RegionManager(this); @@ -248,6 +248,23 @@ LOG.info("HMaster initialized on " + this.address.toString()); } + /** @return the RPC server this master listens on; subclasses may override. */ + protected HBaseServer getRPCServer() throws IOException { + return HBaseRPC.getServer(this, address.getBindAddress(), + address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), + false, conf); + } + + /** @return the address that the listener of server s is bound to. */ + protected HServerAddress getServerAddress(final HBaseServer s) { + return new HServerAddress(s.getListenerAddress()); + } + + /** @return a ServerConnection for configuration c, via ServerConnectionManager. */ + protected ServerConnection getServerConnection(final HBaseConfiguration c) { + return ServerConnectionManager.getConnection(c); + } + /* * Return true if we are the master, false if the cluster must shut down * or if we only retry once. @@ -1288,5 +1305,4 @@ public static void main(String [] args) { doMain(args, HMaster.class); } - }