Index: src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java (revision 1188010) +++ src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java (working copy) @@ -72,7 +72,7 @@ @Override public void stop(String why) { - }}, null, null); + }}, null, null, false); LOG.debug("regionServerStartup 1"); HServerInfo hsi1 = new HServerInfo(new HServerAddress("example.org:1234"), Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (revision 1188010) +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (working copy) @@ -139,6 +139,7 @@ // verify map-reduce results verify(Bytes.toString(table.getTableName())); } finally { + table.close(); mrCluster.shutdown(); if (job != null) { FileUtil.fullyDelete( @@ -170,6 +171,7 @@ } } assertTrue(verified); + table.close(); } /** Index: src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java (revision 1188010) +++ src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java (working copy) @@ -70,8 +70,7 @@ @Before public void setup() throws IOException, InterruptedException { Configuration c = new Configuration(UTIL.getConfiguration()); zkw = new ZooKeeperWatcher(c, "TestMetaReaderEditor", ABORTABLE); - HConnection connection = HConnectionManager.getConnection(c); - ct = new CatalogTracker(zkw, connection, ABORTABLE); + ct = new CatalogTracker(zkw, c, ABORTABLE); ct.start(); } Index: src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (revision 1188010) +++ src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (working copy) @@ -98,7 +98,8 @@ private CatalogTracker constructAndStartCatalogTracker(final HConnection c) throws IOException, InterruptedException { - CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable); + CatalogTracker ct = new CatalogTracker(this.watcher, null, c, + this.abortable, 0); ct.start(); return ct; } Index: src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java (revision 1188010) +++ src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java (working copy) @@ -112,8 +112,7 @@ LOG.info("Starting mini hbase cluster"); UTIL.startMiniHBaseCluster(1, 1); Configuration c = new Configuration(UTIL.getConfiguration()); - HConnection connection = HConnectionManager.getConnection(c); - CatalogTracker ct = new CatalogTracker(connection); + CatalogTracker ct = new CatalogTracker(c); ct.start(); List originalTableRegions = MetaReader.getTableRegions(ct, desc.getName()); Index: src/test/java/org/apache/hadoop/hbase/client/TestHCM.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (revision 1188010) +++ src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.util.Bytes; @@ -42,6 +43,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * This class is for testing HCM features @@ -71,10 +73,7 @@ * @throws SecurityException * @see https://issues.apache.org/jira/browse/HBASE-2925 */ - // Disabling. Of course this test will OOME using new Configuration each time - // St.Ack 20110428 - // @Test - public void testManyNewConnectionsDoesnotOOME() + @Test public void testManyNewConnectionsDoesnotOOME() throws SecurityException, IllegalArgumentException, ZooKeeperConnectionException, NoSuchFieldException, IllegalAccessException, InterruptedException { @@ -86,13 +85,16 @@ public static void createNewConfigurations() throws SecurityException, IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException, ZooKeeperConnectionException { + int startingHConnectionManagerCacheSize = getHConnectionManagerCacheSize(); HConnection last = null; - for (int i = 0; i <= (HConnectionManager.MAX_CACHED_HBASE_INSTANCES * 2); i++) { + for (int i = 0; i <= 100; i++) { // set random key to differentiate the connection from previous ones Configuration configuration = HBaseConfiguration.create(); + configuration.set(HConstants.HBASE_CONNECTION_PER_CONFIG, "false"); configuration.set("somekey", String.valueOf(_randy.nextInt())); System.out.println("Hash Code: " + configuration.hashCode()); - HConnection connection = HConnectionManager.getConnection(configuration); + HConnection connection = + HConnectionManager.getConnection(configuration); if (last != null) { if (last == connection) { System.out.println("!! Got same connection for once !!"); @@ -104,14 +106,11 @@ configuration.set("someotherkey", String.valueOf(_randy.nextInt())); last = connection; LOG.info("Cache Size: " - + getHConnectionManagerCacheSize() + ", Valid Keys: " - + getValidKeyCount()); + + getHConnectionManagerCacheSize()); Thread.sleep(100); } - Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES, + Assert.assertEquals(startingHConnectionManagerCacheSize + 1, getHConnectionManagerCacheSize()); - Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES, - getValidKeyCount()); } private static int getHConnectionManagerCacheSize() @@ -124,21 +123,6 @@ return cache.size(); } - private static int getValidKeyCount() throws SecurityException, - NoSuchFieldException, IllegalArgumentException, - IllegalAccessException { - Field cacheField = - HConnectionManager.class.getDeclaredField("HBASE_INSTANCES"); - cacheField.setAccessible(true); - Map cache = (Map) cacheField.get(null); - List keys = new ArrayList(cache.keySet()); - Set values = new HashSet(); - for (Object key : keys) { - values.add(cache.get(key)); - } - return values.size(); - } - /** * Test that when we delete a location using the first row of a region * that we really delete it. @@ -158,4 +142,70 @@ HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW); assertNull("What is this location?? " + rl, rl); } + + /** + * Make sure that {@link HConfiguration} instances that are essentially the + * same map to the same {@link HConnection} instance. + */ + @Test + public void testConnectionSameness() throws Exception { + HConnection previousConnection = null; + for (int i = 0; i < 2; i++) { + // set random key to differentiate the connection from previous ones + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set(HConstants.HBASE_CONNECTION_PER_CONFIG, "false"); + configuration.set("some_key", String.valueOf(_randy.nextInt())); + LOG.info("The hash code of the current configuration is: " + + configuration.hashCode()); + HConnection currentConnection = HConnectionManager + .getConnection(configuration); + if (previousConnection != null) { + assertTrue( + "Did not get the same connection even though its key didn't change", + previousConnection == currentConnection); + } + previousConnection = currentConnection; + // change the configuration, so that it is no longer reachable from the + // client's perspective. However, since its part of the LRU doubly linked + // list, it will eventually get thrown out, at which time it should also + // close the corresponding {@link HConnection}. + configuration.set("other_key", String.valueOf(_randy.nextInt())); + } + } + + /** + * Makes sure that there is no leaking of + * {@link HConnectionManager.TableServers} in the {@link HConnectionManager} + * class. + */ + @Test + public void testConnectionUniqueness() throws Exception { + HConnection previousConnection = null; + for (int i = 0; i < 50; i++) { + // set random key to differentiate the connection from previous ones + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set(HConstants.HBASE_CONNECTION_PER_CONFIG, "false"); + configuration.set("some_key", String.valueOf(_randy.nextInt())); + configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, + String.valueOf(_randy.nextInt())); + LOG.info("The hash code of the current configuration is: " + + configuration.hashCode()); + HConnection currentConnection = HConnectionManager + .getConnection(configuration); + if (previousConnection != null) { + assertTrue("Got the same connection even though its key changed!", + previousConnection != currentConnection); + } + // change the configuration, so that it is no longer reachable from the + // client's perspective. However, since its part of the LRU doubly linked + // list, it will eventually get thrown out, at which time it should also + // close the corresponding {@link HConnection}. + configuration.set("other_key", String.valueOf(_randy.nextInt())); + + previousConnection = currentConnection; + LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: " + + getHConnectionManagerCacheSize()); + Thread.sleep(50); + } + } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -125,7 +125,8 @@ // Apparently this is recoverable. Retry a while. // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling // TODO: Generalize out in ZKUtil. - long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000); + long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME); long finished = System.currentTimeMillis() + wait; KeeperException ke = null; do { Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -169,7 +169,7 @@ "[\\t\\n\\x0B\\f\\r]", "")); StringBuilder builder = new StringBuilder(ensemble); builder.append(":"); - builder.append(conf.get("hbase.zookeeper.property.clientPort")); + builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); builder.append(":"); builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); if (name != null && !name.isEmpty()) { Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -93,14 +93,29 @@ /** Name of ZooKeeper config file in conf/ directory. */ public static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg"; - /** default client port that the zookeeper listens on */ + /** Parameter name for the client port that the zookeeper listens on */ + public static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; + + /** Default client port that the zookeeper listens on */ public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181; + /** Parameter name for the wait time for the recoverable zookeeper */ + public static final String ZOOKEEPER_RECOVERABLE_WAITTIME = "hbase.zookeeper.recoverable.waittime"; + + /** Default wait time for the recoverable zookeeper */ + public static final long DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME = 10000; + /** Parameter name for the root dir in ZK for this cluster */ public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent"; public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase"; + /** Parameter name for the limit on concurrent client-side zookeeper connections */ + public static final String ZOOKEEPER_MAX_CLIENT_CNXNS = "hbase.zookeeper.property.maxClientCnxns"; + + /** Default limit on concurrent client-side zookeeper connections */ + public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 30; + /** Parameter name for port region server listens on. */ public static final String REGIONSERVER_PORT = "hbase.regionserver.port"; @@ -331,20 +346,93 @@ */ public static long DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE = Long.MAX_VALUE; + /** + * Parameter name for client pause value, used mostly as value to wait + * before running a retry of a failed get, region lookup, etc. + */ + public static String HBASE_CLIENT_PAUSE = "hbase.client.pause"; /** + * Default value of {@link #HBASE_CLIENT_PAUSE}. + */ + public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000; + + /** + * Parameter name for maximum retries, used as maximum for all retryable + * operations such as fetching of the root region from root region server, + * getting a cell's value, starting a row update, etc. + */ + public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number"; + + /** + * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}. + */ + public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10; + + /** + * Parameter name for maximum attempts, used to limit the number of times the + * client will try to obtain the proxy for a given region server. + */ + public static String HBASE_CLIENT_RPC_MAXATTEMPTS = "hbase.client.rpc.maxattempts"; + + /** + * Default value of {@link #HBASE_CLIENT_RPC_MAXATTEMPTS}. + */ + public static int DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS = 1; + + /** + * Parameter name for client prefetch limit, used as the maximum number of regions + * info that will be prefetched. + */ + public static String HBASE_CLIENT_PREFETCH_LIMIT = "hbase.client.prefetch.limit"; + + /** + * Default value of {@link #HBASE_CLIENT_PREFETCH_LIMIT}. + */ + public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10; + + /** + * Parameter name for number of rows that will be fetched when calling next on + * a scanner if it is not served from memory. Higher caching values will + * enable faster scanners but will eat up more memory and some calls of next + * may take longer and longer times when the cache is empty. + */ + public static String HBASE_META_SCANNER_CACHING = "hbase.meta.scanner.caching"; + + /** + * Default value of {@link #HBASE_META_SCANNER_CACHING}. + */ + public static int DEFAULT_HBASE_META_SCANNER_CACHING = 100; + + /** + * Parameter name for unique identifier for this {@link Configuration} + * instance. If there are two or more {@link Configuration} instances that, + * for all intents and purposes, are the same except for their instance ids, + * then they will not be able to share the same {@link Connection} instance. + * On the other hand, even if the instance ids are the same, it could result + * in non-shared {@link Connection} instances if some of the other connection + * parameters differ. + */ + public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id"; + + /** + * If this parameter is set true then {@link HConnectionManager} will not share + * {@link HConnection} instances with different {@link Configuration} instances. + */ + public static String HBASE_CONNECTION_PER_CONFIG = "hbase.connection.per.config"; + + /** * HRegion server lease period in milliseconds. Clients must report in within this period * else they are considered dead. Unit measured in ms (milliseconds). */ public static String HBASE_REGIONSERVER_LEASE_PERIOD_KEY = "hbase.regionserver.lease.period"; - /** * Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}. */ public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000; - + /** * timeout for each RPC */ Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -82,8 +82,6 @@ import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.MultiAction; import org.apache.hadoop.hbase.client.MultiPut; @@ -168,7 +166,6 @@ protected HServerInfo serverInfo; protected final Configuration conf; - private final HConnection connection; protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false); private FileSystem fs; private Path rootDir; @@ -283,7 +280,6 @@ public HRegionServer(Configuration conf) throws IOException, InterruptedException { this.fsOk = true; this.conf = conf; - this.connection = HConnectionManager.getConnection(conf); this.isOnline = false; // check to see if the codec list is available: @@ -503,7 +499,7 @@ blockAndCheckIfStopped(this.clusterStatusTracker); // Create the catalog tracker and start it; - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); } @@ -679,7 +675,6 @@ this.hbaseMaster = null; } this.leases.close(); - HConnectionManager.deleteConnection(conf, true); this.zooKeeper.close(); if (!killed) { join(); Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -83,6 +84,7 @@ private final Server master; private final MasterServices services; + private final HConnection connection; // Reporting to track master metrics. private final MasterMetrics metrics; @@ -96,9 +98,15 @@ * @param master * @param services * @param metrics + * @throws ZooKeeperConnectionException */ - public ServerManager(final Server master, final MasterServices services, - MasterMetrics metrics) { + public ServerManager(final Server master, final MasterServices services, MasterMetrics metrics) + throws ZooKeeperConnectionException { + this(master, services, metrics, true); + } + + ServerManager(final Server master, final MasterServices services, MasterMetrics metrics, final boolean connect) + throws ZooKeeperConnectionException { this.master = master; this.services = services; this.metrics = metrics; @@ -106,6 +114,7 @@ maxSkew = c.getLong("hbase.master.maxclockskew", 30000); this.deadservers = new DeadServer(c.getInt("hbase.master.maxdeadservers", 100)); + this.connection = connect ? HConnectionManager.getConnection(c) : null; } /** @@ -612,12 +621,10 @@ */ private HRegionInterface getServerConnection(HServerInfo info) throws IOException { - HConnection connection = - HConnectionManager.getConnection(this.master.getConfiguration()); HRegionInterface hri = serverConnections.get(info.getServerName()); if (hri == null) { LOG.debug("New connection to " + info.getServerName()); - hri = connection.getHRegionConnection(info.getServerAddress(), false); + hri = this.connection.getHRegionConnection(info.getServerAddress(), false); this.serverConnections.put(info.getServerName(), hri); } return hri; @@ -701,9 +708,15 @@ } /** - * Stop the ServerManager. Currently does nothing. + * Stop the ServerManager. Currently closes the connection to the master. */ public void stop() { - + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + LOG.error("Attempt to close connection to master failed", e); + } + } } } Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; -import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; @@ -137,8 +136,6 @@ // file system manager for the master FS operations private MasterFileSystem fileSystemManager; - private HConnection connection; - // server manager to deal with region server info private ServerManager serverManager; @@ -298,7 +295,6 @@ if (this.catalogTracker != null) this.catalogTracker.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); - HConnectionManager.deleteConnection(this.conf, true); this.zooKeeper.close(); } LOG.info("HMaster main thread exiting"); @@ -344,12 +340,11 @@ // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. this.fileSystemManager = new MasterFileSystem(this, metrics); - this.connection = HConnectionManager.getConnection(conf); this.executorService = new ExecutorService(getServerName()); this.serverManager = new ServerManager(this, this, metrics); - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (working copy) @@ -104,13 +104,7 @@ @Override public void close(TaskAttemptContext context) throws IOException { - table.flushCommits(); - // The following call will shutdown all connections to the cluster from - // this JVM. It will close out our zk session otherwise zk wil log - // expired sessions rather than closed ones. If any other HTable instance - // running in this JVM, this next call will cause it damage. Presumption - // is that the above this.table is only instance. - HConnectionManager.deleteAllConnections(true); + table.close(); } /** Index: src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -87,12 +88,12 @@ * @throws IOException When something is broken with the data. */ @Override - public void map(ImmutableBytesWritable row, Result value, + public void map(ImmutableBytesWritable row, final Result value, Context context) throws IOException { if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); - Scan scan = new Scan(); + final Scan scan = new Scan(); scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); long startTime = conf.getLong(NAME + ".startTime", 0); long endTime = conf.getLong(NAME + ".endTime", 0); @@ -107,18 +108,23 @@ scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); } - try { - HConnection conn = HConnectionManager.getConnection(conf); - ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, - conn.getZooKeeperWatcher()); - ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); - HTable replicatedTable = new HTable(peer.getConfiguration(), - conf.get(NAME+".tableName")); - scan.setStartRow(value.getRow()); - replicatedScanner = replicatedTable.getScanner(scan); - } catch (KeeperException e) { - throw new IOException("Got a ZK exception", e); - } + HConnectionManager.execute(new HConnectable(conf) { + @Override + public Void connect(HConnection conn) throws IOException { + try { + ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, + conn.getZooKeeperWatcher()); + ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); + HTable replicatedTable = new HTable(peer.getConfiguration(), + conf.get(NAME+".tableName")); + scan.setStartRow(value.getRow()); + replicatedScanner = replicatedTable.getScanner(scan); + } catch (KeeperException e) { + throw new IOException("Got a ZK exception", e); + } + return null; + } + }); } Result res = replicatedScanner.next(); try { @@ -151,20 +157,25 @@ if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { throw new IOException("Replication needs to be enabled to verify it."); } - try { - HConnection conn = HConnectionManager.getConnection(conf); - ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, - conn.getZooKeeperWatcher()); - // Just verifying it we can connect - ReplicationPeer peer = zk.getPeer(peerId); - if (peer == null) { - throw new IOException("Couldn't get access to the slave cluster," + - "please see the log"); + HConnectionManager.execute(new HConnectable(conf) { + @Override + public Void connect(HConnection conn) throws IOException { + try { + ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, + conn.getZooKeeperWatcher()); + // Just verifying it we can connect + ReplicationPeer peer = zk.getPeer(peerId); + if (peer == null) { + throw new IOException("Couldn't get access to the slave cluster," + + "please see the log"); + } + } catch (KeeperException ex) { + throw new IOException("Couldn't get access to the slave cluster" + + " because: ", ex); + } + return null; } - } catch (KeeperException ex) { - throw new IOException("Couldn't get access to the slave cluster" + - " because: ", ex); - } + }); conf.set(NAME+".peerId", peerId); conf.set(NAME+".tableName", tableName); conf.setLong(NAME+".startTime", startTime); Index: src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (working copy) @@ -66,7 +66,7 @@ public void close(Reporter reporter) throws IOException { - m_table.flushCommits(); + m_table.close(); } public void write(ImmutableBytesWritable key, Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -341,6 +341,13 @@ shipEdits(); } + if (this.conn != null) { + try { + this.conn.close(); + } catch (IOException e) { + LOG.debug("Attempt to close connection failed", e); + } + } LOG.debug("Source exiting " + peerClusterId); } Index: src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy) @@ -154,6 +154,7 @@ LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher()); this.zkHelper.getZookeeperWatcher().close(); } + // Not sure why we're deleting a connection that we never acquired or used HConnectionManager.deleteConnection(this.conf, true); } Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy) @@ -29,12 +29,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; @@ -58,6 +60,7 @@ */ public class CatalogTracker { private static final Log LOG = LogFactory.getLog(CatalogTracker.class); + private final Configuration conf; private final HConnection connection; private final ZooKeeperWatcher zookeeper; private final RootRegionTracker rootRegionTracker; @@ -79,15 +82,18 @@ HRegionInfo.FIRST_META_REGIONINFO.getRegionName(); /** - * Constructs a catalog tracker. Find current state of catalog tables and - * begin active tracking by executing {@link #start()} post construction. - * Does not timeout. - * @param connection Server connection; if problem, this connections - * {@link HConnection#abort(String, Throwable)} will be called. - * @throws IOException + * Constructs a catalog tracker. Find current state of catalog tables and + * begin active tracking by executing {@link #start()} post construction. Does + * not timeout. + * + * @param conf + * the {@link Configuration} from which a {@link HConnection} will be + * obtained; if problem, this connections + * {@link HConnection#abort(String, Throwable)} will be called. + * @throws IOException */ - public CatalogTracker(final HConnection connection) throws IOException { - this(connection.getZooKeeperWatcher(), connection, connection); + public CatalogTracker(final Configuration conf) throws IOException { + this(null, conf, null); } /** @@ -99,10 +105,10 @@ * @param abortable if fatal exception * @throws IOException */ - public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection, + public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, final Abortable abortable) throws IOException { - this(zk, connection, abortable, 0); + this(zk, conf, abortable, 0); } /** @@ -115,11 +121,21 @@ * ({@link Object#wait(long)} when passed a 0 waits for ever). * @throws IOException */ - public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection, - final Abortable abortable, final int defaultTimeout) + public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, + Abortable abortable, final int defaultTimeout) throws IOException { - this.zookeeper = zk; + this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout); + } + + CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, + HConnection connection, Abortable abortable, final int defaultTimeout) + throws IOException { + this.conf = conf; this.connection = connection; + this.zookeeper = (zk == null) ? this.connection.getZooKeeperWatcher() : zk; + if (abortable == null) { + abortable = this.connection; + } this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable); this.metaNodeTracker = new MetaNodeTracker(zookeeper, this, abortable); this.defaultTimeout = defaultTimeout; @@ -143,13 +159,24 @@ * Interrupts any ongoing waits. */ public void stop() { - LOG.debug("Stopping catalog tracker " + this); - this.stopped = true; - this.rootRegionTracker.stop(); - this.metaNodeTracker.stop(); - // Call this and it will interrupt any ongoing waits on meta. - synchronized (this.metaAvailable) { - this.metaAvailable.notifyAll(); + if (!this.stopped) { + LOG.debug("Stopping catalog tracker " + this); + this.stopped = true; + this.rootRegionTracker.stop(); + this.metaNodeTracker.stop(); + try { + if (this.connection != null) { + this.connection.close(); + } + } catch (IOException e) { + // Although the {@link Closeable} interface throws an {@link + // IOException}, in reality, the implementation would never do that. + LOG.error("Attempt to close catalog tracker's connection failed.", e); + } + // Call this and it will interrupt any ongoing waits on meta. + synchronized (this.metaAvailable) { + this.metaAvailable.notifyAll(); + } } } Index: src/main/java/org/apache/hadoop/hbase/util/HMerge.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/HMerge.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/util/HMerge.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -103,10 +104,14 @@ throws IOException { boolean masterIsRunning = false; if (testMasterRunning) { - HConnection connection = HConnectionManager.getConnection(conf); - masterIsRunning = connection.isMasterRunning(); + masterIsRunning = HConnectionManager + .execute(new HConnectable(conf) { + @Override + public Boolean connect(HConnection connection) throws IOException { + return connection.isMasterRunning(); + } + }); } - HConnectionManager.deleteConnection(conf, true); if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { if (masterIsRunning) { throw new IllegalStateException( Index: src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (working copy) @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; @@ -244,15 +245,23 @@ * Load the list of disabled tables in ZK into local set. * @throws ZooKeeperConnectionException * @throws IOException - * @throws KeeperException */ private void loadDisabledTables() - throws ZooKeeperConnectionException, IOException, KeeperException { - ZooKeeperWatcher zkw = - HConnectionManager.getConnection(conf).getZooKeeperWatcher(); - for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { - disabledTables.add(Bytes.toBytes(tableName)); - } + throws ZooKeeperConnectionException, IOException { + HConnectionManager.execute(new HConnectable(conf) { + @Override + public Void connect(HConnection connection) throws IOException { + ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); + try { + for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { + disabledTables.add(Bytes.toBytes(tableName)); + } + } catch (KeeperException ke) { + throw new IOException(ke); + } + return null; + } + }); } /** Index: src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java (working copy) @@ -28,7 +28,9 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.zookeeper.KeeperException; @@ -78,31 +80,58 @@ forceOfflineInZK(conf, actualRegion); } - private static void forceOfflineInZK(Configuration conf, HRegionInfo region) + private static void forceOfflineInZK(Configuration conf, final HRegionInfo region) throws ZooKeeperConnectionException, KeeperException, IOException { - ZKAssign.createOrForceNodeOffline( - HConnectionManager.getConnection(conf).getZooKeeperWatcher(), - region, HConstants.HBCK_CODE_NAME); + HConnectionManager.execute(new HConnectable(conf) { + @Override + public Void connect(HConnection connection) throws IOException { + try { + ZKAssign.createOrForceNodeOffline( + connection.getZooKeeperWatcher(), + region, HConstants.HBCK_CODE_NAME); + } catch (KeeperException ke) { + throw new IOException(ke); + } + return null; + } + }); } protected static void closeRegionSilentlyAndWait(Configuration conf, HServerAddress server, HRegionInfo region) throws IOException, InterruptedException { - HRegionInterface rs = - HConnectionManager.getConnection(conf).getHRegionConnection(server); - rs.closeRegion(region, false); - long timeout = conf.getLong("hbase.hbck.close.timeout", 120000); - long expiration = timeout + System.currentTimeMillis(); - while (System.currentTimeMillis() < expiration) { + + HConnection connection = HConnectionManager.getConnection(conf); + boolean success = false; + + try { + HRegionInterface rs = connection.getHRegionConnection(server); + rs.closeRegion(region, false); + long timeout = conf.getLong("hbase.hbck.close.timeout", 120000); + long expiration = timeout + System.currentTimeMillis(); + while (System.currentTimeMillis() < expiration) { + try { + HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName()); + if (rsRegion == null) + throw new NotServingRegionException(); + } catch (Exception e) { + success = true; + return; + } + Thread.sleep(1000); + } + throw new IOException("Region " + region + " failed to close within" + + " timeout " + timeout); + + } finally { try { - HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName()); - if (rsRegion == null) throw new NotServingRegionException(); - } catch (Exception e) { - return; + connection.close(); + } catch (IOException ioe) { + if (success) { + throw ioe; + } } - Thread.sleep(1000); } - throw new IOException("Region " + region + " failed to close within" + - " timeout " + timeout); } + } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; @@ -113,12 +114,25 @@ * @throws IOException e */ public static void metaScan(Configuration configuration, + final MetaScannerVisitor visitor, final byte[] tableName, + final byte[] row, final int rowLimit, final byte[] metaTableName) + throws IOException { + HConnectionManager.execute(new HConnectable(configuration) { + @Override + public Void connect(HConnection connection) throws IOException { + metaScan(conf, connection, visitor, tableName, row, rowLimit, + metaTableName); + return null; + } + }); + } + + private static void metaScan(Configuration configuration, HConnection connection, MetaScannerVisitor visitor, byte [] tableName, byte[] row, int rowLimit, final byte [] metaTableName) throws IOException { int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE; - HConnection connection = HConnectionManager.getConnection(configuration); // if row is not null, we want to use the startKey of the row's region as // the startRow for the meta scan. byte[] startRow; @@ -158,8 +172,9 @@ // Scan over each meta region ScannerCallable callable; - int rows = Math.min(rowLimit, - configuration.getInt("hbase.meta.scanner.caching", 100)); + int rows = Math.min(rowLimit, configuration.getInt( + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); do { final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY); if (LOG.isDebugEnabled()) { Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -19,10 +19,12 @@ */ package org.apache.hadoop.hbase.client; +import java.io.Closeable; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -43,6 +45,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -69,6 +72,8 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; +import com.google.common.collect.ImmutableMap; + /** * A non-instantiable class that manages {@link HConnection}s. * This class has a static Map of {@link HConnection} instances keyed by @@ -118,20 +123,31 @@ */ @SuppressWarnings("serial") public class HConnectionManager { - static final int MAX_CACHED_HBASE_INSTANCES = 2001; + // A LRU Map of HConnectionKey -> HConnection (TableServer). + private static final Map HBASE_INSTANCES; - // A LRU Map of Configuration hashcode -> TableServers. We set instances to 31. - // The zk default max connections to the ensemble from the one client is 30 so - // should run into zk issues before hit this value of 31. - private static final Map HBASE_INSTANCES = - new LinkedHashMap - ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > MAX_CACHED_HBASE_INSTANCES; - } - }; + public static final int MAX_CACHED_HBASE_INSTANCES; + static { + // We set instances to one more than the value specified for {@link + // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max + // connections to the ensemble from the one client is 30, so in that case we + // should run into zk issues before the LRU hit this value of 31. + MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt( + HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, + HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1; + HBASE_INSTANCES = new LinkedHashMap( + (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) { + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > MAX_CACHED_HBASE_INSTANCES; + } + }; + } + + static final Log LOG = LogFactory.getLog(HConnectionManager.class); + /* * Non-instantiable. */ @@ -150,33 +166,34 @@ */ public static HConnection getConnection(Configuration conf) throws ZooKeeperConnectionException { - HConnectionImplementation connection; + HConnectionKey connectionKey = new HConnectionKey(conf); synchronized (HBASE_INSTANCES) { - connection = HBASE_INSTANCES.get(conf); + HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { connection = new HConnectionImplementation(conf); - HBASE_INSTANCES.put(conf, connection); + HBASE_INSTANCES.put(connectionKey, connection); } + connection.incCount(); + return connection; } - return connection; } /** * Delete connection information for the instance specified by configuration. - * This will close connection to the zookeeper ensemble and let go of all - * resources. - * @param conf configuration whose identity is used to find {@link HConnection} - * instance. - * @param stopProxy Shuts down all the proxy's put up to cluster members - * including to cluster HMaster. Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}. + * If there are no more references to it, this will then close connection to + * the zookeeper ensemble and let go of all resources. + * + * @param conf + * configuration whose identity is used to find {@link HConnection} + * instance. + * @param stopProxy + * Shuts down all the proxy's put up to cluster members including to + * cluster HMaster. Calls + * {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)} + * . */ public static void deleteConnection(Configuration conf, boolean stopProxy) { - synchronized (HBASE_INSTANCES) { - HConnectionImplementation t = HBASE_INSTANCES.remove(conf); - if (t != null) { - t.close(stopProxy); - } - } + deleteConnection(new HConnectionKey(conf), stopProxy); } /** @@ -186,14 +203,43 @@ */ public static void deleteAllConnections(boolean stopProxy) { synchronized (HBASE_INSTANCES) { - for (HConnectionImplementation t : HBASE_INSTANCES.values()) { - if (t != null) { - t.close(stopProxy); + Set connectionKeys = new HashSet(); + connectionKeys.addAll(HBASE_INSTANCES.keySet()); + for (HConnectionKey connectionKey : connectionKeys) { + deleteConnection(connectionKey, stopProxy); + } + HBASE_INSTANCES.clear(); + } + } + + private static void deleteConnection(HConnection connection, boolean stopProxy) { + synchronized (HBASE_INSTANCES) { + for (Entry connectionEntry : HBASE_INSTANCES + .entrySet()) { + if (connectionEntry.getValue() == connection) { + deleteConnection(connectionEntry.getKey(), stopProxy); + break; } } } } + private static void deleteConnection(HConnectionKey connectionKey, boolean stopProxy) { + synchronized (HBASE_INSTANCES) { + HConnectionImplementation connection = HBASE_INSTANCES + .get(connectionKey); + if (connection != null) { + connection.decCount(); + if (connection.isZeroReference()) { + HBASE_INSTANCES.remove(connectionKey); + connection.close(stopProxy); + } else if (stopProxy) { + connection.stopProxyOnClose(stopProxy); + } + } + } + } + /** * It is provided for unit test cases which verify the behavior of region * location cache prefetch. @@ -201,10 +247,15 @@ * @throws ZooKeeperConnectionException */ static int getCachedRegionCount(Configuration conf, - byte[] tableName) - throws ZooKeeperConnectionException { - HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf); - return connection.getNumberOfCachedRegionLocations(tableName); + final byte[] tableName) + throws IOException { + return execute(new HConnectable(conf) { + @Override + public Integer connect(HConnection connection) { + return ((HConnectionImplementation) connection) + .getNumberOfCachedRegionLocations(tableName); + } + }); } /** @@ -214,13 +265,162 @@ * @throws ZooKeeperConnectionException */ static boolean isRegionCached(Configuration conf, - byte[] tableName, byte[] row) throws ZooKeeperConnectionException { - HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf); - return connection.isRegionCached(tableName, row); + final byte[] tableName, final byte[] row) throws IOException { + return execute(new HConnectable(conf) { + @Override + public Boolean connect(HConnection connection) { + return ((HConnectionImplementation) connection).isRegionCached(tableName, row); + } + }); } + /** + * This class makes it convenient for one to execute a command in the context + * of a {@link HConnection} instance based on the given {@link Configuration}. + * + *

+ * If you find yourself wanting to use a {@link Connection} for a relatively + * short duration of time, and do not want to deal with the hassle of creating + * and cleaning up that resource, then you should consider using this + * convenience class. + * + * @param + * the return type of the {@link HConnectable#connect(HConnection)} + * method. + */ + public static abstract class HConnectable { + public Configuration conf; + + public HConnectable(Configuration conf) { + this.conf = conf; + } + + public abstract T connect(HConnection connection) throws IOException; + } + + /** + * This convenience method invokes the given {@link HConnectable#connect} + * implementation using a {@link HConnection} instance that lasts just for the + * duration of that invocation. + * + * @param the return type of the connect method + * @param connectable the {@link HConnectable} instance + * @return the value returned by the connect method + * @throws IOException + */ + public static T execute(HConnectable connectable) throws IOException { + if (connectable == null || connectable.conf == null) { + return null; + } + Configuration conf = connectable.conf; + HConnection connection = HConnectionManager.getConnection(conf); + boolean connectSucceeded = false; + try { + T returnValue = connectable.connect(connection); + connectSucceeded = true; + return returnValue; + } finally { + try { + connection.close(); + } catch (Exception e) { + if (connectSucceeded) { + throw new IOException("The connection to " + connection + + " could not be deleted.", e); + } else { + LOG.warn("Masking close error as the connectable block threw one itself."); + } + } + } + } + + /** + * Denotes a unique key to a {@link HConnection} instance. + * + * In essence, this class captures the properties in {@link Configuration} + * that may be used in the process of establishing a connection. In light of + * that, if any new such properties are introduced into the mix, they must be + * added to the {@link HConnectionKey#properties} list. + * + */ + static class HConnectionKey { + public static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID }; + + private Map properties; + + public HConnectionKey(Configuration conf) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + if (conf != null) { + if (conf.getBoolean(HConstants.HBASE_CONNECTION_PER_CONFIG, false)) { + builder.put(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(System.identityHashCode(conf))); + } else { + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + builder.put(property, value); + } + } + } + } + this.properties = builder.build(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + for (String property : CONNECTION_PROPERTIES) { + String value = properties.get(property); + if (value != null) { + result = prime * result + value.hashCode(); + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HConnectionKey that = (HConnectionKey) obj; + if (this.properties == null) { + if (that.properties != null) { + return false; + } + } else { + if (that.properties == null) { + return false; + } + for (String property : CONNECTION_PROPERTIES) { + String thisValue = this.properties.get(property); + String thatValue = that.properties.get(property); + if (thisValue == thatValue) { + continue; + } + if (thisValue == null || !thisValue.equals(thatValue)) { + return false; + } + } + } + return true; + } + } + /* Encapsulates connection to zookeeper and regionservers.*/ - static class HConnectionImplementation implements HConnection { + static class HConnectionImplementation implements HConnection, Closeable { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); private final Class serverInterfaceClass; private final long pause; @@ -263,6 +463,10 @@ private final Set regionCachePrefetchDisabledTables = new CopyOnWriteArraySet(); + private boolean stopProxy; + private int refCount; + + /** * constructor * @param conf Configuration object @@ -282,16 +486,20 @@ "Unable to find region server interface " + serverClassName, e); } - this.pause = conf.getLong("hbase.client.pause", 1000); - this.numRetries = conf.getInt("hbase.client.retries.number", 10); - this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1); + this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.maxRPCAttempts = conf.getInt( + HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, + HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.prefetchRegionLimit = conf.getInt( + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit", - 10); - setupZookeeperTrackers(); this.master = null; @@ -1050,28 +1258,6 @@ } } - void close(boolean stopProxy) { - if (master != null) { - if (stopProxy) { - HBaseRPC.stopProxy(master); - } - master = null; - masterChecked = false; - } - if (stopProxy) { - for (HRegionInterface i: servers.values()) { - HBaseRPC.stopProxy(i); - } - } - if (this.zooKeeper != null) { - LOG.info("Closed zookeeper sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); - this.zooKeeper.close(); - this.zooKeeper = null; - } - this.closed = true; - } - private Callable createCallable( final HServerAddress address, final MultiAction multi, @@ -1352,5 +1538,85 @@ else LOG.fatal(msg); this.closed = true; } + + public void stopProxyOnClose(boolean stopProxy) { + this.stopProxy = stopProxy; + } + + /** + * Increment this client's reference count. + */ + void incCount() { + ++refCount; + } + + /** + * Decrement this client's reference count. + */ + void decCount() { + if (refCount > 0) { + --refCount; + } + } + + /** + * Return if this client has no reference + * + * @return true if this client has no reference; false otherwise + */ + boolean isZeroReference() { + return refCount == 0; + } + + void close(boolean stopProxy) { + if (this.closed) { + return; + } + if (master != null) { + if (stopProxy) { + HBaseRPC.stopProxy(master); + } + master = null; + masterChecked = false; + } + if (stopProxy) { + for (HRegionInterface i : servers.values()) { + HBaseRPC.stopProxy(i); + } + } + this.servers.clear(); + if (this.zooKeeper != null) { + LOG.info("Closed zookeeper sessionid=0x" + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + this.zooKeeper.close(); + this.zooKeeper = null; + } + this.closed = true; + } + + public void close() { + HConnectionManager.deleteConnection(this, stopProxy); + LOG.debug("The connection to " + this.zooKeeper + " has been closed."); + } + + /** + * Close the connection for good, regardless of what the current value of + * {@link #refCount} is. Ideally, {@link refCount} should be zero at this + * point, which would be the case if all of its consumers close the + * connection. However, on the off chance that someone is unable to close + * the connection, perhaps because it bailed out prematurely, the method + * below will ensure that this {@link Connection} instance is cleaned up. + * Caveat: The JVM may take an unknown amount of time to call finalize on an + * unreachable object, so our hope is that every consumer cleans up after + * itself, like any good citizen. + */ + @Override + protected void finalize() throws Throwable { + // Pretend as if we are about to release the last remaining reference + refCount = 1; + close(); + LOG.debug("The connection to " + this.zooKeeper + + " was closed by the finalize method."); + } } } Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -54,7 +55,7 @@ * * @see HConnectionManager */ -public interface HConnection extends Abortable { +public interface HConnection extends Abortable, Closeable { /** * @return Configuration instance being used by this HConnection instance. */ Index: src/main/java/org/apache/hadoop/hbase/client/HTablePool.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (working copy) @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.client; +import java.io.Closeable; +import java.io.IOException; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -41,7 +43,7 @@ * *

Pool will manage its own cluster to the cluster. See {@link HConnectionManager}. */ -public class HTablePool { +public class HTablePool implements Closeable { private final ConcurrentMap> tables = new ConcurrentHashMap>(); private final Configuration config; @@ -68,7 +70,7 @@ final HTableInterfaceFactory tableFactory) { // Make a new configuration instance so I can safely cleanup when // done with the pool. - this.config = config == null? new Configuration(): new Configuration(config); + this.config = config == null? new Configuration(): config; this.maxSize = maxSize; this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory; } @@ -151,7 +153,6 @@ table = queue.poll(); } } - HConnectionManager.deleteConnection(this.config, true); } /** @@ -163,6 +164,17 @@ closeTablePool(Bytes.toString(tableName)); } + /** + * Closes all the HTable instances , belonging to all tables in the table pool. + *

+ * Note: this is a 'shutdown' of all the table pools. + */ + public void close() throws IOException { + for (String tableName : tables.keySet()) { + closeTablePool(tableName); + } + } + int getCurrentPoolSize(String tableName) { Queue queue = tables.get(tableName); synchronized(queue) { Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.net.SocketTimeoutException; @@ -65,10 +66,10 @@ *

Currently HBaseAdmin instances are not expected to be long-lived. For * example, an HBaseAdmin instance will not ride over a Master restart. */ -public class HBaseAdmin implements Abortable { +public class HBaseAdmin implements Abortable, Closeable { private final Log LOG = LogFactory.getLog(this.getClass().getName()); // private final HConnection connection; - final HConnection connection; + private final HConnection connection; private volatile Configuration conf; private final long pause; private final int numRetries; @@ -105,9 +106,7 @@ throws ZooKeeperConnectionException, IOException { CatalogTracker ct = null; try { - HConnection connection = - HConnectionManager.getConnection(this.conf); - ct = new CatalogTracker(connection); + ct = new CatalogTracker(this.conf); ct.start(); } catch (InterruptedException e) { // Let it out as an IOE for now until we redo all so tolerate IEs @@ -1257,4 +1256,10 @@ HBaseAdmin admin = new HBaseAdmin(copyOfConf); HConnectionManager.deleteConnection(admin.getConfiguration(), false); } + + public void close() throws IOException { + if (this.connection != null) { + this.connection.close(); + } + } } Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -89,13 +91,17 @@ * {@link HTable} passing a new {@link Configuration} instance that has the * new configuration. * + *

Note that this class implements the {@link Closeable} interface. When a + * HTable instance is no longer required, it *should* be closed in order to ensure + * that the underlying resources are promptly released. + * * @see HBaseAdmin for create, drop, list, enable and disable of tables. * @see HConnection * @see HConnectionManager */ -public class HTable implements HTableInterface { +public class HTable implements HTableInterface, Closeable { private static final Log LOG = LogFactory.getLog(HTable.class); - private final HConnection connection; + private HConnection connection; private final byte [] tableName; protected final int scannerTimeout; private volatile Configuration configuration; @@ -108,6 +114,7 @@ private int maxKeyValueSize; private ExecutorService pool; // For Multi private long maxScannerResultSize; + private boolean closed; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. /** @@ -205,6 +212,7 @@ new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + this.closed = false; } /** @@ -268,9 +276,14 @@ * @return {@code true} if table is online. * @throws IOException if a remote or network exception occurs */ - public static boolean isTableEnabled(Configuration conf, byte[] tableName) - throws IOException { - return HConnectionManager.getConnection(conf).isTableEnabled(tableName); + public static boolean isTableEnabled(Configuration conf, + final byte[] tableName) throws IOException { + return HConnectionManager.execute(new HConnectable(conf) { + @Override + public Boolean connect(HConnection connection) throws IOException { + return connection.isTableEnabled(tableName); + } + }); } /** @@ -858,8 +871,15 @@ @Override public void close() throws IOException { + if (this.closed) { + return; + } flushCommits(); this.pool.shutdown(); + if (this.connection != null) { + this.connection.close(); + } + this.closed = true; } // validate for well-formedness @@ -1346,12 +1366,18 @@ * @param tableName name of table to configure. * @param enable Set to true to enable region cache prefetch. Or set to * false to disable it. - * @throws ZooKeeperConnectionException + * @throws IOException */ public static void setRegionCachePrefetch(final byte[] tableName, - boolean enable) throws ZooKeeperConnectionException { - HConnectionManager.getConnection(HBaseConfiguration.create()). - setRegionCachePrefetch(tableName, enable); + final boolean enable) throws IOException { + HConnectionManager.execute(new HConnectable(HBaseConfiguration + .create()) { + @Override + public Void connect(HConnection connection) throws IOException { + connection.setRegionCachePrefetch(tableName, enable); + return null; + } + }); } /** @@ -1362,12 +1388,17 @@ * @param tableName name of table to configure. * @param enable Set to true to enable region cache prefetch. Or set to * false to disable it. - * @throws ZooKeeperConnectionException + * @throws IOException */ public static void setRegionCachePrefetch(final Configuration conf, - final byte[] tableName, boolean enable) throws ZooKeeperConnectionException { - HConnectionManager.getConnection(conf).setRegionCachePrefetch( - tableName, enable); + final byte[] tableName, final boolean enable) throws IOException { + HConnectionManager.execute(new HConnectable(conf) { + @Override + public Void connect(HConnection connection) throws IOException { + connection.setRegionCachePrefetch(tableName, enable); + return null; + } + }); } /** @@ -1376,12 +1407,16 @@ * @param tableName name of table to check * @return true if table's region cache prefecth is enabled. Otherwise * it is disabled. - * @throws ZooKeeperConnectionException + * @throws IOException */ public static boolean getRegionCachePrefetch(final Configuration conf, - final byte[] tableName) throws ZooKeeperConnectionException { - return HConnectionManager.getConnection(conf).getRegionCachePrefetch( - tableName); + final byte[] tableName) throws IOException { + return HConnectionManager.execute(new HConnectable(conf) { + @Override + public Boolean connect(HConnection connection) throws IOException { + return connection.getRegionCachePrefetch(tableName); + } + }); } /** @@ -1389,10 +1424,14 @@ * @param tableName name of table to check * @return true if table's region cache prefecth is enabled. Otherwise * it is disabled. - * @throws ZooKeeperConnectionException + * @throws IOException */ - public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException { - return HConnectionManager.getConnection(HBaseConfiguration.create()). - getRegionCachePrefetch(tableName); + public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException { + return HConnectionManager.execute(new HConnectable(HBaseConfiguration.create()) { + @Override + public Boolean connect(HConnection connection) throws IOException { + return connection.getRegionCachePrefetch(tableName); + } + }); } } Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1188010) +++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client.replication; +import java.io.Closeable; import java.io.IOException; import org.apache.commons.lang.NotImplementedException; @@ -63,9 +64,10 @@ * replication. *

*/ -public class ReplicationAdmin { +public class ReplicationAdmin implements Closeable { private final ReplicationZookeeper replicationZk; + private final Configuration configuration; private final HConnection connection; /** @@ -79,6 +81,7 @@ throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } + this.configuration = conf; this.connection = HConnectionManager.getConnection(conf); ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); try { @@ -169,4 +172,11 @@ ReplicationZookeeper getReplicationZk() { return replicationZk; } + + @Override + public void close() throws IOException { + if (this.connection != null) { + this.connection.close(); + } + } } Index: src/main/resources/hbase-default.xml =================================================================== --- src/main/resources/hbase-default.xml (revision 1188010) +++ src/main/resources/hbase-default.xml (working copy) @@ -124,6 +124,14 @@ before running a retry of a failed get, region lookup, etc. + hbase.connection.per.config + true + Disallows sharing of connections for configuration instances + with equivalent settings. + Default: true (expected to be false in future releases). + + + hbase.client.retries.number 10 Maximum retries. Used as maximum for all retryable