Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 941538)
+++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy)
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -110,10 +111,32 @@
* Subclass so can get at protected methods (none at moment).
*/
public static class MiniHBaseClusterRegionServer extends HRegionServer {
+ private FileSystem fs = null;
+
public MiniHBaseClusterRegionServer(HBaseConfiguration conf)
throws IOException {
super(conf);
}
+
+ @Override
+ protected FileSystem getFileSystem(final HBaseConfiguration c)
+ throws IOException {
+ // Use the alternate filesystem if one has been set; else defer to super.
+ return this.fs != null ? this.fs : super.getFileSystem(c);
+ }
+
+ /**
+ * Set an alternate filesystem. Must be called before the regionserver is started.
+ * @param fs Filesystem to use in place of the default.
+ */
+ public void setAlternateFileSystemInstance(final FileSystem fs) {
+ this.fs = fs;
+ }
+
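+ /**
+ * Simulate a 'kill -9' of this regionserver: exit without closing regions
+ * and without running the hdfs shutdown thread.
+ */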
+ public void kill() {
+ // Don't have hdfs shutdown on the way out.
+ setHDFSShutdownThreadOnExit(null);
+ super.kill();
+ }
}
private void init(final int nRegionNodes) throws IOException {
Index: src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java (revision 941538)
+++ src/test/org/apache/hadoop/hbase/master/TestMasterTransistions.java (working copy)
@@ -19,15 +19,19 @@
*/
package org.apache.hadoop.hbase.master;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HMsg;
@@ -68,6 +72,7 @@
* @throws Exception
*/
@BeforeClass public static void beforeAllTests() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
// Start a cluster of two regionservers.
TEST_UTIL.startMiniCluster(2);
// Create a table of three families. This will assign a region.
@@ -83,6 +88,55 @@
}
/**
+ * Test adding a new server before the old one on the same host+port is dead.
+ * Make the test more onerous by having the server under test carry meta.
+ * If the old and new servers get confused, meta purportedly never comes back
+ * online. Test that meta gets redeployed.
+ * @throws IOException
+ */
+ @Test public void testAddingServerBeforeOldIsDead2413() throws IOException {
+ LOG.info("Running testAddingServerBeforeOldIsDead2413");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ int count = count();
+ int metaIndex = cluster.getServerWithMeta();
+ MiniHBaseClusterRegionServer metaHRS =
+ (MiniHBaseClusterRegionServer)cluster.getRegionServer(metaIndex);
+ int port = metaHRS.getServerInfo().getServerAddress().getPort();
+ Configuration c = TEST_UTIL.getConfiguration();
+ String oldPort = c.get(HConstants.REGIONSERVER_PORT, "0");
+ try {
+ LOG.info("KILLED=" + metaHRS);
+ metaHRS.kill();
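+ // Reuse the dead server's port so the new regionserver comes up on the
+ // same host+port the killed server was using.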
+ c.set(HConstants.REGIONSERVER_PORT, Integer.toString(port));
+ // Try and start new regionserver. It might clash with the old
+ // regionserver port so keep trying to get past the BindException.
+ HRegionServer hrs = null;
+ while (true) {
+ try {
+ hrs = cluster.startRegionServer().getRegionServer();
+ break;
+ } catch (IOException e) {
+ if (e.getCause() != null && e.getCause() instanceof InvocationTargetException) {
+ InvocationTargetException ee = (InvocationTargetException)e.getCause();
+ if (ee.getCause() != null && ee.getCause() instanceof BindException) {
+ LOG.info("BindException; retrying: " + e.toString());
+ continue;
+ }
+ }
+ // Anything other than a port clash is a real failure.
+ throw e;
+ }
+ }
+ LOG.info("STARTED=" + hrs);
+ // Wait until it has been given at least 3 regions before we go on to
+ // count rows in the table.
+ while (hrs.getOnlineRegions().size() < 3) Threads.sleep(100);
+ LOG.info(hrs.toString() + " has " + hrs.getOnlineRegions().size() +
+ " regions");
+ assertEquals(count, count());
+ } finally {
+ c.set(HConstants.REGIONSERVER_PORT, oldPort);
+ }
+ }
+
+ /**
* HBase2482 is about outstanding region openings. If any are outstanding
* when a regionserver goes down, then they'll never deploy. They'll be
* stuck in the regions-in-transition list for ever. This listener looks
@@ -165,6 +219,7 @@
* @see HBASE-2482
*/
@Test public void testKillRSWithOpeningRegion2482() throws Exception {
+ LOG.info("Running testKillRSWithOpeningRegion2482");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
// Count how many regions are online. They need to be all back online for
// this test to succeed.
@@ -336,6 +391,7 @@
* @see HBASE-2428
*/
@Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception {
+ LOG.info("Running testRegionCloseWhenNoMetaHBase2428");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
final HMaster master = cluster.getMaster();
int metaIndex = cluster.getServerWithMeta();
@@ -474,6 +530,23 @@
}
/*
+ * @return Count of rows in TABLENAME
+ * @throws IOException
+ */
+ private static int count() throws IOException {
+ HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+ int rows = 0;
+ Scan scan = new Scan();
+ ResultScanner s = t.getScanner(scan);
+ for (Result r = null; (r = s.next()) != null;) {
+ rows++;
+ }
+ s.close();
+ LOG.info("Counted=" + rows);
+ return rows;
+ }
+
+ /*
* @param hri
* @return Start key for hri (If start key is '', then return 'aaa'.
*/
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 941538)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -135,6 +135,8 @@
// debugging and unit tests.
protected volatile boolean abortRequested;
+ private boolean killed = false;
+
// If false, the file system has become unavailable
protected volatile boolean fsOk;
@@ -630,7 +632,9 @@
hlogRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
- if (abortRequested) {
+ if (killed) {
+ // Just skip out w/o closing regions.
+ } else if (abortRequested) {
if (this.fsOk) {
// Only try to clean up if the file system is available
try {
@@ -682,9 +686,11 @@
HBaseRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null;
}
-
- join();
- this.zooKeeperWrapper.close();
+
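+ // When killed, skip the join and leave the zookeeper session to expire on
+ // its own, as an abrupt 'kill -9' would.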
+ if (!killed) {
+ join();
+ this.zooKeeperWrapper.close();
+ }
if (this.shutdownHDFS.get()) {
runThread(this.hdfsShutdownThread,
this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
@@ -789,7 +795,7 @@
// accessors will be going against wrong filesystem (unless all is set
// to defaults).
this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
- this.fs = FileSystem.get(this.conf);
+ this.fs = getFileSystem(this.conf);
// Register shutdown hook for HRegionServer, runs an orderly shutdown
// when a kill signal is recieved
@@ -812,6 +818,17 @@
}
/*
+ * Method so a subclass can put an alternate filesystem instance in place.
+ * @param c Configuration to use getting the filesystem.
+ * @return FileSystem to use.
+ * @throws IOException
+ */
+ protected FileSystem getFileSystem(final HBaseConfiguration c)
+ throws IOException {
+ return FileSystem.get(c);
+ }
+
+ /*
* @param r Region to get RegionLoad for.
* @return RegionLoad instance.
* @throws IOException
@@ -1276,6 +1293,16 @@
stop();
}
+ /*
+ * Simulate a kill -9 of this server.
+ * Exits without closing regions or cleaning up logs, but it does close the
+ * socket so a server can be brought up on the old hostname+port immediately.
+ */
+ protected void kill() {
+ this.killed = true;
+ abort();
+ }
+
/**
* Wait on all threads to finish.
* Presumption is that all closes and stops have already been called.
Index: src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java (revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java (working copy)
@@ -241,6 +243,8 @@
// Get regions reassigned
for (HRegionInfo info: regions) {
+ LOG.debug("Setting " + info.getRegionNameAsString() +
+ " to unassigned in shutdown processor");
master.regionManager.setUnassigned(info, true);
}
}
@@ -272,13 +276,12 @@
public Boolean call() throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("process server shutdown scanning " +
+ LOG.debug("Process server shutdown scanning " +
Bytes.toString(m.getRegionName()) + " on " + m.getServer());
}
Scan scan = new Scan();
scan.addFamily(CATALOG_FAMILY);
- long scannerId = server.openScanner(
- m.getRegionName(), scan);
+ long scannerId = server.openScanner(m.getRegionName(), scan);
scanMetaRegion(server, scannerId, m.getRegionName());
return true;
}
@@ -383,4 +386,4 @@
protected int getPriority() {
return 2; // high but not highest priority
}
-}
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy)
@@ -84,9 +84,11 @@
* expires, but the server is still running. After the network is healed,
* and it's server logs are recovered, it will be told to call server startup
* because by then, its regions have probably been reassigned.
+ *
+ * newSetFromMap is a means of making a Set out of a Map; here it backs the
+ * Set with a concurrent map.
*/
protected final Set<String> deadServers =
- Collections.synchronizedSet(new HashSet<String>());
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
/** SortedMap server load -> Set of server names */
final SortedMap<HServerLoad, Set<String>> loadToServers =
@@ -151,45 +153,32 @@
/**
* Let the server manager know a new regionserver has come online
* @param serverInfo
- * @throws Leases.LeaseStillHeldException
+ * @throws IOException
*/
public void regionServerStartup(final HServerInfo serverInfo)
- throws Leases.LeaseStillHeldException {
+ throws IOException {
+ // Test for case where we get a region startup message from a regionserver
+ // that has been quickly restarted but whose znode expiration handler has
+ // not yet run, or from a server whose failure we are currently processing.
HServerInfo info = new HServerInfo(serverInfo);
- String serverName = info.getServerName();
- if (serversToServerInfo.containsKey(serverName) ||
- deadServers.contains(serverName)) {
- LOG.debug("Server start was rejected: " + serverInfo);
- LOG.debug("serversToServerInfo.containsKey: " + serversToServerInfo.containsKey(serverName));
- LOG.debug("deadServers.contains: " + deadServers.contains(serverName));
- throw new Leases.LeaseStillHeldException(serverName);
- }
-
- LOG.info("Received start message from: " + serverName);
- // Go on to process the regionserver registration.
- HServerLoad load = serversToLoad.remove(serverName);
- if (load != null) {
- // The startup message was from a known server.
- // Remove stale information about the server's load.
- synchronized (loadToServers) {
- Set<String> servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- if (servers.size() > 0)
- loadToServers.put(load, servers);
- else
- loadToServers.remove(load);
- }
+ String hostAndPort = info.getServerAddress().toString();
+ HServerInfo existingServer =
+ this.serverAddressToServerInfo.get(info.getServerAddress());
+ if (existingServer != null) {
+ LOG.info("Server start rejected; we already have " + hostAndPort +
+ " registered; existingServer=" + existingServer + ", newServer=" + info);
+ if (existingServer.getStartCode() < info.getStartCode()) {
+ LOG.info("Triggering server recovery; existingServer looks stale");
+ expireServer(existingServer);
}
+ throw new Leases.LeaseStillHeldException(hostAndPort);
}
- HServerInfo storedInfo = serversToServerInfo.remove(serverName);
- if (storedInfo != null && !master.closed.get()) {
- // The startup message was from a known server with the same name.
- // Timeout the old one right away.
- master.getRootRegionLocation();
- RegionServerOperation op = new ProcessServerShutdown(master, storedInfo);
- this.master.getRegionServerOperationQueue().put(op);
+ if (isDead(hostAndPort, true)) {
+ LOG.debug("Server start rejected; currently processing " + hostAndPort +
+ " failure");
+ throw new Leases.LeaseStillHeldException(hostAndPort);
}
+ LOG.info("Received start message from: " + info.getServerName());
recordNewServer(info);
}
@@ -214,7 +203,7 @@
info.setLoad(load);
// We must set this watcher here because it can be set on a fresh start
// or on a failover
- Watcher watcher = new ServerExpirer(serverName, info.getServerAddress());
+ Watcher watcher = new ServerExpirer(new HServerInfo(info));
master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
serversToServerInfo.put(serverName, info);
serverAddressToServerInfo.put(info.getServerAddress(), info);
@@ -313,7 +302,6 @@
synchronized (serversToServerInfo) {
removeServerInfo(info.getServerName(), info.getServerAddress());
- serversToServerInfo.notifyAll();
}
return new HMsg[] {REGIONSERVER_STOP};
@@ -330,57 +318,47 @@
*/
private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
assert msgs[0].getType() == Type.MSG_REPORT_EXITING;
-
synchronized (serversToServerInfo) {
- try {
- // This method removes ROOT/META from the list and marks them to be reassigned
- // in addition to other housework.
- if (removeServerInfo(serverInfo.getServerName(),
- serverInfo.getServerAddress())) {
- // Only process the exit message if the server still has registered info.
- // Otherwise we could end up processing the server exit twice.
- LOG.info("Region server " + serverInfo.getServerName() +
- ": MSG_REPORT_EXITING");
- // Get all the regions the server was serving reassigned
- // (if we are not shutting down).
- if (!master.closed.get()) {
- for (int i = 1; i < msgs.length; i++) {
- LOG.info("Processing " + msgs[i] + " from " +
- serverInfo.getServerName());
- assert msgs[i].getType() == Type.MSG_REGION_CLOSE;
- HRegionInfo info = msgs[i].getRegionInfo();
- // Meta/root region offlining is handed in removeServerInfo above.
- if (!info.isMetaRegion()) {
- synchronized (master.regionManager) {
- if (!master.regionManager.isOfflined(
- info.getRegionNameAsString())) {
- master.regionManager.setUnassigned(info, true);
- } else {
- master.regionManager.removeRegion(info);
- }
+ // This method removes ROOT/META from the list and marks them to be
+ // reassigned in addition to other housework.
+ if (removeServerInfo(serverInfo.getServerName(), serverInfo.getServerAddress())) {
+ // Only process the exit message if the server still has registered info.
+ // Otherwise we could end up processing the server exit twice.
+ LOG.info("Region server " + serverInfo.getServerName() +
+ ": MSG_REPORT_EXITING");
+ // Get all the regions the server was serving reassigned
+ // (if we are not shutting down).
+ if (!master.closed.get()) {
+ for (int i = 1; i < msgs.length; i++) {
+ LOG.info("Processing " + msgs[i] + " from " +
+ serverInfo.getServerName());
+ assert msgs[i].getType() == Type.MSG_REGION_CLOSE;
+ HRegionInfo info = msgs[i].getRegionInfo();
+ // Meta/root region offlining is handled in removeServerInfo above.
+ if (!info.isMetaRegion()) {
+ synchronized (master.regionManager) {
+ if (!master.regionManager.isOfflined(info.getRegionNameAsString())) {
+ master.regionManager.setUnassigned(info, true);
+ } else {
+ master.regionManager.removeRegion(info);
}
}
}
}
-
- // There should not be any regions in transition for this server - the
- // server should finish transitions itself before closing
- Map<String, RegionState> inTransition =
- master.regionManager.getRegionsInTransitionOnServer(
- serverInfo.getServerName());
- for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
- LOG.warn("Region server " + serverInfo.getServerName() +
- " shut down with region " + entry.getKey() + " in transition " +
- "state " + entry.getValue());
- master.regionManager.setUnassigned(entry.getValue().getRegionInfo(), true);
- }
}
- // We don't need to return anything to the server because it isn't
- // going to do any more work.
- } finally {
- serversToServerInfo.notifyAll();
+ // There should not be any regions in transition for this server - the
+ // server should finish transitions itself before closing
+ Map<String, RegionState> inTransition = master.regionManager
+ .getRegionsInTransitionOnServer(serverInfo.getServerName());
+ for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
+ LOG.warn("Region server " + serverInfo.getServerName()
+ + " shut down with region " + entry.getKey() + " in transition "
+ + "state " + entry.getValue());
+ master.regionManager.setUnassigned(entry.getValue().getRegionInfo(),
+ true);
+ }
}
- }
+ }
}
/**
@@ -824,44 +802,59 @@
/** Watcher triggered when a RS znode is deleted */
private class ServerExpirer implements Watcher {
- private String server;
- private HServerAddress serverAddress;
+ private HServerInfo server;
- ServerExpirer(String server, HServerAddress serverAddress) {
- this.server = server;
- this.serverAddress = serverAddress;
+ ServerExpirer(final HServerInfo hsi) {
+ this.server = hsi;
}
public void process(WatchedEvent event) {
- if (event.getType().equals(EventType.NodeDeleted)) {
- LOG.info(server + " znode expired");
- // Remove the server from the known servers list and update load info
- serverAddressToServerInfo.remove(serverAddress);
- HServerInfo info = serversToServerInfo.remove(server);
- if (info != null) {
- String serverName = info.getServerName();
- HServerLoad load = serversToLoad.remove(serverName);
- if (load != null) {
- synchronized (loadToServers) {
- Set<String> servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- if(servers.size() > 0)
- loadToServers.put(load, servers);
- else
- loadToServers.remove(load);
- }
- }
- }
- deadServers.add(server);
- RegionServerOperation op = new ProcessServerShutdown(master, info);
- master.getRegionServerOperationQueue().put(op);
+ if (!event.getType().equals(EventType.NodeDeleted)) return;
+ LOG.info(this.server.getServerName() + " znode expired");
+ expireServer(this.server);
+ }
+ }
+
+ /*
+ * Expire the passed server. Add it to the list of dead servers and queue a
+ * shutdown processing operation.
+ * @param hsi Info for the server to expire; its name is host_port_startcode.
+ */
+ private synchronized void expireServer(final HServerInfo hsi) {
+ // First check that we have info for the server to expire.
+ String serverName = hsi.getServerName();
+ HServerInfo info = this.serversToServerInfo.get(serverName);
+ if (info == null) {
+ LOG.warn("No HServerInfo for " + serverName);
+ return;
+ }
+ if (this.deadServers.contains(serverName)) {
+ LOG.warn("Already processing shutdown of " + serverName);
+ return;
+ }
+ // Remove the server from the known servers lists and update load info
+ this.serverAddressToServerInfo.remove(info.getServerAddress());
+ this.serversToServerInfo.remove(serverName);
+ HServerLoad load = this.serversToLoad.remove(serverName);
+ if (load != null) {
+ synchronized (this.loadToServers) {
+ Set<String> servers = this.loadToServers.get(load);
+ if (servers != null) {
+ servers.remove(serverName);
+ if (servers.size() > 0)
+ this.loadToServers.put(load, servers);
+ else
+ this.loadToServers.remove(load);
}
- synchronized (serversToServerInfo) {
- serversToServerInfo.notifyAll();
- }
}
}
+ // Add to dead servers and queue a shutdown processing.
+ LOG.debug("Added=" + serverName +
+ " to dead servers, added shutdown processing operation");
+ this.deadServers.add(serverName);
+ this.master.getRegionServerOperationQueue().
+ put(new ProcessServerShutdown(master, info));
}
/**
@@ -875,10 +868,25 @@
* @param serverName
* @return true if server is dead
*/
- public boolean isDead(String serverName) {
- return deadServers.contains(serverName);
+ public boolean isDead(final String serverName) {
+ return isDead(serverName, false);
}
+ /**
+ * @param serverName Servername as either host:port or host_port_startcode.
+ * @param hostAndPortOnly True if serverName is host and
+ * port only (host:port) and if so, then we do a prefix compare (ignoring
+ * start codes) looking for dead server.
+ * @return true if server is dead
+ */
+ boolean isDead(final String serverName, final boolean hostAndPortOnly) {
+ if (!hostAndPortOnly) return deadServers.contains(serverName);
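+ // Dead server names carry the start code suffix, so a prefix match on
+ // host and port is enough here.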
+ for (String hostPortStartCode: this.deadServers) {
+ if (hostPortStartCode.startsWith(serverName)) return true;
+ }
+ return false;
+ }
+
public boolean canAssignUserRegions() {
if (minimumServerCount == 0) {
return true;
@@ -889,4 +897,4 @@
public void setMinimumServerCount(int minimumServerCount) {
this.minimumServerCount = minimumServerCount;
}
-}
\ No newline at end of file
+}
Index: src/java/org/apache/hadoop/hbase/master/RegionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/RegionManager.java (revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/RegionManager.java (working copy)
@@ -1434,7 +1434,7 @@
}
// check if current server is overloaded
- int numRegionsToClose = balanceFromOverloaded(servLoad, avg);
+ int numRegionsToClose = balanceFromOverloaded(info, servLoad, avg);
// check if we can unload server by low loaded servers
if(numRegionsToClose <= 0) {
@@ -1456,13 +1456,14 @@
* Check if server load is not overloaded (with load > avgLoadPlusSlop).
* @return number of regions to unassign.
*/
- private int balanceFromOverloaded(HServerLoad srvLoad, double avgLoad) {
+ private int balanceFromOverloaded(final HServerInfo info,
+ final HServerLoad srvLoad, final double avgLoad) {
int avgLoadPlusSlop = (int)Math.ceil(avgLoad * (1 + this.slop));
int numSrvRegs = srvLoad.getNumberOfRegions();
if (numSrvRegs > avgLoadPlusSlop) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Server is overloaded: load=" + numSrvRegs +
- ", avg=" + avgLoad + ", slop=" + this.slop);
+ LOG.debug("Server " + info.getServerName() + " is overloaded: load=" +
+ numSrvRegs + ", avg=" + avgLoad + ", slop=" + this.slop);
}
return numSrvRegs - (int)Math.ceil(avgLoad);
}
Index: src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (revision 941538)
+++ src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (working copy)
@@ -39,7 +39,9 @@
protected RegionServerOperation(HMaster master) {
this.master = master;
this.numRetries = master.numRetries;
- this.expirationDuration = this.master.leaseTimeout/2;
+ // this.master.leaseTimeout is 120 by default. leaseTimeout/10 is still a
+ // long time to wait for something to come around again, so cap the wait at
+ // 10 seconds.
+ this.expirationDuration = Math.min(this.master.leaseTimeout/10, 10);
resetExpiration();
}