Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (revision 1232125) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (working copy) @@ -72,7 +72,7 @@ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); - tracker.start(); + tracker.start(true); Thread t = new Thread() { @Override public void run() { @@ -105,7 +105,7 @@ // Start a ZKNT with no node currently available TestTracker localTracker = new TestTracker(zk, node, abortable); - localTracker.start(); + localTracker.start(true); zk.registerListener(localTracker); // Make sure we don't have a node @@ -120,7 +120,7 @@ // Now, start a new ZKNT with the node already available TestTracker secondTracker = new TestTracker(zk, node, null); - secondTracker.start(); + secondTracker.start(true); zk.registerListener(secondTracker); // Put up an additional zk listener so we know when zk event is done @@ -213,7 +213,7 @@ public WaitToGetDataThread(ZooKeeperWatcher zk, String node) { tracker = new TestTracker(zk, node, null); - tracker.start(); + tracker.start(true); zk.registerListener(tracker); hasData = false; } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (revision 1232125) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (working copy) @@ -64,7 +64,7 @@ // Should not have a master yet MasterAddressTracker addressManager = new MasterAddressTracker(zk, null); - addressManager.start(); + addressManager.start(true); assertFalse(addressManager.hasMaster()); 
zk.registerListener(addressManager); Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (revision 1232125) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (working copy) @@ -122,7 +122,7 @@ RSTracker rsTracker = new RSTracker(zkw, "/hbase/rs/"+regionServer.getServerName(), Thread.currentThread()); - rsTracker.start(); + rsTracker.start(true); zkw.registerListener(rsTracker); boolean caughtInterruption = false; Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (revision 1232125) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (working copy) @@ -166,7 +166,7 @@ } }); - masterTracker.start(); + masterTracker.start(true); zkw.registerListener(masterTracker); // Test (part of the) output that should have be printed by master when it aborts: Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (revision 1232125) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (working copy) @@ -180,7 +180,7 @@ } }); - masterTracker.start(); + masterTracker.start(true); zkw.registerListener(masterTracker); // Test (part of the) output that should have be printed by master when it aborts: Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java 
=================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1232125) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -35,7 +35,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -83,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -4567,7 +4567,38 @@ assertNotNull(addrAfter); assertTrue(addrAfter.getPort() != addrCache.getPort()); assertEquals(addrAfter.getPort(), addrNoCache.getPort()); - } + } + + /** + * Tests that an HConnection recovers after it has been aborted with a + * ZooKeeper ConnectionLossException. + * @throws IOException if creating the table or the first put fails + */ + @Test + public void testConnectionResetAfterAbort() throws IOException { + final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam"); + final byte[] COLUMN = Bytes.toBytes("col"); + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY }); + Put put01 = new Put(Bytes.toBytes("testrow1")); + put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put01); + + // Abort the connection as if the ZooKeeper connection had been lost. + HConnection conn = table.getConnection(); + conn.abort("Test Connection Abort", new KeeperException.ConnectionLossException()); + boolean putSuccess = true; + // This put should succeed because abort() re-created the ZooKeeper trackers. 
+ try { + Put put02 = new Put(Bytes.toBytes("testrow1")); + put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put02); + } catch (IOException ioe) { + putSuccess = false; + } + assertTrue(putSuccess); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -225,6 +225,14 @@ public void registerListener(ZooKeeperListener listener) { listeners.add(listener); } + + /** + * Unregister the specified listener so it is removed from this watcher's listeners. + * @param listener the listener to remove + */ + public void unregisterListener(ZooKeeperListener listener) { + listeners.remove(listener); + } /** * Register the specified listener to receive ZooKeeper events and add it as Index: src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (working copy) @@ -63,14 +63,21 @@ } @Override - public void start() { + public boolean start(boolean allowAbort) { try { watcher.registerListener(this); ZKUtil.listChildrenAndWatchThem(watcher, node); // Clean-up old in-process schema changes for this RS now? 
+ return true; } catch (KeeperException e) { - LOG.error("RegionServer SchemaChangeTracker startup failed with " + - "KeeperException.", e); + if (allowAbort && (abortable != null)) { + abortable.abort("RegionServer SchemaChangeTracker startup failed", + e); + } else { + LOG.error("RegionServer SchemaChangeTracker startup failed with " + + "KeeperException.", e); + } + return false; } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (working copy) @@ -69,8 +69,12 @@ * *

Use {@link #blockUntilAvailable()} to block until the node is available * or {@link #getData(boolean)} to get the data of the node if it is available. + * + * @param allowAbort whether the abortable may be asked to abort if a + * KeeperException occurs; otherwise the failure is only logged. + * @return true if the tracker started successfully, false otherwise. */ - public synchronized void start() { + public synchronized boolean start(boolean allowAbort) { this.watcher.registerListener(this); try { if(ZKUtil.watchAndCheckExists(watcher, node)) { @@ -80,15 +84,23 @@ } else { // It existed but now does not, try again to ensure a watch is set LOG.debug("Try starting again because there is no data from " + node); - start(); + return start(allowAbort); } } + return true; } catch (KeeperException e) { - abortable.abort("Unexpected exception during initialization, aborting", e); + if (allowAbort && (abortable != null)) { + abortable.abort("Unexpected exception during initialization, aborting", + e); + } else { + LOG.error("Unexpected exception during initialization of " + node, e); + } + return false; } } public synchronized void stop() { + this.watcher.unregisterListener(this); this.stopped = true; notifyAll(); } @@ -171,7 +183,11 @@ nodeDeleted(path); } } catch(KeeperException e) { - abortable.abort("Unexpected exception handling nodeCreated event", e); + if (abortable != null) { + abortable.abort("Unexpected exception handling nodeCreated event", e); + } else { + LOG.error("Unexpected exception handling nodeCreated event", e); + } } } @@ -185,7 +201,11 @@ this.data = null; } } catch(KeeperException e) { - abortable.abort("Unexpected exception handling nodeDeleted event", e); + if (abortable != null) { + abortable.abort("Unexpected exception handling nodeDeleted event", e); + } else { + LOG.error("Unexpected exception handling nodeDeleted event", e); + } } } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (working copy) @@ -60,15 +60,20 @@ } @Override - public
void start() { + public boolean start(boolean allowAbort) { try { watcher.registerListener(this); List tables = ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode); processCompletedSchemaChanges(tables); + return true; } catch (KeeperException e) { - LOG.error("MasterSchemaChangeTracker startup failed.", e); - abortable.abort("MasterSchemaChangeTracker startup failed", e); + if (allowAbort && abortable != null) { + abortable.abort("MasterSchemaChangeTracker startup failed", e); + } else { + LOG.error("MasterSchemaChangeTracker startup failed.", e); + } + return false; } } Index: src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (working copy) @@ -626,7 +626,7 @@ } }); - rootRegionTracker.start(); + rootRegionTracker.start(true); ServerName sn = null; try { sn = rootRegionTracker.getRootRegionLocation(); Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -574,35 +574,77 @@ HConstants.HBASE_CLIENT_PREFETCH_LIMIT, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - setupZookeeperTrackers(); + setupZookeeperTrackers(true); this.master = null; this.masterChecked = false; } - private synchronized void setupZookeeperTrackers() + private synchronized boolean setupZookeeperTrackers(boolean allowAbort) throws ZooKeeperConnectionException{ // initialize zookeeper and master address manager this.zooKeeper = getZooKeeperWatcher(); - masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this); - masterAddressTracker.start(); + this.masterAddressTracker = new
MasterAddressTracker(this.zooKeeper, this); this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this); - this.rootRegionTracker.start(); - + if (!this.masterAddressTracker.start(allowAbort)) { + this.masterAddressTracker.stop(); + this.masterAddressTracker = null; + this.zooKeeper = null; + return false; + } + if (!this.rootRegionTracker.start(allowAbort)) { + this.masterAddressTracker.stop(); + this.rootRegionTracker.stop(); + this.masterAddressTracker = null; + this.rootRegionTracker = null; + this.zooKeeper = null; + return false; + } this.clusterId = new ClusterId(this.zooKeeper, this); + return true; } - private synchronized void resetZooKeeperTrackers() + @Override + public synchronized void resetZooKeeperTrackersWithRetries() throws ZooKeeperConnectionException { LOG.info("Trying to reconnect to zookeeper"); - masterAddressTracker.stop(); - masterAddressTracker = null; - rootRegionTracker.stop(); - rootRegionTracker = null; - clusterId = null; + if (this.masterAddressTracker != null) { + this.masterAddressTracker.stop(); + this.masterAddressTracker = null; + } + if (this.rootRegionTracker != null) { + this.rootRegionTracker.stop(); + this.rootRegionTracker = null; + } this.zooKeeper = null; - setupZookeeperTrackers(); + this.clusterId = null; + for (int tries = 0; tries < this.numRetries; tries++) { + boolean isLastTime = (tries == (this.numRetries - 1)); + try { + // Pass allowAbort=false: a tracker calling abort() here would re-enter + // this method recursively for connection-level KeeperExceptions. 
+ if (setupZookeeperTrackers(false)) { + return; + } + } catch (ZooKeeperConnectionException zkce) { + if (isLastTime) { + throw zkce; + } + } + LOG.info("Tried to reconnect to zookeeper but failed, already tried " + + (tries + 1) + " times."); + if (isLastTime) { + break; + } + try { + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + break; + } + } + throw new ZooKeeperConnectionException("Unable to reconnect to zookeeper, giving up"); } public Configuration getConfiguration() { @@ -802,7 +844,9 @@ private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache) throws IOException { - if
(this.closed) throw new IOException(toString() + " closed"); + if (this.closed) { + throw new ClosedConnectionException(toString() + " closed"); + } if (tableName == null || tableName.length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); @@ -1024,7 +1068,8 @@ ((metaLocation == null)? "null": "{" + metaLocation + "}") + ", attempt=" + tries + " of " + this.numRetries + " failed; retrying after sleep of " + - ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); + ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + + e.getMessage()); } } else { throw e; @@ -1331,11 +1376,17 @@ public T getRegionServerWithRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new ClosedConnectionException(toString() + " closed"); + } return callable.withRetries(); } public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new ClosedConnectionException(toString() + " closed"); + } return callable.withoutRetries(); } @@ -1659,11 +1710,12 @@ @Override public void abort(final String msg, Throwable t) { - if (t instanceof KeeperException.SessionExpiredException) { + if (t instanceof KeeperException.SessionExpiredException + || t instanceof KeeperException.ConnectionLossException) { try { LOG.info("This client just lost it's session with ZooKeeper, trying" + " to reconnect."); - resetZooKeeperTrackers(); + resetZooKeeperTrackersWithRetries(); LOG.info("Reconnected successfully. 
This disconnect could have been" + " caused by a network partition or a long-running GC pause," + " either way it's recommended that you verify your environment."); Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -145,6 +145,12 @@ * Allows flushing the region cache. */ public void clearRegionCache(); + + /** + * Stops the current ZooKeeper trackers and re-creates them, retrying on failure. + * @throws ZooKeeperConnectionException if the trackers could not be re-created after all retries + */ + public void resetZooKeeperTrackersWithRetries() throws ZooKeeperConnectionException; /** * Allows flushing the region cache of all locations that pertain to Index: src/main/java/org/apache/hadoop/hbase/client/ClosedConnectionException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClosedConnectionException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ClosedConnectionException.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +/** + * Thrown when an operation is attempted on an HConnection that has already been closed. + */ +public class ClosedConnectionException extends IOException { + private static final long serialVersionUID = 8792360655678089586L; + + public ClosedConnectionException() { + super(); + } + + public ClosedConnectionException(String s) { + super(s); + } +} Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy) @@ -121,7 +121,7 @@ } try { - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // we should delete connection between client and zookeeper @@ -301,14 +301,6 @@ return this.connection.getHTableDescriptor(tableName); } - private long getPauseTime(int tries) { - int triesCount = tries; - if (triesCount >= HConstants.RETRY_BACKOFF.length) { - triesCount = HConstants.RETRY_BACKOFF.length - 1; - } - return this.pause * HConstants.RETRY_BACKOFF[triesCount]; - } - /** * Creates a new table. * Synchronous operation. 
@@ -429,7 +421,7 @@ " of " + numRegs + " regions are online; retries exhausted."); } try { // Sleep - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted when opening" + " regions; " + actualRegCount.get() + " of " + numRegs + @@ -557,7 +549,7 @@ } } try { - Thread.sleep(getPauseTime(tries)); + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); } catch (InterruptedException e) { // continue } @@ -638,7 +630,7 @@ if (enabled) { break; } - long sleep = getPauseTime(tries); + long sleep = ConnectionUtils.getPauseTime(this.pause, tries); if (LOG.isDebugEnabled()) { LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " + "enabled in " + Bytes.toString(tableName)); @@ -779,7 +771,7 @@ if (disabled) { break; } - long sleep = getPauseTime(tries); + long sleep = ConnectionUtils.getPauseTime(this.pause, tries); if (LOG.isDebugEnabled()) { LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " + "disabled in " + Bytes.toString(tableName)); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -568,13 +568,13 @@ // block until a master is available. No point in starting up if no master // running. this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this); - this.masterAddressManager.start(); + this.masterAddressManager.start(true); blockAndCheckIfStopped(this.masterAddressManager); // Wait on cluster being up. Master will set this flag up in zookeeper // when ready. 
this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this); - this.clusterStatusTracker.start(); + this.clusterStatusTracker.start(true); blockAndCheckIfStopped(this.clusterStatusTracker); // Create the catalog tracker and start it; @@ -585,7 +585,7 @@ // Schema change tracker this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper, this, this); - this.schemaChangeTracker.start(); + this.schemaChangeTracker.start(true); } /** Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -416,7 +416,7 @@ // Set the cluster as up. If new RSs, they'll be waiting on this before // going ahead with their startup. this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); - this.clusterStatusTracker.start(); + this.clusterStatusTracker.start(true); boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); @@ -424,7 +424,7 @@ this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(), this, this, conf.getInt("hbase.instant.schema.alter.timeout", 60000)); - this.schemaChangeTracker.start(); + this.schemaChangeTracker.start(true); LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -162,7 +162,7 @@ // Set a tracker on replicationStateNodeNode this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable); - statusTracker.start(); + 
statusTracker.start(true); readReplicationStateZnode(); } Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 1232125) +++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy) @@ -235,8 +235,8 @@ public void start() throws IOException, InterruptedException { LOG.debug("Starting catalog tracker " + this); try { - this.rootRegionTracker.start(); - this.metaNodeTracker.start(); + this.rootRegionTracker.start(true); + this.metaNodeTracker.start(true); } catch (RuntimeException e) { Throwable t = e.getCause(); this.abortable.abort(e.getMessage(), t);