Index: src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (revision 1308228) +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (working copy) @@ -26,6 +26,10 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,12 +62,17 @@ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static HConnection persistentConnection; /** * @throws java.lang.Exception */ @BeforeClass public static void setUpBeforeClass() throws Exception { + // create a connection *before* the cluster is started, to validate that the + // connection's ZK trackers are initialized on demand + persistentConnection = HConnectionManager.createConnection(TEST_UTIL.getConfiguration()); + // Test we can first start the ZK cluster by itself TEST_UTIL.startMiniZKCluster(); TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); @@ -75,6 +84,7 @@ */ @AfterClass public static void tearDownAfterClass() throws Exception { + persistentConnection.close(); TEST_UTIL.shutdownMiniCluster(); } @@ -93,40 +103,28 @@ */ @Test public void testClientSessionExpired() - throws IOException, InterruptedException { + throws Exception { LOG.info("testClientSessionExpired"); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); new HTable(c, HConstants.META_TABLE_NAME).close(); - String quorumServers = ZKConfig.getZKQuorumServersString(c); - int sessionTimeout = 5 * 1000; // 5 seconds HConnection connection = HConnectionManager.getConnection(c); ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); - long sessionID = 
connectionZK.getRecoverableZooKeeper().getSessionId(); - byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd(); - ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, - EmptyWatcher.instance, sessionID, password); - LOG.info("Session timeout=" + zk.getSessionTimeout() + - ", original=" + sessionTimeout + - ", id=" + zk.getSessionId()); - zk.close(); + TEST_UTIL.expireSession(connectionZK, null); - Thread.sleep(sessionTimeout * 3L); - // provoke session expiration by doing something with ZK ZKUtil.dump(connectionZK); // Check that the old ZK connection is closed, means we did expire System.err.println("ZooKeeper should have timed out"); - String state = connectionZK.getRecoverableZooKeeper().getState().toString(); LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState()); Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState(). equals(States.CLOSED)); // Check that the client recovered ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); - LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState()); - Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals( - States.CONNECTED)); + States state = newConnectionZK.getRecoverableZooKeeper().getState(); + LOG.info("state=" + state); + Assert.assertTrue(state.equals(States.CONNECTED) || state.equals(States.CONNECTING)); } @Test @@ -148,21 +146,33 @@ * Make sure we can use the cluster * @throws Exception */ - public void testSanity() throws Exception{ - HBaseAdmin admin = - new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration())); + private void testSanity() throws Exception { String tableName = "test"+System.currentTimeMillis(); + HBaseAdmin admin = new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration())); + testAdminSanity(admin, tableName); + HTable table = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName); + testTableSanity(table, tableName); + } + + private void 
testSanity(HConnection conn, ExecutorService pool) throws Exception { + String tableName = "test"+System.currentTimeMillis(); + HBaseAdmin admin = new HBaseAdmin(conn); + testAdminSanity(admin, tableName); + HTable table = new HTable(Bytes.toBytes(tableName), conn, pool); + testTableSanity(table, tableName); + + } + private void testAdminSanity(HBaseAdmin admin, String tableName) throws Exception { HTableDescriptor desc = new HTableDescriptor(tableName); HColumnDescriptor family = new HColumnDescriptor("fam"); desc.addFamily(family); LOG.info("Creating table " + tableName); admin.createTable(desc); + } - HTable table = - new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName); + private void testTableSanity(HTable table, String tableName) throws Exception { Put put = new Put(Bytes.toBytes("testrow")); - put.add(Bytes.toBytes("fam"), - Bytes.toBytes("col"), Bytes.toBytes("testdata")); + put.add(Bytes.toBytes("fam"), Bytes.toBytes("col"), Bytes.toBytes("testdata")); LOG.info("Putting table " + tableName); table.put(put); table.close(); @@ -229,6 +239,21 @@ } } + /** + * Test with a connection that existed before the cluster was started + */ + @Test + public void testPersistentConnection() throws Exception { + ExecutorService pool = new ThreadPoolExecutor(1, 10, 10, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>()); + try { + testSanity(persistentConnection, pool); + } finally { + // release the pool's worker threads even if the sanity check fails + pool.shutdown(); + } + } + private void testKey(String ensemble, String port, String znode) throws IOException { Configuration conf = new Configuration(); Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1308228) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -32,9 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import
java.util.NoSuchElementException; import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -498,11 +496,11 @@ private volatile HMasterInterface master; private volatile boolean masterChecked; // ZooKeeper reference - private ZooKeeperWatcher zooKeeper; + private volatile ZooKeeperWatcher zooKeeper; // ZooKeeper-based master address tracker - private MasterAddressTracker masterAddressTracker; - private RootRegionTracker rootRegionTracker; - private ClusterId clusterId; + private volatile MasterAddressTracker masterAddressTracker; + private volatile RootRegionTracker rootRegionTracker; + private volatile ClusterId clusterId; private final Object metaRegionLock = new Object(); @@ -574,35 +572,51 @@ HConstants.HBASE_CLIENT_PREFETCH_LIMIT, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - setupZookeeperTrackers(); - this.master = null; this.masterChecked = false; } - private synchronized void setupZookeeperTrackers() - throws ZooKeeperConnectionException{ + private synchronized void ensureZookeeperTrackers() + throws ZooKeeperConnectionException { // initialize zookeeper and master address manager - this.zooKeeper = getZooKeeperWatcher(); - masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this); - masterAddressTracker.start(); - - this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this); - this.rootRegionTracker.start(); - - this.clusterId = new ClusterId(this.zooKeeper, this); + try { + boolean reconnect = false; + if (this.zooKeeper == null) { + this.zooKeeper = getZooKeeperWatcher(); + reconnect = true; + } + if (this.clusterId == null) { + this.clusterId = new ClusterId(this.zooKeeper, this); + reconnect = true; + } + if (this.masterAddressTracker == null) { + masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this); + masterAddressTracker.start(); + reconnect = true; + } + if (this.rootRegionTracker == null) { + 
this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this); + this.rootRegionTracker.start(); + reconnect = true; + } + if (reconnect) LOG.debug("(Re)connected to ZK successfully."); + } catch (ZooKeeperConnectionException e) { + resetZooKeeperTrackers(); + throw e; + } } - private synchronized void resetZooKeeperTrackers() - throws ZooKeeperConnectionException { - LOG.info("Trying to reconnect to zookeeper"); - masterAddressTracker.stop(); - masterAddressTracker = null; - rootRegionTracker.stop(); - rootRegionTracker = null; + private synchronized void resetZooKeeperTrackers() { + if (masterAddressTracker != null) { + masterAddressTracker.stop(); + masterAddressTracker = null; + } + if (rootRegionTracker != null) { + rootRegionTracker.stop(); + rootRegionTracker = null; + } clusterId = null; this.zooKeeper = null; - setupZookeeperTrackers(); } public Configuration getConfiguration() { @@ -623,6 +637,7 @@ LOG.info("Exception contacting master. Retrying...", ute.getCause()); } + ensureZookeeperTrackers(); checkIfBaseNodeAvailable(); ServerName sn = null; synchronized (this.masterLock) { @@ -807,11 +822,10 @@ throw new IllegalArgumentException( "table name cannot be null or zero length"); } - + ensureZookeeperTrackers(); if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { try { - ServerName servername = - this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout); + ServerName servername = this.rootRegionTracker.getRootRegionLocation(); LOG.debug("Looked up root region location, connection=" + this + "; serverName=" + ((servername == null)? 
"": servername.toString())); if (servername == null) return null; @@ -1255,6 +1269,7 @@ } else { rsName = Addressing.createHostAndPortStr(hostname, port); } + ensureZookeeperTrackers(); // See if we already have a connection (common case) server = this.servers.get(rsName); if (server == null) { @@ -1642,25 +1657,21 @@ @Override public void abort(final String msg, Throwable t) { - if (t instanceof KeeperException.SessionExpiredException) { - try { - LOG.info("This client just lost it's session with ZooKeeper, trying" + - " to reconnect."); - resetZooKeeperTrackers(); - LOG.info("Reconnected successfully. This disconnect could have been" + + if (t instanceof KeeperException) { + LOG.info("This client just lost it's session with ZooKeeper, will" + + " automatically reconnect when needed."); + if (t instanceof KeeperException.SessionExpiredException) { + LOG.info("ZK session expired. This disconnect could have been" + " caused by a network partition or a long-running GC pause," + " either way it's recommended that you verify your environment."); - return; - } catch (ZooKeeperConnectionException e) { - LOG.error("Could not reconnect to ZooKeeper after session" + - " expiration, aborting"); - t = e; + resetZooKeeperTrackers(); } + return; } if (t != null) LOG.fatal(msg, t); else LOG.fatal(msg); this.aborted = true; - this.closed = true; + close(); } @Override