Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1236487) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -36,7 +36,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -84,6 +83,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -4574,7 +4574,38 @@ assertNotNull(addrAfter); assertTrue(addrAfter.getPort() != addrCache.getPort()); assertEquals(addrAfter.getPort(), addrNoCache.getPort()); - } + } + + /** + * Tests that a put through an HTable still succeeds after its HConnection + * has been aborted with a ZooKeeper ConnectionLossException. + * @throws IOException if creating the table or the first put fails + */ + @Test + public void testConnectionResetAfterAbort() throws IOException { + final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam"); + final byte[] COLUMN = Bytes.toBytes("col"); + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY }); + Put put01 = new Put(Bytes.toBytes("testrow1")); + put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put01); + + // Now abort the connection with a simulated ZooKeeper connection loss. + HConnection conn = table.getConnection(); + conn.abort("Test Connection Abort", new KeeperException.ConnectionLossException()); + boolean putSuccess = true; + // This put should succeed, because the connection has been recovered. 
+ try { + Put put02 = new Put(Bytes.toBytes("testrow1")); + put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put02); + } catch (IOException ioe) { /* NOTE(review): the cause is discarded here; the assertion below only reports that the put failed */ + putSuccess = false; + } + assertTrue(putSuccess); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1236487) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -225,6 +225,14 @@ public void registerListener(ZooKeeperListener listener) { listeners.add(listener); } + + /** + * Unregister the specified listener by removing it from the listener list. + * @param listener the listener to remove + */ + public void unregisterListener(ZooKeeperListener listener) { + listeners.remove(listener); + } /** * Register the specified listener to receive ZooKeeper events and add it as Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 1236487) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (working copy) @@ -89,6 +89,7 @@ } public synchronized void stop() { + this.watcher.unregisterListener(this); /* remove this tracker from the watcher's listener list when it is stopped */ this.stopped = true; notifyAll(); } Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1236487) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -503,6 +503,7 @@ private MasterAddressTracker masterAddressTracker; private RootRegionTracker rootRegionTracker; private ClusterId clusterId; + private 
boolean isResettingZKTrackers; private final Object metaRegionLock = new Object(); @@ -578,6 +579,7 @@ this.master = null; this.masterChecked = false; + isResettingZKTrackers = false; } private synchronized void setupZookeeperTrackers() @@ -593,16 +595,42 @@ this.clusterId = new ClusterId(this.zooKeeper, this); } - private synchronized void resetZooKeeperTrackers() + @Override /* Stops both trackers, drops the ZooKeeper handle and cluster id, then re-runs setupZookeeperTrackers() with back-off; rethrows ZooKeeperConnectionException once numRetries attempts have failed. */ + public synchronized void resetZooKeeperTrackersWithRetries() throws ZooKeeperConnectionException { LOG.info("Trying to reconnect to zookeeper"); - masterAddressTracker.stop(); - masterAddressTracker = null; - rootRegionTracker.stop(); - rootRegionTracker = null; - clusterId = null; - this.zooKeeper = null; - setupZookeeperTrackers(); + isResettingZKTrackers = true; /* stays true for the whole reset; cleared in the finally block below */ + try { + if (this.masterAddressTracker != null) { + this.masterAddressTracker.stop(); + this.masterAddressTracker = null; + } + if (this.rootRegionTracker != null) { + this.rootRegionTracker.stop(); + this.rootRegionTracker = null; + } + this.zooKeeper = null; + this.clusterId = null; + for (int tries = 0; tries < this.numRetries; tries++) { + try { + setupZookeeperTrackers(); + break; + } catch (ZooKeeperConnectionException zkce) { + if (tries >= this.numRetries - 1) { /* last attempt failed: rethrow so the caller does not continue with the trackers left null */ + throw zkce; + } + } + LOG.info("Tried to reconnect to zookeeper but failed, already tried " + + (tries + 1) + " times."); /* tries is zero-based, so tries + 1 attempts have failed so far */ + try { + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); /* pause before the next attempt */ + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + } finally { + isResettingZKTrackers = false; + } } public Configuration getConfiguration() { @@ -802,7 +830,9 @@ private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache) throws IOException { - if (this.closed) throw new IOException(toString() + " closed"); + if (this.closed) { + throw new ClosedConnectionException(toString() + " closed"); + } if (tableName == null || tableName.length == 0) { throw new IllegalArgumentException( "table name cannot be null 
or zero length"); @@ -1024,7 +1054,8 @@ ((metaLocation == null)? "null": "{" + metaLocation + "}") + ", attempt=" + tries + " of " + this.numRetries + " failed; retrying after sleep of " + - ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); + ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + + e.getMessage()); } } else { throw e; @@ -1331,11 +1362,17 @@ public T getRegionServerWithRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new ClosedConnectionException(toString() + " closed"); + } return callable.withRetries(); } public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new ClosedConnectionException(toString() + " closed"); + } return callable.withoutRetries(); } @@ -1659,11 +1696,15 @@ @Override public void abort(final String msg, Throwable t) { - if (t instanceof KeeperException.SessionExpiredException) { + if (t instanceof KeeperException.SessionExpiredException + || t instanceof KeeperException.ConnectionLossException) { + if (isResettingZKTrackers) { /* a tracker reset is already in progress; do not start another one */ + return; + } try { LOG.info("This client just lost it's session with ZooKeeper, trying" + " to reconnect."); - resetZooKeeperTrackers(); + resetZooKeeperTrackersWithRetries(); LOG.info("Reconnected successfully. This disconnect could have been" + " caused by a network partition or a long-running GC pause," + " either way it's recommended that you verify your environment."); Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1236487) +++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -145,6 +145,12 @@ * Allows flushing the region cache. */ public void clearRegionCache(); + + /** + * Stops the ZooKeeper trackers of this connection and sets them up again, retrying on failure. 
+ * @throws ZooKeeperConnectionException if unable to connect to zookeeper + */ + public void resetZooKeeperTrackersWithRetries() throws ZooKeeperConnectionException; /** * Allows flushing the region cache of all locations that pertain to Index: src/main/java/org/apache/hadoop/hbase/client/ClosedConnectionException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClosedConnectionException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ClosedConnectionException.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +/** + * Thrown when an operation is attempted on an HConnection that has already been closed. 
+ */ +public class ClosedConnectionException extends IOException { + private static final long serialVersionUID = 8792360655678089586L; + + public ClosedConnectionException() { /* no detail message */ + super(); + } + + public ClosedConnectionException(String s) { /* s is passed to IOException as the detail message */ + super(s); + } +} Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1236487) +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy) @@ -121,7 +121,7 @@ } try { - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); /* ConnectionUtils helper replaces the private getPauseTime(int) that this patch removes */ } catch (InterruptedException e) { Thread.currentThread().interrupt(); // we should delete connection between client and zookeeper @@ -301,14 +301,6 @@ return this.connection.getHTableDescriptor(tableName); } - private long getPauseTime(int tries) { - int triesCount = tries; - if (triesCount >= HConstants.RETRY_BACKOFF.length) { - triesCount = HConstants.RETRY_BACKOFF.length - 1; - } - return this.pause * HConstants.RETRY_BACKOFF[triesCount]; - } - /** * Creates a new table. * Synchronous operation. 
@@ -429,7 +421,7 @@ " of " + numRegs + " regions are online; retries exhausted."); } try { // Sleep - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted when opening" + " regions; " + actualRegCount.get() + " of " + numRegs + @@ -557,7 +549,7 @@ } } try { - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { // continue } @@ -638,7 +630,7 @@ if (enabled) { break; } - long sleep = getPauseTime(tries); + long sleep = ConnectionUtils.getPauseTime(this.pause, tries); if (LOG.isDebugEnabled()) { LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " + "enabled in " + Bytes.toString(tableName)); @@ -779,7 +771,7 @@ if (disabled) { break; } - long sleep = getPauseTime(tries); + long sleep = ConnectionUtils.getPauseTime(this.pause, tries); if (LOG.isDebugEnabled()) { LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " + "disabled in " + Bytes.toString(tableName)); @@ -1575,6 +1567,7 @@ throws MasterNotRunningException, ZooKeeperConnectionException { Configuration copyOfConf = HBaseConfiguration.create(conf); copyOfConf.setInt("hbase.client.retries.number", 1); + copyOfConf.setInt("zookeeper.recovery.retry", 1); /* NOTE(review): presumably limits ZooKeeper recovery retries for this check, like the client retry override above; confirm */ HBaseAdmin admin = new HBaseAdmin(copyOfConf); try { admin.close();