tries retries
+ */
+ public static long getPauseTime(final long pause, final int tries) {
+ // Once the backoff table is exhausted, keep using its last multiplier.
+ final int lastIndex = HConstants.RETRY_BACKOFF.length - 1;
+ final int backoffIndex = Math.min(tries, lastIndex);
+ return pause * HConstants.RETRY_BACKOFF[backoffIndex];
+ }
+}
Property changes on: src\main\java\org\apache\hadoop\hbase\client\ConnectionUtils.java
___________________________________________________________________
Added: svn:needs-lock
+ *
Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1231476)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy)
@@ -107,7 +107,7 @@
this.connection = HConnectionManager.getConnection(this.conf);
}
try { // Sleep
- Thread.sleep(getPauseTime(tries));
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// we should delete connection between client and zookeeper
@@ -230,14 +230,6 @@
return this.connection.getHTableDescriptor(tableName);
}
- private long getPauseTime(int tries) {
- int triesCount = tries;
- if (triesCount >= HConstants.RETRY_BACKOFF.length) {
- triesCount = HConstants.RETRY_BACKOFF.length - 1;
- }
- return this.pause * HConstants.RETRY_BACKOFF[triesCount];
- }
-
/**
* Creates a new table.
* Synchronous operation.
@@ -357,7 +349,7 @@
" of " + numRegs + " regions are online; retries exhausted.");
}
try { // Sleep
- Thread.sleep(getPauseTime(tries));
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when opening" +
" regions; " + actualRegCount.get() + " of " + numRegs +
@@ -470,7 +462,7 @@
}
}
try {
- Thread.sleep(getPauseTime(tries));
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
// continue
}
@@ -505,7 +497,7 @@
if (enabled) {
break;
}
- long sleep = getPauseTime(tries);
+ long sleep = ConnectionUtils.getPauseTime(this.pause, tries);
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " +
"enabled in " + Bytes.toString(tableName));
@@ -601,7 +593,7 @@
if (disabled) {
break;
}
- long sleep = getPauseTime(tries);
+ long sleep = ConnectionUtils.getPauseTime(this.pause, tries);
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " +
"disabled in " + Bytes.toString(tableName));
Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1231476)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy)
@@ -141,6 +141,12 @@
* Allows flushing the region cache.
*/
public void clearRegionCache();
+
+ /**
+ * Stops this connection's current ZooKeeper trackers (master address and
+ * root region) and re-creates them, retrying with backoff if they fail to
+ * start.
+ * @throws ZooKeeperConnectionException if unable to connect to zookeeper
+ */
+ public void resetZooKeeperTrackersWithRetries() throws ZooKeeperConnectionException;
/**
* Allows flushing the region cache of all locations that pertain to
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1231476)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -29,8 +29,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
@@ -511,46 +511,67 @@
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
- setupZookeeperTrackers();
+ setupZookeeperTrackers(true);
this.master = null;
this.masterChecked = false;
}
- private synchronized void setupZookeeperTrackers()
+ /**
+ * Obtains the ZooKeeper watcher via getZooKeeperWatcher(), creates the
+ * master address and root region trackers on it, and starts both trackers.
+ * @param allowAbort forwarded to each tracker's start(); when false, a
+ * tracker that hits a KeeperException while starting does not call
+ * abort on this connection
+ * @return true if both trackers started; false otherwise, in which case the
+ * tracker(s) whose start was attempted have been stopped
+ * @throws ZooKeeperConnectionException if the watcher cannot be obtained
+ * (presumably raised by getZooKeeperWatcher(); confirm)
+ */
+ private boolean setupZookeeperTrackers(boolean allowAbort)
throws ZooKeeperConnectionException{
// initialize zookeeper and master address manager
this.zooKeeper = getZooKeeperWatcher();
- masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
- masterAddressTracker.start();
-
+ this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
- this.rootRegionTracker.start();
+ // The root region tracker is only started once the master tracker is up;
+ // on failure, stop whatever was started before reporting false.
+ if (!this.masterAddressTracker.start(allowAbort)) {
+ this.masterAddressTracker.stop();
+ return false;
+ }
+ if (!this.rootRegionTracker.start(allowAbort)) {
+ this.masterAddressTracker.stop();
+ this.rootRegionTracker.stop();
+ return false;
+ }
+ return true;
}
- private synchronized void resetZooKeeperTrackers()
+ /**
+ * Stops the current master address and root region trackers, then tries to
+ * re-create and start them. Makes up to numRetries attempts (always at
+ * least one), sleeping with backoff between attempts. Only the final
+ * attempt is allowed to abort this connection if a tracker fails to start.
+ * @throws ZooKeeperConnectionException if the final attempt cannot connect
+ * to zookeeper, or if the thread is interrupted while backing off
+ */
+ @Override
+ public synchronized void resetZooKeeperTrackersWithRetries()
throws ZooKeeperConnectionException {
- LOG.info("Trying to reconnect to zookeeper");
- masterAddressTracker.stop();
- masterAddressTracker = null;
- rootRegionTracker.stop();
- rootRegionTracker = null;
- this.zooKeeper = null;
- setupZookeeperTrackers();
+ LOG.info("Trying to reconnect to zookeeper.");
+ if (this.masterAddressTracker != null) {
+ this.masterAddressTracker.stop();
+ }
+ if (this.rootRegionTracker != null) {
+ this.rootRegionTracker.stop();
+ }
+ // Guard against a non-positive retry setting so we always attempt once.
+ final int maxTries = Math.max(1, this.numRetries);
+ for (int tries = 0; tries < maxTries; tries++) {
+ boolean isLastTime = (tries == (maxTries - 1));
+ try {
+ this.masterAddressTracker = null;
+ this.rootRegionTracker = null;
+ this.zooKeeper = null;
+ // Only the last attempt may abort the connection on a tracker failure.
+ if (setupZookeeperTrackers(isLastTime)) {
+ return;
+ }
+ } catch (ZooKeeperConnectionException zkce) {
+ if (isLastTime) {
+ throw zkce;
+ }
+ LOG.debug("Attempt " + (tries + 1) + " to reconnect to zookeeper threw an exception", zkce);
+ }
+ // tries is zero-based; report the number of attempts actually made.
+ LOG.info("Tried to reconnect to zookeeper but failed, already tried " + (tries + 1) + " times.");
+ if (isLastTime) {
+ // No further attempt follows, so there is nothing to back off for.
+ break;
+ }
+ try {
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
+ } catch (InterruptedException e1) {
+ // Restore the interrupt flag and stop retrying: every further sleep
+ // would throw immediately, using up the remaining attempts with no
+ // backoff at all.
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperConnectionException(
+ "Interrupted while reconnecting to zookeeper", e1);
+ }
+ }
}
public Configuration getConfiguration() {
return this.conf;
}
- private long getPauseTime(int tries) {
- int ntries = tries;
- if (ntries >= HConstants.RETRY_BACKOFF.length) {
- ntries = HConstants.RETRY_BACKOFF.length - 1;
- }
- return this.pause * HConstants.RETRY_BACKOFF[ntries];
- }
-
public HMasterInterface getMaster()
throws MasterNotRunningException, ZooKeeperConnectionException {
@@ -593,14 +614,16 @@
" failed; no more retrying.", e);
break;
}
- LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
- " failed; retrying after sleep of " +
- getPauseTime(tries), e);
+ LOG.info(
+ "getMaster attempt " + tries + " of " + this.numRetries
+ + " failed; retrying after sleep of "
+ + ConnectionUtils.getPauseTime(this.pause, tries), e);
}
// Cannot connect to master or it is not running. Sleep & retry
try {
- this.masterLock.wait(getPauseTime(tries));
+ this.masterLock.wait(ConnectionUtils
+ .getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread was interrupted while trying to connect to master.");
@@ -787,7 +810,9 @@
private HRegionLocation locateRegion(final byte [] tableName,
final byte [] row, boolean useCache)
throws IOException {
- if (this.closed) throw new IOException(toString() + " closed");
+ if (this.closed) {
+ throw new ClosedConnectionException(toString() + " closed");
+ }
if (tableName == null || tableName.length == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
@@ -998,12 +1023,13 @@
}
if (tries < numRetries - 1) {
if (LOG.isDebugEnabled()) {
- LOG.debug("locateRegionInMeta parentTable=" +
- Bytes.toString(parentTable) + ", metaLocation=" +
- ((metaLocation == null)? "null": metaLocation) + ", attempt=" +
- tries + " of " +
- this.numRetries + " failed; retrying after sleep of " +
- getPauseTime(tries) + " because: " + e.getMessage());
+ LOG.debug("locateRegionInMeta parentTable="
+ + Bytes.toString(parentTable) + ", metaLocation="
+ + ((metaLocation == null) ? "null" : metaLocation)
+ + ", attempt=" + tries + " of " + this.numRetries
+ + " failed; retrying after sleep of "
+ + ConnectionUtils.getPauseTime(this.pause, tries)
+ + " because: " + e.getMessage());
}
} else {
throw e;
@@ -1015,7 +1041,7 @@
}
}
try{
- Thread.sleep(getPauseTime(tries));
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Giving up trying to location region in " +
@@ -1231,6 +1257,9 @@
public Use {@link #blockUntilAvailable()} to block until the node is available - * or {@link #getData()} to get the data of the node if it is available. + * + *
+ * Use {@link #blockUntilAvailable()} to block until the node is available or + * {@link #getData()} to get the data of the node if it is available. + * + * @param allowAbort If allowAbort is false, the abortable should not abort when a + * KeeperException occur. + * @return start result. true if start successfully. */ - public synchronized void start() { + public synchronized boolean start(boolean allowAbort) { this.watcher.registerListener(this); try { if(ZKUtil.watchAndCheckExists(watcher, node)) { @@ -75,15 +80,21 @@ this.data = data; } else { // It existed but now does not, try again to ensure a watch is set - start(); + return start(allowAbort); } } + return true; } catch (KeeperException e) { - abortable.abort("Unexpected exception during initialization, aborting", e); + if (allowAbort && (abortable != null)) { + abortable.abort("Unexpected exception during initialization, aborting", + e); + } + return false; } } public synchronized void stop() { + this.watcher.unregisterListener(this); this.stopped = true; notifyAll(); } @@ -154,7 +165,9 @@ nodeDeleted(path); } } catch(KeeperException e) { - abortable.abort("Unexpected exception handling nodeCreated event", e); + if (abortable != null) { + abortable.abort("Unexpected exception handling nodeCreated event", e); + } } } @@ -168,7 +181,9 @@ this.data = null; } } catch(KeeperException e) { - abortable.abort("Unexpected exception handling nodeDeleted event", e); + if (abortable != null) { + abortable.abort("Unexpected exception handling nodeDeleted event", e); + } } } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -216,6 +216,14 @@ public void registerListener(ZooKeeperListener listener) { listeners.add(listener); } + + /** + * 
Unregister the specified listener. + * @param listener + */ + public void unregisterListener(ZooKeeperListener listener) { + listeners.remove(listener); + } /** * Register the specified listener to receive ZooKeeper events and add it as Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1231476) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; @@ -63,8 +64,8 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -4032,6 +4033,35 @@ queue.put(new Object()); } + /** + * Test HConnection can be recovered after this connection has been + * aborted. + * @throws IOException + */ + @Test + public void testConnectionResetAfterAbort() throws IOException { + final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam"); + final byte[] COLUMN = Bytes.toBytes("col"); + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY }); + Put put01 = new Put(Bytes.toBytes("testrow1")); + put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put01); + // At this time, abort the connection. 
+ HConnection conn = table.getConnection(); + conn.abort("Test Connection Abort", new KeeperException.ConnectionLossException()); + boolean putSuccess = true; + // This put will success, for the connection has been recovered. + try { + Put put02 = new Put(Bytes.toBytes("testrow1")); + put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put02); + } catch (IOException ioe) { + putSuccess = false; + } + assertTrue(putSuccess); + } + } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 1231476) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (working copy) @@ -64,7 +64,7 @@ // Should not have a master yet MasterAddressTracker addressManager = new MasterAddressTracker(zk, null); - addressManager.start(); + addressManager.start(true); assertFalse(addressManager.hasMaster()); zk.registerListener(addressManager); Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 1231476) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (working copy) @@ -72,7 +72,7 @@ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); - tracker.start(); + tracker.start(true); Thread t = new Thread() { @Override public void run() { @@ -105,7 +105,7 @@ // Start a ZKNT with no node currently available TestTracker localTracker = new TestTracker(zk, node, abortable); - localTracker.start(); + localTracker.start(true); zk.registerListener(localTracker); // Make sure we don't have a node @@ -120,7 +120,7 @@ // Now, 
start a new ZKNT with the node already available TestTracker secondTracker = new TestTracker(zk, node, null); - secondTracker.start(); + secondTracker.start(true); zk.registerListener(secondTracker); // Put up an additional zk listener so we know when zk event is done @@ -213,7 +213,7 @@ public WaitToGetDataThread(ZooKeeperWatcher zk, String node) { tracker = new TestTracker(zk, node, null); - tracker.start(); + tracker.start(true); zk.registerListener(tracker); hasData = false; }