Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 933477)
+++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -193,7 +193,7 @@
/**
* @throws IOException
- * @see {@link #startMiniCluster(boolean, int)}
+ * @see #startMiniCluster(int)
*/
public void shutdownMiniCluster() throws IOException {
LOG.info("Shutting down minicluster");
Index: src/test/org/apache/hadoop/hbase/master/DirectMaster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/DirectMaster.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/DirectMaster.java (revision 0)
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
+
+import java.io.IOException;
+
+/**
+ * A Master that communicates with RegionServers by invoking their methods
+ * rather than via RPC.
+ *
+ * <p>NOTE(review): work in progress. The overrides below return null; confirm
+ * that every HMaster code path using the RPC server, its address or the
+ * server connection tolerates null before relying on this class.
+ */
+public class DirectMaster extends HMaster {
+  /**
+   * @param c Configuration to use
+   * @throws IOException if the HMaster constructor fails
+   */
+  public DirectMaster(final HBaseConfiguration c) throws IOException {
+    super(c);
+  }
+
+  /** No RPC server is created. */
+  @Override
+  protected HBaseServer getRPCServer() throws IOException {
+    return null;
+  }
+
+  /** No RPC server, hence no listener address. */
+  @Override
+  protected HServerAddress getServerAddress(final HBaseServer s) {
+    return null;
+  }
+
+  /** No server connection; regionservers are meant to be invoked directly. */
+  @Override
+  protected ServerConnection getServerConnection(final HBaseConfiguration c) {
+    return null;
+  }
+}
Index: src/test/org/apache/hadoop/hbase/DirectCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/DirectCluster.java (revision 933477)
+++ src/test/org/apache/hadoop/hbase/DirectCluster.java (working copy)
@@ -1,5 +1,5 @@
/**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,61 +19,50 @@
*/
package org.apache.hadoop.hbase;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-
+import org.apache.hadoop.hbase.master.DirectMaster;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
* This class creates a single process HBase cluster. One thread is created for
- * a master and one per region server.
- *
+ * a master and one per region server. This cluster is like
+ * {@link org.apache.hadoop.hbase.LocalHBaseCluster} only it runs without
+ * an RPC layer mediating messages; it invokes methods directly.
+ *
* Call {@link #startup()} to start the cluster running and {@link #shutdown()}
- * to close it all down. {@link #join} the cluster is you want to wait on
+ * to close it all down. {@link #join} the cluster if you want to wait on
* shutdown completion.
- *
- * <p>Runs master on port 60000 by default. Because we can't just kill the
- * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
- * be able to find the master with a remote client to run shutdown. To use a
- * port other than 60000, set the hbase.master to a value of 'local:PORT':
- * that is 'local', not 'localhost', and the port number the master should use
- * instead of 60000.
- *
- * <p>To make 'local' mode more responsive, make values such as
- * hbase.regionserver.msginterval,
- * hbase.master.meta.thread.rescanfrequency, and
- * hbase.server.thread.wakefrequency a second or less.
+ *
+ * <p>TODO: Purge zk.
*/
-public class LocalHBaseCluster implements HConstants {
- static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
+public class DirectCluster {
+ private static final Log LOG = LogFactory.getLog(DirectCluster.class);
private final HMaster master;
private final List regionThreads;
private final static int DEFAULT_NO = 1;
- /** local mode */
- public static final String LOCAL = "local";
- /** 'local:' */
- public static final String LOCAL_COLON = LOCAL + ":";
+ private static final String LOCAL = "local";
+ private static final String LOCAL_COLON = LOCAL + ":";
private final HBaseConfiguration conf;
- private final Class<? extends HRegionServer> regionServerClass;
/**
* Constructor.
* @param conf
* @throws IOException
*/
- public LocalHBaseCluster(final HBaseConfiguration conf)
+ public DirectCluster(final HBaseConfiguration conf)
throws IOException {
this(conf, DEFAULT_NO);
}
@@ -83,21 +72,19 @@
* @param conf Configuration to use. Post construction has the master's
* address.
* @param noRegionServers Count of regionservers to start.
* @throws IOException
*/
- @SuppressWarnings("unchecked")
- public LocalHBaseCluster(final HBaseConfiguration conf,
+ public DirectCluster(final HBaseConfiguration conf,
final int noRegionServers)
throws IOException {
this.conf = conf;
// Create the master
- this.master = new HMaster(conf);
+ this.master = new DirectMaster(conf);
// Start the HRegionServers. Always have region servers come up on
// port '0' so there won't be clashes over default port as unit tests
// start/stop ports at different times during the life of the test.
- conf.set(REGIONSERVER_PORT, "0");
+ conf.set(HConstants.REGIONSERVER_PORT, "0");
this.regionThreads = new ArrayList();
- regionServerClass = (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
for (int i = 0; i < noRegionServers; i++) {
addRegionServer();
}
@@ -107,14 +94,13 @@
* Creates a region server.
* Call 'start' on the returned thread to make it run.
*
* @throws IOException
* @return Region server added.
*/
public RegionServerThread addRegionServer() throws IOException {
synchronized (regionThreads) {
- HRegionServer server;
+ HRegionServer server;
try {
- server = regionServerClass.getConstructor(HBaseConfiguration.class).
- newInstance(conf);
+ server = new HRegionServer(conf);
} catch (Exception e) {
IOException ioe = new IOException();
@@ -141,7 +127,7 @@
/** runs region servers */
public static class RegionServerThread extends Thread {
private final HRegionServer regionServer;
-
+
RegionServerThread(final HRegionServer r, final int index) {
super(r, "RegionServer:" + index);
this.regionServer = r;
@@ -151,7 +137,7 @@
public HRegionServer getRegionServer() {
return this.regionServer;
}
-
+
/**
* Block until the region server has come online, indicating it is ready
* to be used.
@@ -232,7 +218,7 @@
}
}
}
-
+
/**
* Start the cluster.
* @return Address to use contacting master.
@@ -322,18 +308,18 @@
* @return True if a 'local' address in hbase.master value.
*/
public static boolean isLocal(final Configuration c) {
- String mode = c.get(CLUSTER_DISTRIBUTED);
- return mode == null || mode.equals(CLUSTER_IS_LOCAL);
+ String mode = c.get(HConstants.CLUSTER_DISTRIBUTED);
+ return mode == null || mode.equals(HConstants.CLUSTER_IS_LOCAL);
}
/**
* Test things basically work.
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
HBaseConfiguration conf = new HBaseConfiguration();
- LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
+ DirectCluster cluster = new DirectCluster(conf);
cluster.startup();
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor htd =
Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 933477)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy)
@@ -83,9 +83,11 @@
* expires, but the server is still running. After the network is healed,
* and it's server logs are recovered, it will be told to call server startup
* because by then, its regions have probably been reassigned.
+ *
+ * Backed by a ConcurrentHashMap so isDead can iterate it without locking.
*/
protected final Set deadServers =
- Collections.synchronizedSet(new HashSet());
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
/** SortedMap server load -> Set of server names */
final SortedMap> loadToServers =
@@ -150,52 +152,38 @@
/**
* Let the server manager know a new regionserver has come online
* @param serverInfo
- * @throws Leases.LeaseStillHeldException
+ * @throws IOException A Leases.LeaseStillHeldException if a server with the
+ * same host and port is still registered or its failure is being processed.
*/
public void regionServerStartup(final HServerInfo serverInfo)
- throws Leases.LeaseStillHeldException {
+ throws IOException {
+ // Test for case where we get a region startup message from a regionserver
+ // that has been quickly restarted but whose znode expiration handler has
+ // not yet run, or from a server whose fail we are currently processing.
HServerInfo info = new HServerInfo(serverInfo);
- String serverName = info.getServerName();
- if (serversToServerInfo.containsKey(serverName) ||
- deadServers.contains(serverName)) {
- LOG.debug("Server start was rejected: " + serverInfo);
- LOG.debug("serversToServerInfo.containsKey: " + serversToServerInfo.containsKey(serverName));
- LOG.debug("deadServers.contains: " + deadServers.contains(serverName));
- throw new Leases.LeaseStillHeldException(serverName);
- }
-
- LOG.info("Received start message from: " + serverName);
- // Go on to process the regionserver registration.
- HServerLoad load = serversToLoad.remove(serverName);
- if (load != null) {
- // The startup message was from a known server.
- // Remove stale information about the server's load.
- synchronized (loadToServers) {
- Set servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- if (servers.size() > 0)
- loadToServers.put(load, servers);
- else
- loadToServers.remove(load);
- }
+ String hostAndPort = info.getServerAddress().toString();
+ // serverAddressToServerInfo is keyed by HServerAddress, not by String.
+ HServerInfo existingServer =
+ this.serverAddressToServerInfo.get(info.getServerAddress());
+ if (existingServer != null) {
+ LOG.info("Server start rejected; we already have " + hostAndPort +
+ " registered; existingServer=" + existingServer + ", newServer=" + info);
+ if (existingServer.getStartCode() < info.getStartCode()) {
+ LOG.info("Triggering server recovery; existingServer looks stale");
+ expireServer(existingServer);
}
+ throw new Leases.LeaseStillHeldException(hostAndPort);
}
- HServerInfo storedInfo = serversToServerInfo.remove(serverName);
- if (storedInfo != null && !master.closed.get()) {
- // The startup message was from a known server with the same name.
- // Timeout the old one right away.
- master.getRootRegionLocation();
- try {
- master.toDoQueue.put(new ProcessServerShutdown(master, storedInfo));
- } catch (InterruptedException e) {
- LOG.error("Insertion into toDoQueue was interrupted", e);
- }
+ if (isDead(hostAndPort, true)) {
+ LOG.debug("Server start rejected; currently processing " + hostAndPort +
+ " failure");
+ throw new Leases.LeaseStillHeldException(hostAndPort);
}
+ LOG.info("Received start message from: " + info.getServerName());
recordNewServer(info);
}

/**
* Adds the HSI to the RS list and creates an empty load
* @param info The region server informations
@@ -216,7 +204,7 @@
info.setLoad(load);
// We must set this watcher here because it can be set on a fresh start
// or on a failover
- Watcher watcher = new ServerExpirer(serverName, info.getServerAddress());
+ Watcher watcher = new ServerExpirer(new HServerInfo(info));
master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
serversToServerInfo.put(serverName, info);
serverAddressToServerInfo.put(info.getServerAddress(), info);
@@ -313,7 +301,6 @@
synchronized (serversToServerInfo) {
removeServerInfo(info.getServerName(), info.getServerAddress());
- serversToServerInfo.notifyAll();
}
return new HMsg[] {REGIONSERVER_STOP};
@@ -325,42 +312,36 @@
/** Region server is exiting */
private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
synchronized (serversToServerInfo) {
- try {
- // This method removes ROOT/META from the list and marks them to be reassigned
- // in addition to other housework.
- if (removeServerInfo(serverInfo.getServerName(),
- serverInfo.getServerAddress())) {
- // Only process the exit message if the server still has registered info.
- // Otherwise we could end up processing the server exit twice.
- LOG.info("Region server " + serverInfo.getServerName() +
- ": MSG_REPORT_EXITING");
- // Get all the regions the server was serving reassigned
- // (if we are not shutting down).
- if (!master.closed.get()) {
- for (int i = 1; i < msgs.length; i++) {
- LOG.info("Processing " + msgs[i] + " from " +
- serverInfo.getServerName());
- HRegionInfo info = msgs[i].getRegionInfo();
- // Meta/root region offlining is handed in removeServerInfo above.
- if (!info.isMetaRegion()) {
- synchronized (master.regionManager) {
- if (!master.regionManager.isOfflined(
- info.getRegionNameAsString())) {
- master.regionManager.setUnassigned(info, true);
- } else {
- master.regionManager.removeRegion(info);
- }
+ // This method removes ROOT/META from the list and marks them to be reassigned
+ // in addition to other housework.
+ if (removeServerInfo(serverInfo.getServerName(),
+ serverInfo.getServerAddress())) {
+ // Only process the exit message if the server still has registered info.
+ // Otherwise we could end up processing the server exit twice.
+ LOG.info("Region server " + serverInfo.getServerName() +
+ ": MSG_REPORT_EXITING");
+ // Get all the regions the server was serving reassigned
+ // (if we are not shutting down).
+ if (!master.closed.get()) {
+ for (int i = 1; i < msgs.length; i++) {
+ LOG.info("Processing " + msgs[i] + " from " +
+ serverInfo.getServerName());
+ HRegionInfo info = msgs[i].getRegionInfo();
+ // Meta/root region offlining is handed in removeServerInfo above.
+ if (!info.isMetaRegion()) {
+ synchronized (master.regionManager) {
+ if (!master.regionManager.isOfflined(
+ info.getRegionNameAsString())) {
+ master.regionManager.setUnassigned(info, true);
+ } else {
+ master.regionManager.removeRegion(info);
}
}
}
}
}
- // We don't need to return anything to the server because it isn't
- // going to do any more work.
- } finally {
- serversToServerInfo.notifyAll();
}
- }
+ }
}
/**
@@ -818,49 +799,63 @@
/** Watcher triggered when a RS znode is deleted */
private class ServerExpirer implements Watcher {
- private String server;
- private HServerAddress serverAddress;
+ private final HServerInfo server;
- ServerExpirer(String server, HServerAddress serverAddress) {
- this.server = server;
- this.serverAddress = serverAddress;
+ ServerExpirer(final HServerInfo hsi) {
+ this.server = hsi;
}
public void process(WatchedEvent event) {
- if(event.getType().equals(EventType.NodeDeleted)) {
- LOG.info(server + " znode expired");
- // Remove the server from the known servers list and update load info
- serverAddressToServerInfo.remove(serverAddress);
- HServerInfo info = serversToServerInfo.remove(server);
- if (info != null) {
- String serverName = info.getServerName();
- HServerLoad load = serversToLoad.remove(serverName);
- if (load != null) {
- synchronized (loadToServers) {
- Set servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- if(servers.size() > 0)
- loadToServers.put(load, servers);
- else
- loadToServers.remove(load);
- }
- }
- }
- deadServers.add(server);
- try {
- master.toDoQueue.put(new ProcessServerShutdown(master, info));
- } catch (InterruptedException e) {
- LOG.error("insert into toDoQueue was interrupted", e);
- }
+ if (!event.getType().equals(EventType.NodeDeleted)) return;
+ LOG.info(this.server.getServerName() + " znode expired");
+ expireServer(this.server);
+ }
+ }
+
+ /*
+ * Expire the passed server. Add it to list of deadservers and queue a
+ * shutdown processing.
+ * @param hsi Server to expire. Nothing is done if it is no longer registered
+ * or if its shutdown is already being processed.
+ */
+ private synchronized void expireServer(final HServerInfo hsi) {
+ // First check a server to expire.
+ String serverName = hsi.getServerName();
+ HServerInfo info = this.serversToServerInfo.get(serverName);
+ if (info == null) {
+ LOG.warn("No HServerInfo for " + serverName);
+ return;
+ }
+ if (this.deadServers.contains(serverName)) {
+ LOG.warn("Already processing shutdown of " + serverName);
+ return;
+ }
+ // Remove the server from the known servers lists and update load info
+ this.serverAddressToServerInfo.remove(info.getServerAddress());
+ this.serversToServerInfo.remove(serverName);
+ HServerLoad load = this.serversToLoad.remove(serverName);
+ if (load != null) {
+ synchronized (this.loadToServers) {
+ Set servers = this.loadToServers.get(load);
+ if (servers != null) {
+ servers.remove(serverName);
+ if (servers.size() > 0)
+ this.loadToServers.put(load, servers);
+ else
+ this.loadToServers.remove(load);
}
- synchronized (serversToServerInfo) {
- serversToServerInfo.notifyAll();
- }
}
}
+ // Add to dead servers and queue a shutdown processing.
+ this.deadServers.add(serverName);
+ try {
+ this.master.toDoQueue.put(new ProcessServerShutdown(master, info));
+ } catch (InterruptedException e) {
+ LOG.error("insert into toDoQueue was interrupted", e);
+ Thread.currentThread().interrupt();
+ }
}
-
+
/**
* @param serverName
*/
@@ -872,10 +867,47 @@
* @param serverName
* @return true if server is dead
*/
- public boolean isDead(String serverName) {
- return deadServers.contains(serverName);
+ public boolean isDead(final String serverName) {
+ return isDead(serverName, false);
}
+ /**
+ * @param serverName Servername as either host:port or host_port_startcode.
+ * @param hostAndPortOnly True if serverName is host and
+ * port only (host:port) and if so, then we match dead servers on host and
+ * port alone, ignoring their start codes.
+ * @return true if server is dead
+ */
+ boolean isDead(final String serverName, final boolean hostAndPortOnly) {
+ if (!hostAndPortOnly) return this.deadServers.contains(serverName);
+ int colon = serverName.lastIndexOf(':');
+ if (colon < 0) return false;
+ String host = serverName.substring(0, colon);
+ String port = serverName.substring(colon + 1);
+ for (String deadServerName: this.deadServers) {
+ if (matchesHostAndPort(deadServerName, host, port)) return true;
+ }
+ return false;
+ }
+
+ /*
+ * Dead server names are host, a separator, port, a separator, then the
+ * start code. The separator is not the ':' of a host:port string, so
+ * compare host and port piecewise, requiring a non-digit after the port
+ * so that, for example, port 6002 does not match 60020.
+ */
+ private static boolean matchesHostAndPort(final String deadServerName,
+ final String host, final String port) {
+ int separatorIndex = host.length();
+ int portStart = separatorIndex + 1;
+ int portEnd = portStart + port.length();
+ return deadServerName.length() > portEnd &&
+ deadServerName.startsWith(host) &&
+ !Character.isLetterOrDigit(deadServerName.charAt(separatorIndex)) &&
+ deadServerName.startsWith(port, portStart) &&
+ !Character.isDigit(deadServerName.charAt(portEnd));
+ }
+
public boolean canAssignUserRegions() {
if (minimumServerCount == 0) {
return true;
Index: src/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 933477)
+++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -197,7 +197,6 @@
// If FS is in safe mode wait till out of it.
FSUtils.waitOnSafeMode(this.conf, this.threadWakeFrequency);
-
try {
// Make sure the hbase root directory exists!
if (!fs.exists(rootdir)) {
@@ -220,16 +219,16 @@
this.maxRegionOpenTime =
conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000);
this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000);
- this.server = HBaseRPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
-
+
+ // The RPC server, its address and the server connection come from
+ // overridable factory methods so subclasses can substitute them.
+ this.server = getRPCServer();
// The rpc-server port can be ephemeral... ensure we have the correct info
- this.address = new HServerAddress(server.getListenerAddress());
+ this.address = getServerAddress(this.server);
- // dont retry too much
+ // Don't retry too much
conf.setInt("hbase.client.retries.number", 3);
- this.connection = ServerConnectionManager.getConnection(conf);
+ this.connection = getServerConnection(conf);
this.metaRescanInterval =
conf.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
@@ -238,6 +237,7 @@
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
zkMasterAddressWatcher = new ZKMasterAddressWatcher(this);
+
serverManager = new ServerManager(this);
regionManager = new RegionManager(this);
@@ -248,6 +248,37 @@
- LOG.info("HMaster initialized on " + this.address.toString());
+ LOG.info("HMaster initialized on " + this.address);
}
+ /**
+ * Creates the RPC server for this master. Called from the constructor,
+ * so overriders must not depend on their own instance state.
+ * @return RPC server bound to this master's configured address
+ * @throws IOException if the server cannot be created
+ */
+ protected HBaseServer getRPCServer() throws IOException {
+ return HBaseRPC.getServer(this, address.getBindAddress(),
+ address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+ false, conf);
+ }
+
+ /**
+ * Called from the constructor to learn the address the passed server is
+ * really listening on; the configured rpc-server port can be ephemeral.
+ * @param s Server returned by {@link #getRPCServer()}
+ * @return Listener address of {@code s}
+ */
+ protected HServerAddress getServerAddress(final HBaseServer s) {
+ return new HServerAddress(s.getListenerAddress());
+ }
+
+ /**
+ * Called from the constructor to obtain this master's server connection.
+ * @param c Configuration to use
+ * @return Connection for {@code c}
+ */
+ protected ServerConnection getServerConnection(final HBaseConfiguration c) {
+ return ServerConnectionManager.getConnection(c);
+ }
+
/*
* Return true if we are the master, false if the cluster must shut down
* or if we only retry once.
@@ -1288,5 +1319,4 @@
public static void main(String [] args) {
doMain(args, HMaster.class);
}
-
}