Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy) @@ -170,8 +170,8 @@ public void start() throws IOException, InterruptedException { try { - this.rootRegionTracker.start(); - this.metaNodeTracker.start(); + this.rootRegionTracker.start(true); + this.metaNodeTracker.start(true); LOG.debug("Starting catalog tracker " + this); }catch (RuntimeException e){ Throwable t = e.getCause(); Index: src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java (revision 0) @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HConstants; + + +/** + * Utility used by client connections such as {@link HConnection} and + * {@link ServerCallable} + */ +public class ConnectionUtils { + /** + * Calculate pause time. + * Built on {@link HConstants#RETRY_BACKOFF}. + * @param pause + * @param tries + * @return How long to wait after tries retries + */ + public static long getPauseTime(final long pause, final int tries) { + int ntries = tries; + if (ntries >= HConstants.RETRY_BACKOFF.length) { + ntries = HConstants.RETRY_BACKOFF.length - 1; + } + return pause * HConstants.RETRY_BACKOFF[ntries]; + } +} Property changes on: src\main\java\org\apache\hadoop\hbase\client\ConnectionUtils.java ___________________________________________________________________ Added: svn:needs-lock + * Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy) @@ -107,7 +107,7 @@ this.connection = HConnectionManager.getConnection(this.conf); } try { // Sleep - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // we should delete connection between client and zookeeper @@ -230,14 +230,6 @@ return this.connection.getHTableDescriptor(tableName); } - private long getPauseTime(int tries) { - int triesCount = tries; - if (triesCount >= HConstants.RETRY_BACKOFF.length) { - triesCount = HConstants.RETRY_BACKOFF.length - 1; - } - return this.pause * HConstants.RETRY_BACKOFF[triesCount]; - } - /** * Creates a new table. * Synchronous operation. @@ -357,7 +349,7 @@ " of " + numRegs + " regions are online; retries exhausted."); } try { // Sleep - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted when opening" + " regions; " + actualRegCount.get() + " of " + numRegs + @@ -470,7 +462,7 @@ } } try { - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { // continue } @@ -505,7 +497,7 @@ if (enabled) { break; } - long sleep = getPauseTime(tries); + long sleep = ConnectionUtils.getPauseTime(this.pause, tries); if (LOG.isDebugEnabled()) { LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " + "enabled in " + Bytes.toString(tableName)); @@ -601,7 +593,7 @@ if (disabled) { break; } - long sleep = getPauseTime(tries); + long sleep = ConnectionUtils.getPauseTime(this.pause, tries); if (LOG.isDebugEnabled()) { LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " + "disabled in " + Bytes.toString(tableName)); Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -141,6 +141,12 @@ * Allows flushing the region cache. */ public void clearRegionCache(); + + /** + * Close the original connection and create a new one. + * @throws ZooKeeperConnectionException if unable to connect to zookeeper + */ + public void resetConnection() throws ZooKeeperConnectionException; /** * Allows flushing the region cache of all locations that pertain to Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -29,8 +29,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.TreeSet; @@ -511,46 +511,60 @@ HConstants.HBASE_CLIENT_PREFETCH_LIMIT, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - setupZookeeperTrackers(); + setupZookeeperTrackers(true); this.master = null; this.masterChecked = false; } - private synchronized void setupZookeeperTrackers() + private synchronized boolean setupZookeeperTrackers(boolean allowAbort) throws ZooKeeperConnectionException{ // initialize zookeeper and master address manager this.zooKeeper = getZooKeeperWatcher(); - masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this); - masterAddressTracker.start(); - + this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this); this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this); - this.rootRegionTracker.start(); + return this.rootRegionTracker.start(allowAbort) + && this.masterAddressTracker.start(allowAbort); } - private synchronized void resetZooKeeperTrackers() + /** + * Reset ZookeeperTrackers. + * + * @throws ZooKeeperConnectionException + * if unable to connect to Zookeeper. + */ + private synchronized void resetZooKeeperTrackersWithRetries() throws ZooKeeperConnectionException { - LOG.info("Trying to reconnect to zookeeper"); - masterAddressTracker.stop(); - masterAddressTracker = null; - rootRegionTracker.stop(); - rootRegionTracker = null; - this.zooKeeper = null; - setupZookeeperTrackers(); + LOG.info("Trying to reconnect to zookeeper."); + for (int tries = 0; tries < this.numRetries; tries++) { + boolean isLastTime = (tries == (this.numRetries - 1)); + try { + masterAddressTracker.stop(); + masterAddressTracker = null; + rootRegionTracker.stop(); + rootRegionTracker = null; + this.zooKeeper = null; + if (setupZookeeperTrackers(isLastTime)) { + break; + } + } catch (ZooKeeperConnectionException zkce) { + if (isLastTime) { + throw zkce; + } + } + LOG.info("Tried to reconnect to zookeeper but failed, already tried " + tries + " times."); + try { + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } } public Configuration getConfiguration() { return this.conf; } - private long getPauseTime(int tries) { - int ntries = tries; - if (ntries >= HConstants.RETRY_BACKOFF.length) { - ntries = HConstants.RETRY_BACKOFF.length - 1; - } - return this.pause * HConstants.RETRY_BACKOFF[ntries]; - } - public HMasterInterface getMaster() throws MasterNotRunningException, ZooKeeperConnectionException { @@ -593,14 +607,16 @@ " failed; no more retrying.", e); break; } - LOG.info("getMaster attempt " + tries + " of " + this.numRetries + - " failed; retrying after sleep of " + - getPauseTime(tries), e); + LOG.info( + "getMaster attempt " + tries + " of " + this.numRetries + + " failed; retrying after sleep of " + + ConnectionUtils.getPauseTime(this.pause, tries), e); } // Cannot connect to master or it is not running. Sleep & retry try { - this.masterLock.wait(getPauseTime(tries)); + this.masterLock.wait(ConnectionUtils + .getPauseTime(this.pause, tries)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Thread was interrupted while trying to connect to master."); @@ -787,7 +803,9 @@ private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache) throws IOException { - if (this.closed) throw new IOException(toString() + " closed"); + if (this.closed) { + throw new InvalidConnectionException(toString() + " closed"); + } if (tableName == null || tableName.length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); @@ -998,12 +1016,13 @@ } if (tries < numRetries - 1) { if (LOG.isDebugEnabled()) { - LOG.debug("locateRegionInMeta parentTable=" + - Bytes.toString(parentTable) + ", metaLocation=" + - ((metaLocation == null)? "null": metaLocation) + ", attempt=" + - tries + " of " + - this.numRetries + " failed; retrying after sleep of " + - getPauseTime(tries) + " because: " + e.getMessage()); + LOG.debug("locateRegionInMeta parentTable=" + + Bytes.toString(parentTable) + ", metaLocation=" + + ((metaLocation == null) ? "null" : metaLocation) + + ", attempt=" + tries + " of " + this.numRetries + + " failed; retrying after sleep of " + + ConnectionUtils.getPauseTime(this.pause, tries) + + " because: " + e.getMessage()); } } else { throw e; @@ -1015,7 +1034,7 @@ } } try{ - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Giving up trying to location region in " + @@ -1231,6 +1250,9 @@ public T getRegionServerWithRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new InvalidConnectionException(toString() + " closed"); + } List exceptions = new ArrayList(); for(int tries = 0; tries < numRetries; tries++) { try { @@ -1245,7 +1267,7 @@ } } try { - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Giving up trying to get region server: thread is interrupted."); @@ -1256,6 +1278,9 @@ public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new InvalidConnectionException(toString() + " closed"); + } try { callable.instantiateServer(false); return callable.call(); @@ -1295,7 +1320,6 @@ final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { - // results must be the same size as list if (results.length != list.size()) { throw new IllegalArgumentException("argument results must be the same size as argument list"); @@ -1316,7 +1340,7 @@ // sleep first, if this is a retry if (tries >= 1) { - long sleepTime = getPauseTime(tries); + long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries); LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!"); Thread.sleep(sleepTime); } @@ -1530,11 +1554,12 @@ @Override public void abort(final String msg, Throwable t) { - if (t instanceof KeeperException.SessionExpiredException) { + if ((t instanceof KeeperException.SessionExpiredException) + || (t instanceof KeeperException.ConnectionLossException)) { try { LOG.info("This client just lost it's session with ZooKeeper, trying" + " to reconnect."); - resetZooKeeperTrackers(); + resetZooKeeperTrackersWithRetries(); LOG.info("Reconnected successfully. This disconnect could have been" + " caused by a network partition or a long-running GC pause," + " either way it's recommended that you verify your environment."); @@ -1629,5 +1654,10 @@ LOG.debug("The connection to " + this.zooKeeper + " was closed by the finalize method."); } + + @Override + public void resetConnection() throws ZooKeeperConnectionException { + resetZooKeeperTrackersWithRetries(); + } } } Index: src/main/java/org/apache/hadoop/hbase/client/InvalidConnectionException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/InvalidConnectionException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/InvalidConnectionException.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +/** + * Thrown when HConnection has been closed. + */ +public class InvalidConnectionException extends IOException { + private static final long serialVersionUID = 8792360655678089586L; + + public InvalidConnectionException() { + super(); + } + + public InvalidConnectionException(String s) { + super(s); + } +} Property changes on: src\main\java\org\apache\hadoop\hbase\client\InvalidConnectionException.java ___________________________________________________________________ Added: svn:needs-lock + * Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -360,7 +360,7 @@ // Set the cluster as up. If new RSs, they'll be waiting on this before // going ahead with their startup. this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); - this.clusterStatusTracker.start(); + this.clusterStatusTracker.start(true); boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -489,13 +489,13 @@ // block until a master is available. No point in starting up if no master // running. this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this); - this.masterAddressManager.start(); + this.masterAddressManager.start(true); blockAndCheckIfStopped(this.masterAddressManager); // Wait on cluster being up. Master will set this flag up in zookeeper // when ready. this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this); - this.clusterStatusTracker.start(); + this.clusterStatusTracker.start(true); blockAndCheckIfStopped(this.clusterStatusTracker); // Create the catalog tracker and start it; Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -171,7 +171,7 @@ // Set a tracker on replicationStateNodeNode this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable); - statusTracker.start(); + statusTracker.start(true); readReplicationStateZnode(); } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 1231476) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (working copy) @@ -62,11 +62,16 @@ /** * Starts the tracking of the node in ZooKeeper. - * - *

Use {@link #blockUntilAvailable()} to block until the node is available - * or {@link #getData()} to get the data of the node if it is available. + * + *

+ * Use {@link #blockUntilAvailable()} to block until the node is available or + * {@link #getData()} to get the data of the node if it is available. + * + * @param allowAbort If allowAbort is false, the abortable should not abort when a + * KeeperException occur. + * @return start result. true if start successfully. */ - public synchronized void start() { + public synchronized boolean start(boolean allowAbort) { this.watcher.registerListener(this); try { if(ZKUtil.watchAndCheckExists(watcher, node)) { @@ -75,11 +80,16 @@ this.data = data; } else { // It existed but now does not, try again to ensure a watch is set - start(); + return start(allowAbort); } } + return true; } catch (KeeperException e) { - abortable.abort("Unexpected exception during initialization, aborting", e); + if (allowAbort && (abortable != null)) { + abortable.abort("Unexpected exception during initialization, aborting", + e); + } + return false; } } @@ -154,7 +164,9 @@ nodeDeleted(path); } } catch(KeeperException e) { - abortable.abort("Unexpected exception handling nodeCreated event", e); + if (abortable != null) { + abortable.abort("Unexpected exception handling nodeCreated event", e); + } } } @@ -168,7 +180,9 @@ this.data = null; } } catch(KeeperException e) { - abortable.abort("Unexpected exception handling nodeDeleted event", e); + if (abortable != null) { + abortable.abort("Unexpected exception handling nodeDeleted event", e); + } } } } Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1231476) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; @@ -63,8 +64,8 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -4032,6 +4033,35 @@ queue.put(new Object()); } + /** + * Test HConnection can be recovered after this connection has been + * aborted. + * @throws IOException + */ + @Test + public void testConnectionResetAfterAbort() throws IOException { + final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam"); + final byte[] COLUMN = Bytes.toBytes("col"); + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY }); + Put put01 = new Put(Bytes.toBytes("testrow1")); + put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put01); + // At this time, abort the connection. + HConnection conn = table.getConnection(); + conn.abort("Test Connection Abort", new KeeperException.ConnectionLossException()); + boolean putSuccess = true; + // This put will success, for the connection has been recovered. + try { + Put put02 = new Put(Bytes.toBytes("testrow1")); + put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put02); + } catch (IOException ioe) { + putSuccess = false; + } + assertTrue(putSuccess); + } + } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 1231476) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (working copy) @@ -64,7 +64,7 @@ // Should not have a master yet MasterAddressTracker addressManager = new MasterAddressTracker(zk, null); - addressManager.start(); + addressManager.start(true); assertFalse(addressManager.hasMaster()); zk.registerListener(addressManager); Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 1231476) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (working copy) @@ -72,7 +72,7 @@ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); - tracker.start(); + tracker.start(true); Thread t = new Thread() { @Override public void run() { @@ -105,7 +105,7 @@ // Start a ZKNT with no node currently available TestTracker localTracker = new TestTracker(zk, node, abortable); - localTracker.start(); + localTracker.start(true); zk.registerListener(localTracker); // Make sure we don't have a node @@ -120,7 +120,7 @@ // Now, start a new ZKNT with the node already available TestTracker secondTracker = new TestTracker(zk, node, null); - secondTracker.start(); + secondTracker.start(true); zk.registerListener(secondTracker); // Put up an additional zk listener so we know when zk event is done @@ -213,7 +213,7 @@ public WaitToGetDataThread(ZooKeeperWatcher zk, String node) { tracker = new TestTracker(zk, node, null); - tracker.start(); + tracker.start(true); zk.registerListener(tracker); hasData = false; }