Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 1006040)
+++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (working copy)
@@ -25,8 +25,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +36,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.master.TestActiveMasterManager.NodeDeletionListener;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -60,9 +63,31 @@
TEST_UTIL.shutdownMiniZKCluster();
}
+  /**
+   * Verifies that {@link ZooKeeperNodeTracker#stop()} releases a thread that is
+   * blocked in {@link ZooKeeperNodeTracker#blockUntilAvailable()} on the znode
+   * <code>/xyz</code>, which this test never creates.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test public void testInterruptible() throws IOException, InterruptedException {
+    Abortable abortable = new StubAbortable();
+    ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+      "testInterruptible", abortable);
+    final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
+    tracker.start();
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          tracker.blockUntilAvailable();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted", e);
+        }
+      }
+    };
+    t.start();
+    while (!t.isAlive()) {
+      Threads.sleep(1);
+    }
+    tracker.stop();
+    // Bound the join: if the wait is not interruptible by stop(), fail the
+    // test rather than hang the whole run.
+    t.join(30000);
+    assertTrue("Waiter still blocked after tracker.stop()", !t.isAlive());
+  }
+
@Test
public void testNodeTracker() throws Exception {
-
Abortable abortable = new StubAbortable();
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testNodeTracker", abortable);
@@ -209,7 +234,6 @@
}
public static class TestTracker extends ZooKeeperNodeTracker {
-
public TestTracker(ZooKeeperWatcher watcher, String node,
Abortable abortable) {
super(watcher, node, abortable);
Index: src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (revision 1006040)
+++ src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (working copy)
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.catalog;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
@@ -33,11 +35,11 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Result;
@@ -102,6 +104,32 @@
return ct;
}
+  /**
+   * Verifies that {@link CatalogTracker#stop()} releases a thread blocked in
+   * {@link CatalogTracker#waitForMeta()} when the tracker reports no root and
+   * no meta location.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test public void testInterruptWaitOnMetaAndRoot()
+  throws IOException, InterruptedException {
+    final CatalogTracker ct = constructAndStartCatalogTracker();
+    HServerAddress hsa = ct.getRootLocation();
+    Assert.assertNull(hsa);
+    HServerAddress meta = ct.getMetaLocation();
+    Assert.assertNull(meta);
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          ct.waitForMeta();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted", e);
+        }
+      }
+    };
+    t.start();
+    try {
+      while (!t.isAlive()) {
+        Threads.sleep(1);
+      }
+      Threads.sleep(1);
+      // Root and meta locations were null above, so the waiter is expected to
+      // still be blocked at this point.
+      assertTrue(t.isAlive());
+    } finally {
+      // Always stop the tracker, even when the assertion above fails, so the
+      // waiter thread is not left blocked.
+      ct.stop();
+    }
+    // Join the thread... should exit shortly. Bounded so that a regression
+    // fails the test instead of hanging it.
+    t.join(30000);
+    assertTrue("waitForMeta() did not return after CatalogTracker.stop()",
+      !t.isAlive());
+  }
+
@Test public void testGetMetaServerConnectionFails()
throws IOException, InterruptedException, KeeperException {
HConnection connection = Mockito.mock(HConnection.class);
@@ -292,7 +320,7 @@
try {
doWaiting();
} catch (InterruptedException e) {
- throw new RuntimeException("Failed wait on root", e);
+ throw new RuntimeException("Failed wait", e);
}
LOG.info("Exiting " + getName());
}
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 1006040)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (working copy)
@@ -41,6 +41,8 @@
/** Used to abort if a fatal error occurs */
protected final Abortable abortable;
+ private boolean stopped = false;
+
/**
* Constructs a new ZK node tracker.
*
@@ -81,6 +83,11 @@
}
}
+  /**
+   * Marks this tracker as stopped and wakes every thread waiting on this
+   * tracker's monitor, so that wait loops which check the stopped flag exit.
+   */
+  public synchronized void stop() {
+    stopped = true;
+    // Waiters re-check the stopped flag when they resume.
+    this.notifyAll();
+  }
+
/**
* Gets the data of the node, blocking until the node is available.
*
@@ -107,7 +114,7 @@
boolean notimeout = timeout == 0;
long startTime = System.currentTimeMillis();
long remaining = timeout;
- while ((notimeout || remaining > 0) && this.data == null) {
+ while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
if (notimeout) {
wait();
continue;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1006202)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -592,6 +592,7 @@
closeAllScanners();
LOG.info("stopping server at: " + this.serverInfo.getServerName());
}
+ this.catalogTracker.stop();
waitOnAllRegionsToClose();
// Make sure the proxy is down.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1006040)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy)
@@ -65,7 +65,8 @@
@Override
public void process() throws IOException {
- LOG.debug("Processing open of " + regionInfo.getRegionNameAsString());
+ final String name = regionInfo.getRegionNameAsString();
+ LOG.debug("Processing open of " + name);
final String encodedName = regionInfo.getEncodedName();
// TODO: Previously we would check for root region availability (but only that it
@@ -76,7 +77,7 @@
// Check that this region is not already online
HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
if (region != null) {
- LOG.warn("Attempting open of " + regionInfo.getRegionNameAsString() +
+ LOG.warn("Attempting open of " + name +
" but it's already online on this server");
return;
}
@@ -101,7 +102,7 @@
}
openingInteger.set(vsn);
} catch (KeeperException e) {
- server.abort("ZK exception refreshing OPENING node", e);
+ server.abort("ZK exception refreshing OPENING node; " + name, e);
}
}
});
@@ -115,7 +116,7 @@
ZKAssign.forceNodeOffline(server.getZooKeeper(), regionInfo,
server.getServerName());
} catch (KeeperException e1) {
- LOG.error("Error forcing node back to OFFLINE from OPENING");
+ LOG.error("Error forcing node back to OFFLINE from OPENING; " + name);
return;
}
return;
@@ -129,15 +130,15 @@
openingVersion)) == -1) {
LOG.warn("Completed the OPEN of a region but when transitioning from " +
" OPENING to OPENING got a version mismatch, someone else clashed " +
- "so now unassigning");
+ "so now unassigning; " + name);
region.close();
return;
}
} catch (KeeperException e) {
- LOG.error("Failed transitioning node from OPENING to OPENED", e);
+ LOG.error("Failed transitioning node from OPENING to OPENED; " + name, e);
return;
} catch (IOException e) {
- LOG.error("Failed to close region after failing to transition", e);
+ LOG.error("Failed to close region after failing to transition; " + name, e);
return;
}
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1006040)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -287,6 +287,7 @@
// Stop services started for both backup and active masters
this.activeMasterManager.stop();
+ this.catalogTracker.stop();
HConnectionManager.deleteConnection(this.conf, true);
this.zooKeeper.close();
LOG.info("HMaster main thread exiting");
Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 1006219)
+++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy)
@@ -53,7 +53,8 @@
* the location of .META. If not available in -ROOT-,
* ZooKeeper is used to monitor for a new location of .META..
*
- * <p>Call {@link #start()} to start up operation.
+ * <p>Call {@link #start()} to start up operation. Call {@link #stop()} to
+ * interrupt waits and close up shop.
*/
public class CatalogTracker {
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
@@ -64,6 +65,7 @@
private final AtomicBoolean metaAvailable = new AtomicBoolean(false);
private HServerAddress metaLocation;
private final int defaultTimeout;
+ private boolean stopped = false;
public static final byte [] ROOT_REGION =
HRegionInfo.ROOT_REGIONINFO.getRegionName();
@@ -130,6 +132,20 @@
}
/**
+   * Shuts this tracker down.
+   * Sets the stopped flag, stops the root region tracker and the meta node
+   * tracker, then wakes any thread waiting on the meta-available monitor.
+   */
+  public void stop() {
+    stopped = true;
+    rootRegionTracker.stop();
+    metaNodeTracker.stop();
+    // Waiters on metaAvailable re-check the stopped flag once notified.
+    synchronized (metaAvailable) {
+      metaAvailable.notifyAll();
+    }
+  }
+
+ /**
* Gets the current location for -ROOT- or null if location is
* not currently available.
* @return location of root, null if not available
@@ -275,7 +291,7 @@
*/
public void waitForMeta() throws InterruptedException {
synchronized(metaAvailable) {
- while (!metaAvailable.get()) {
+ while (!stopped && !metaAvailable.get()) {
metaAvailable.wait();
}
}
@@ -301,7 +317,7 @@
if (getMetaServerConnection(true) != null) {
return metaLocation;
}
- while(!metaAvailable.get() &&
+ while(!stopped && !metaAvailable.get() &&
(timeout == 0 || System.currentTimeMillis() < stop)) {
metaAvailable.wait(timeout);
}
@@ -486,4 +502,8 @@
MetaNodeTracker getMetaNodeTracker() {
return this.metaNodeTracker;
}
+
+  /**
+   * @return the connection held in this tracker's <code>connection</code> field
+   */
+  public HConnection getConnection() {
+    return connection;
+  }
}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1006219)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -140,6 +140,14 @@
}
/**
+   * Delete connection information for the instance identified by the passed
+   * connection's configuration.
+   * Delegates to <code>deleteConnection(Configuration, boolean)</code> with
+   * <code>false</code> as the second argument.
+   * @param connection connection whose configuration identifies the
+   * connection information to delete
+   */
+  public static void deleteConnection(HConnection connection) {
+    deleteConnection(connection.getConfiguration(), false);
+  }
+
+ /**
* Delete information for all connections.
* @param stopProxy stop the proxy as well
* @throws IOException
@@ -231,17 +239,12 @@
public HConnectionImplementation(Configuration conf)
throws ZooKeeperConnectionException {
this.conf = conf;
-
- String serverClassName =
- conf.get(HConstants.REGION_SERVER_CLASS,
- HConstants.DEFAULT_REGION_SERVER_CLASS);
-
+ String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
+ HConstants.DEFAULT_REGION_SERVER_CLASS);
this.closed = false;
-
try {
this.serverInterfaceClass =
(Class<? extends HRegionInterface>) Class.forName(serverClassName);
-
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
"Unable to find region server interface " + serverClassName, e);
@@ -271,6 +274,10 @@
this.masterChecked = false;
}
+  /**
+   * @return the Configuration held in this connection's <code>conf</code> field
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
@Override
public String toString() {
return this.identifier;
Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1006219)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy)
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -42,6 +43,11 @@
*/
public interface HConnection extends Abortable {
/**
+ * Returns the configuration this connection is using.
+ * @return Configuration instance being used by this HConnection instance.
+ */
+ public Configuration getConfiguration();
+
+ /**
* Retrieve ZooKeeperWatcher used by the connection.
* @return ZooKeeperWatcher handle being used by the connection.
* @throws IOException if a remote or network exception occurs
Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1006040)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy)
@@ -66,11 +66,6 @@
private volatile Configuration conf;
private final long pause;
private final int numRetries;
- /**
- * Lazily instantiated. Use {@link #getCatalogTracker()} to ensure you get
- * an instance rather than a null.
- */
- private CatalogTracker catalogTracker = null;
/**
* Constructor
@@ -88,23 +83,34 @@
this.connection.getMaster();
}
+  /**
+   * Creates and starts a new CatalogTracker backed by its own HConnection,
+   * which is obtained for a fresh copy of this admin's Configuration.
+   * If the tracker cannot be constructed or started, the connection (and the
+   * tracker, if one was created) is cleaned up before the exception propagates.
+   * @return A new CatalogTracker instance; call {@link #cleanupCatalogTracker(CatalogTracker)}
+   * to cleanup the returned catalog tracker.
+   * @throws ZooKeeperConnectionException
+   * @throws IOException
+   * @see #cleanupCatalogTracker(CatalogTracker)
+   */
private synchronized CatalogTracker getCatalogTracker()
throws ZooKeeperConnectionException, IOException {
- if (this.catalogTracker == null) {
- this.catalogTracker = new CatalogTracker(this.connection.getZooKeeperWatcher(),
- HConnectionManager.getConnection(conf), this,
- this.conf.getInt("hbase.admin.catalog.timeout", 10 * 1000));
- try {
- this.catalogTracker.start();
- } catch (InterruptedException e) {
- // Let it out as an IOE for now until we redo all so tolerate IEs
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted", e);
- }
+    HConnection connection =
+      HConnectionManager.getConnection(new Configuration(this.conf));
+    CatalogTracker ct = null;
+    boolean started = false;
+    try {
+      ct = new CatalogTracker(connection);
+      ct.start();
+      started = true;
+    } catch (InterruptedException e) {
+      // Let it out as an IOE for now until we redo all so tolerate IEs
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted", e);
+    } finally {
+      // Callers only clean up trackers that were returned to them, so release
+      // the dedicated connection here when start up did not complete.
+      if (!started) {
+        if (ct != null) {
+          cleanupCatalogTracker(ct);
+        } else {
+          HConnectionManager.deleteConnection(connection);
+        }
+      }
}
- return this.catalogTracker;
+    return ct;
}
+  /**
+   * Releases a tracker obtained from {@link #getCatalogTracker()}: stops it and
+   * then deletes the connection it reports via <code>getConnection()</code>.
+   * @param ct tracker to stop and clean up
+   */
+  private void cleanupCatalogTracker(final CatalogTracker ct) {
+    ct.stop();
+    final HConnection trackerConnection = ct.getConnection();
+    HConnectionManager.deleteConnection(trackerConnection);
+  }
+
@Override
public void abort(String why, Throwable e) {
// Currently does nothing but throw the passed message and exception
@@ -142,7 +148,14 @@
*/
public boolean tableExists(final String tableName)
throws IOException {
- return MetaReader.tableExists(getCatalogTracker(), tableName);
+    // Each call gets its own tracker; it is cleaned up whether or not the
+    // lookup succeeds.
+    final CatalogTracker tracker = getCatalogTracker();
+    try {
+      return MetaReader.tableExists(tracker, tableName);
+    } finally {
+      cleanupCatalogTracker(tracker);
+    }
}
/**
@@ -718,15 +731,20 @@
*/
public void closeRegion(final byte [] regionname, final String hostAndPort)
throws IOException {
- if (hostAndPort != null) {
- HServerAddress hsa = new HServerAddress(hostAndPort);
- Pair