Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1212237) +++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -297,4 +297,9 @@ */ public void prewarmRegionCache(final byte[] tableName, final Map regions); + + /** + * @return true if this connection is closed + */ + public boolean isClosed(); } Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1212237) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -169,7 +169,7 @@ synchronized (HBASE_INSTANCES) { HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { - connection = new HConnectionImplementation(conf); + connection = new HConnectionImplementation(conf, true); HBASE_INSTANCES.put(connectionKey, connection); } connection.incCount(); @@ -178,6 +178,21 @@ } /** + * Create a new HConnection instance using the passed conf + * instance. + * Note: This bypasses the usual HConnection life cycle management! + * Use this with caution, the caller is responsible for closing the + * created connection. + * @param conf configuration + * @return HConnection object for conf + * @throws ZooKeeperConnectionException + */ + public static HConnection createConnection(Configuration conf) + throws ZooKeeperConnectionException { + return new HConnectionImplementation(conf, false); + } + + /** * Delete connection information for the instance specified by configuration. * If there are no more references to it, this will then close connection to * the zookeeper ensemble and let go of all resources. @@ -464,6 +479,8 @@ private boolean stopProxy; private int refCount; + // indicates whether this connection's life cycle is managed + private final boolean managed; /** @@ -471,8 +488,9 @@ * @param conf Configuration object */ @SuppressWarnings("unchecked") - public HConnectionImplementation(Configuration conf) - throws ZooKeeperConnectionException { + public HConnectionImplementation(Configuration conf, boolean managed) + throws ZooKeeperConnectionException { + this.managed = managed; this.conf = conf; String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS, HConstants.DEFAULT_REGION_SERVER_CLASS); @@ -1517,6 +1535,11 @@ } @Override + public boolean isClosed() { + return this.closed; + } + + @Override public void abort(final String msg, Throwable t) { if (t instanceof KeeperException.SessionExpiredException) { try { @@ -1593,8 +1616,12 @@ this.closed = true; } - public void close() { - HConnectionManager.deleteConnection(this, stopProxy); + public void close() { + if (managed) { + HConnectionManager.deleteConnection(this, stopProxy); + } else { + close(true); + } LOG.debug("The connection to " + this.zooKeeper + " has been closed."); } Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1212237) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -103,7 +103,7 @@ private static final Log LOG = LogFactory.getLog(HTable.class); private HConnection connection; private final byte [] tableName; - protected final int scannerTimeout; + protected int scannerTimeout; private volatile Configuration configuration; private final ArrayList writeBuffer = new ArrayList(); private long writeBufferSize; @@ -116,6 +116,7 @@ private long maxScannerResultSize; private boolean closed; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. + private final boolean cleanupOnClose; // close the connection in close() /** * Creates an object to access a HBase table. @@ -163,7 +164,6 @@ this(conf, Bytes.toBytes(tableName)); } - /** * Creates an object to access a HBase table. * Shares zookeeper connection and other resources with other HTable instances @@ -177,41 +177,78 @@ public HTable(Configuration conf, final byte [] tableName) throws IOException { this.tableName = tableName; + this.cleanupOnClose = true; if (conf == null) { this.scannerTimeout = 0; this.connection = null; return; } + this.configuration = conf; this.connection = HConnectionManager.getConnection(conf); - this.scannerTimeout = - (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); - this.configuration = conf; - this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); - this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); - this.clearBufferOnFail = true; - this.autoFlush = true; - this.currentWriteBufferSize = 0; - this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); + - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); - int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } + long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means // it also scales when new region servers are added. - this.pool = new ThreadPoolExecutor(1, maxThreads, - 60, TimeUnit.SECONDS, - new SynchronousQueue(), + this.pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, + TimeUnit.SECONDS, new SynchronousQueue(), new DaemonThreadFactory()); - ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + ((ThreadPoolExecutor) this.pool).allowCoreThreadTimeOut(true); + + this.finishSetup(); + } + + /** + * Creates an object to access a HBase table. + * Shares zookeeper connection and other resources with other HTable instances + * created with the same connection instance. + * Use this constructor when the ExecutorService and HConnection instance are + * externally managed. + * @param tableName Name of the table. + * @param connection HConnection to be used. + * @param pool ExecutorService to be used.XS + * @throws IOException if a remote or network exception occurs + */ + public HTable(final byte[] tableName, final HConnection connection, + final ExecutorService pool) throws IOException { + if (pool == null || pool.isShutdown()) { + throw new IllegalArgumentException("Pool is null or shut down."); + } + if (connection == null || connection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + this.tableName = tableName; + this.cleanupOnClose = false; + this.connection = connection; + this.configuration = connection.getConfiguration(); + this.pool = pool; + + this.finishSetup(); + } + + private void finishSetup() throws IOException { + this.scannerTimeout = (int) this.configuration.getLong( + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + + this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); + this.writeBufferSize = this.configuration.getLong("hbase.client.write.buffer", 2097152); + this.clearBufferOnFail = true; + this.autoFlush = true; + this.currentWriteBufferSize = 0; + this.scannerCaching = this.configuration.getInt("hbase.client.scanner.caching", 1); + + this.maxScannerResultSize = this.configuration.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxKeyValueSize = this.configuration.getInt("hbase.client.keyvalue.maxsize", -1); this.closed = false; } @@ -875,9 +912,11 @@ return; } flushCommits(); - this.pool.shutdown(); - if (this.connection != null) { - this.connection.close(); + if (cleanupOnClose) { + this.pool.shutdown(); + if (this.connection != null) { + this.connection.close(); + } } this.closed = true; } Index: src/test/java/org/apache/hadoop/hbase/client/TestHCM.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (revision 1212237) +++ src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (working copy) @@ -44,7 +44,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; + /** * This class is for testing HCM features */ @@ -208,4 +211,51 @@ Thread.sleep(50); } } + + @Test + public void testClosing() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, + String.valueOf(_randy.nextInt())); + + HConnection c1 = HConnectionManager.createConnection(configuration); + HConnection c2 = HConnectionManager.createConnection(configuration); + + HConnection c3 = HConnectionManager.getConnection(configuration); + HConnection c4 = HConnectionManager.getConnection(configuration); + assertTrue(c3 == c4); + + c1.close(); + assertTrue(c1.isClosed()); + assertFalse(c2.isClosed()); + assertFalse(c3.isClosed()); + + c3.close(); + // still a reference left + assertFalse(c3.isClosed()); + c3.close(); + assertTrue(c3.isClosed()); + // c3 was removed from the cache + assertTrue(HConnectionManager.getConnection(configuration) != c3); + + assertFalse(c2.isClosed()); + } + + /** + * Trivial test to verify that nobody messes with + * {@link HConnectionManager#createConnection(Configuration)} + */ + @Test + public void testCreateConnection() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + HConnection c1 = HConnectionManager.createConnection(configuration); + HConnection c2 = HConnectionManager.createConnection(configuration); + // created from the same configuration, yet they are different + assertTrue(c1 != c2); + assertTrue(c1.getConfiguration() == c2.getConfiguration()); + // make sure these were not cached + HConnection c3 = HConnectionManager.getConnection(configuration); + assertTrue(c1 != c3); + assertTrue(c2 != c3); + } }