Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 1232287)
+++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (working copy)
@@ -72,7 +72,7 @@
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testInterruptible", abortable);
final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
- tracker.start();
+ tracker.start(true);
Thread t = new Thread() {
@Override
public void run() {
@@ -105,7 +105,7 @@
// Start a ZKNT with no node currently available
TestTracker localTracker = new TestTracker(zk, node, abortable);
- localTracker.start();
+ localTracker.start(true);
zk.registerListener(localTracker);
// Make sure we don't have a node
@@ -120,7 +120,7 @@
// Now, start a new ZKNT with the node already available
TestTracker secondTracker = new TestTracker(zk, node, null);
- secondTracker.start();
+ secondTracker.start(true);
zk.registerListener(secondTracker);
// Put up an additional zk listener so we know when zk event is done
@@ -213,7 +213,7 @@
public WaitToGetDataThread(ZooKeeperWatcher zk, String node) {
tracker = new TestTracker(zk, node, null);
- tracker.start();
+ tracker.start(true);
zk.registerListener(tracker);
hasData = false;
}
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 1232287)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (working copy)
@@ -64,7 +64,7 @@
// Should not have a master yet
MasterAddressTracker addressManager = new MasterAddressTracker(zk, null);
- addressManager.start();
+ addressManager.start(true);
assertFalse(addressManager.hasMaster());
zk.registerListener(addressManager);
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (revision 1232287)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (working copy)
@@ -169,7 +169,7 @@
}
});
- masterTracker.start();
+ masterTracker.start(true);
zkw.registerListener(masterTracker);
// Test (part of the) output that should have be printed by master when it aborts:
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (revision 1232287)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (working copy)
@@ -183,7 +183,7 @@
}
});
- masterTracker.start();
+ masterTracker.start(true);
zkw.registerListener(masterTracker);
// Test (part of the) output that should have be printed by master when it aborts:
Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1232287)
+++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy)
@@ -43,6 +43,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -56,6 +57,7 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
@@ -65,13 +67,12 @@
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -4308,4 +4309,34 @@
System.currentTimeMillis() + ", cur=" + store.getNumberOfstorefiles());
assertEquals(count, store.getNumberOfstorefiles());
}
+
+ /**
+ * Test that the HConnection can be recovered after it has been aborted.
+ * @throws IOException
+ */
+ @Test
+ public void testConnectionResetAfterAbort() throws IOException {
+ final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam");
+ final byte[] COLUMN = Bytes.toBytes("col");
+ HTable table = TEST_UTIL.createTable(
+ Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY });
+ Put put01 = new Put(Bytes.toBytes("testrow1"));
+ put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue"));
+ table.put(put01);
+
+ // At this time, abort the connection.
+ HConnection conn = table.getConnection();
+ conn.abort("Test Connection Abort", new KeeperException.ConnectionLossException());
+ boolean putSuccess = true;
+ // This put should succeed because the connection has been recovered.
+ try {
+ Put put02 = new Put(Bytes.toBytes("testrow1"));
+ put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue"));
+ table.put(put02);
+ } catch (IOException ioe) {
+ putSuccess = false;
+ }
+ assertTrue(putSuccess);
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy)
@@ -220,6 +220,14 @@
public void registerListener(ZooKeeperListener listener) {
listeners.add(listener);
}
+
+ /**
+ * Unregister the specified listener.
+ * @param listener
+ */
+ public void unregisterListener(ZooKeeperListener listener) {
+ listeners.remove(listener);
+ }
/**
* Register the specified listener to receive ZooKeeper events and add it as
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (working copy)
@@ -69,8 +69,12 @@
*
* Use {@link #blockUntilAvailable()} to block until the node is available
* or {@link #getData(boolean)} to get the data of the node if it is available.
+ *
+ * @param allowAbort If allowAbort is false, the abortable will not abort when a
+ * KeeperException occurs.
+ * @return true if the tracker started successfully, false otherwise.
*/
- public synchronized void start() {
+ public synchronized boolean start(boolean allowAbort) {
this.watcher.registerListener(this);
try {
if(ZKUtil.watchAndCheckExists(watcher, node)) {
@@ -80,15 +84,21 @@
} else {
// It existed but now does not, try again to ensure a watch is set
LOG.debug("Try starting again because there is no data from " + node);
- start();
+ return start(allowAbort);
}
}
+ return true;
} catch (KeeperException e) {
- abortable.abort("Unexpected exception during initialization, aborting", e);
+ if (allowAbort && (abortable != null)) {
+ abortable.abort("Unexpected exception during initialization, aborting",
+ e);
+ }
+ return false;
}
}
public synchronized void stop() {
+ this.watcher.unregisterListener(this);
this.stopped = true;
notifyAll();
}
@@ -173,7 +183,9 @@
nodeDeleted(path);
}
} catch(KeeperException e) {
- abortable.abort("Unexpected exception handling nodeCreated event", e);
+ if (abortable != null) {
+ abortable.abort("Unexpected exception handling nodeCreated event", e);
+ }
}
}
@@ -187,7 +199,9 @@
this.data = null;
}
} catch(KeeperException e) {
- abortable.abort("Unexpected exception handling nodeDeleted event", e);
+ if (abortable != null) {
+ abortable.abort("Unexpected exception handling nodeDeleted event", e);
+ }
}
}
}
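
The sketch below is not part of the patch; it shows how a caller could use the new boolean return of ZooKeeperNodeTracker.start(boolean allowAbort) to retry on its own instead of relying on the Abortable. The ExampleTracker subclass, the znode path and the retry/pause values are illustrative assumptions.

// Sketch only: retrying tracker startup without aborting, based on the new
// start(boolean allowAbort) signature added by this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class TrackerStartSketch {

  // ZooKeeperNodeTracker is abstract, so a trivial subclass is needed just to
  // be able to call start(boolean).
  static class ExampleTracker extends ZooKeeperNodeTracker {
    ExampleTracker(ZooKeeperWatcher watcher, String node, Abortable abortable) {
      super(watcher, node, abortable);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Abortable abortable = new Abortable() {
      public void abort(String why, Throwable e) {
        System.err.println("abort requested: " + why);
      }
      public boolean isAborted() {
        return false;
      }
    };
    ZooKeeperWatcher zkw =
        new ZooKeeperWatcher(conf, "trackerStartSketch", abortable);
    ExampleTracker tracker = new ExampleTracker(zkw, "/hbase/example", abortable);

    boolean started = false;
    for (int tries = 0; tries < 3 && !started; tries++) {
      // With allowAbort == false a KeeperException during startup no longer
      // calls abortable.abort(); start() just returns false so the caller can
      // decide whether to retry or give up.
      started = tracker.start(false);
      if (!started) {
        Thread.sleep(1000);
      }
    }
    if (!started) {
      // Last attempt keeps the old behaviour: abort on failure.
      tracker.start(true);
    }
  }
}

This is the same pattern the HConnectionManager changes below follow: pass allowAbort = false while retries remain and only allow an abort on the final attempt.
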
Index: src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (working copy)
@@ -626,7 +626,7 @@
}
});
- rootRegionTracker.start();
+ rootRegionTracker.start(true);
ServerName sn = null;
try {
sn = rootRegionTracker.getRootRegionLocation();
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -566,35 +566,70 @@
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
- setupZookeeperTrackers();
+ setupZookeeperTrackers(true);
this.master = null;
this.masterChecked = false;
}
- private synchronized void setupZookeeperTrackers()
+ private synchronized boolean setupZookeeperTrackers(boolean allowAbort)
throws ZooKeeperConnectionException{
// initialize zookeeper and master address manager
this.zooKeeper = getZooKeeperWatcher();
- masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
- masterAddressTracker.start();
+ this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
- this.rootRegionTracker.start();
-
+ if (!this.masterAddressTracker.start(allowAbort)) {
+ this.masterAddressTracker.stop();
+ this.masterAddressTracker = null;
+ this.zooKeeper = null;
+ return false;
+ }
+ if (!this.rootRegionTracker.start(allowAbort)) {
+ this.masterAddressTracker.stop();
+ this.rootRegionTracker.stop();
+ this.masterAddressTracker = null;
+ this.rootRegionTracker = null;
+ this.zooKeeper = null;
+ return false;
+ }
this.clusterId = new ClusterId(this.zooKeeper, this);
+ return true;
}
- private synchronized void resetZooKeeperTrackers()
+ @Override
+ public synchronized void resetZooKeeperTrackersWithRetries()
throws ZooKeeperConnectionException {
LOG.info("Trying to reconnect to zookeeper");
- masterAddressTracker.stop();
- masterAddressTracker = null;
- rootRegionTracker.stop();
- rootRegionTracker = null;
- clusterId = null;
+ if (this.masterAddressTracker != null) {
+ this.masterAddressTracker.stop();
+ this.masterAddressTracker = null;
+ }
+ if (this.rootRegionTracker != null) {
+ this.rootRegionTracker.stop();
+ this.rootRegionTracker = null;
+ }
this.zooKeeper = null;
- setupZookeeperTrackers();
+ this.clusterId = null;
+ for (int tries = 0; tries < this.numRetries; tries++) {
+ boolean isLastTime = (tries == (this.numRetries - 1));
+ try {
+ if (setupZookeeperTrackers(isLastTime)) {
+ break;
+ }
+ } catch (ZooKeeperConnectionException zkce) {
+ if (isLastTime) {
+ throw zkce;
+ }
+ }
+ LOG.info("Tried to reconnect to zookeeper but failed, already tried "
+ + (tries + 1) + " times.");
+ try {
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
public Configuration getConfiguration() {
@@ -791,7 +826,9 @@
private HRegionLocation locateRegion(final byte [] tableName,
final byte [] row, boolean useCache)
throws IOException {
- if (this.closed) throw new IOException(toString() + " closed");
+ if (this.closed) {
+ throw new ClosedConnectionException(toString() + " closed");
+ }
if (tableName == null || tableName.length == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
@@ -1013,7 +1050,8 @@
((metaLocation == null)? "null": "{" + metaLocation + "}") +
", attempt=" + tries + " of " +
this.numRetries + " failed; retrying after sleep of " +
- getPauseTime(tries) + " because: " + e.getMessage());
+ ConnectionUtils.getPauseTime(this.pause, tries) + " because: "
+ + e.getMessage());
}
} else {
throw e;
@@ -1316,6 +1354,9 @@
+ * @return How long to wait after <code>tries</code> retries
+ */
+ public static long getPauseTime(final long pause, final int tries) {
+ int ntries = tries;
+ if (ntries >= HConstants.RETRY_BACKOFF.length) {
+ ntries = HConstants.RETRY_BACKOFF.length - 1;
+ }
+ return pause * HConstants.RETRY_BACKOFF[ntries];
+ }
+}
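
For reference, a self-contained sketch of the backoff schedule the new retry loop gets from getPauseTime(). The multiplier table below is an assumption made for the example; the patched code reads the real values from HConstants.RETRY_BACKOFF and the configured client pause.

// Self-contained sketch of the backoff schedule produced by getPauseTime().
public class PauseTimeSketch {

  // Stand-in for HConstants.RETRY_BACKOFF (illustrative values).
  private static final int[] BACKOFF = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };

  // Same shape as the helper above: clamp the try count to the table, then
  // multiply the base pause by the corresponding entry.
  static long getPauseTime(long pause, int tries) {
    int ntries = tries;
    if (ntries >= BACKOFF.length) {
      ntries = BACKOFF.length - 1;
    }
    return pause * BACKOFF[ntries];
  }

  public static void main(String[] args) {
    long pause = 1000; // e.g. a 1000 ms base pause
    for (int tries = 0; tries < 12; tries++) {
      // This is how long resetZooKeeperTrackersWithRetries() sleeps between
      // reconnect attempts; past the end of the table it stays at the maximum.
      System.out.println("attempt " + tries + " -> sleep "
          + getPauseTime(pause, tries) + " ms");
    }
  }
}
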
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -552,13 +552,13 @@
// block until a master is available. No point in starting up if no master
// running.
this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this);
- this.masterAddressManager.start();
+ this.masterAddressManager.start(true);
blockAndCheckIfStopped(this.masterAddressManager);
// Wait on cluster being up. Master will set this flag up in zookeeper
// when ready.
this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
- this.clusterStatusTracker.start();
+ this.clusterStatusTracker.start(true);
blockAndCheckIfStopped(this.clusterStatusTracker);
// Create the catalog tracker and start it;
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -394,7 +394,7 @@
// Set the cluster as up. If new RSs, they'll be waiting on this before
// going ahead with their startup.
this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
- this.clusterStatusTracker.start();
+ this.clusterStatusTracker.start(true);
boolean wasUp = this.clusterStatusTracker.isClusterUp();
if (!wasUp) this.clusterStatusTracker.setClusterUp();
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy)
@@ -162,7 +162,7 @@
// Set a tracker on replicationStateNodeNode
this.statusTracker =
new ReplicationStatusTracker(this.zookeeper, abortable);
- statusTracker.start();
+ statusTracker.start(true);
readReplicationStateZnode();
}
Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 1232287)
+++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy)
@@ -230,8 +230,8 @@
public void start() throws IOException, InterruptedException {
LOG.debug("Starting catalog tracker " + this);
try {
- this.rootRegionTracker.start();
- this.metaNodeTracker.start();
+ this.rootRegionTracker.start(true);
+ this.metaNodeTracker.start(true);
} catch (RuntimeException e) {
Throwable t = e.getCause();
this.abortable.abort(e.getMessage(), t);
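
Finally, a sketch of what the retry path means for application code, mirroring the new testConnectionResetAfterAbort above. Table, row and column names are illustrative and the table is assumed to exist already; the recovery expectation is exactly what that test asserts.

// Sketch of the client-side effect of this patch (mirrors the new test).
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;

public class ConnectionRecoverySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    table.put(put);

    // Simulate the failure the new test exercises: abort the underlying
    // connection with a ZooKeeper connection-loss exception.
    HConnection conn = table.getConnection();
    conn.abort("simulated ZooKeeper connection loss",
        new KeeperException.ConnectionLossException());

    // With the retry logic in this patch, the trackers are re-established and
    // the next operation on the same table is expected to succeed rather than
    // fail permanently (this is what testConnectionResetAfterAbort asserts).
    table.put(put);
    table.close();
  }
}
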