Index: src/test/java/org/apache/hadoop/hbase/client/TestHCM.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (revision 1202802) +++ src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (working copy) @@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import java.lang.reflect.Field; import java.util.ArrayList; @@ -203,4 +205,51 @@ Thread.sleep(50); } } + + @Test + public void testClosing() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, + String.valueOf(_randy.nextInt())); + + HConnection c1 = HConnectionManager.createConnection(configuration); + HConnection c2 = HConnectionManager.createConnection(configuration); + + HConnection c3 = HConnectionManager.getConnection(configuration); + HConnection c4 = HConnectionManager.getConnection(configuration); + assertTrue(c3 == c4); + + c1.close(); + assertTrue(c1.isClosed()); + assertFalse(c2.isClosed()); + assertFalse(c3.isClosed()); + + c3.close(); + // still a reference left + assertFalse(c3.isClosed()); + c3.close(); + assertTrue(c3.isClosed()); + // c3 was removed from the cache + assertTrue(HConnectionManager.getConnection(configuration) != c3); + + assertFalse(c2.isClosed()); + } + + /** + * Trivial test to verify that nobody messes with + * {@link HConnectionManager#createConnection(Configuration)} + */ + @Test + public void testCreateConnection() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + HConnection c1 = HConnectionManager.createConnection(configuration); + HConnection c2 = HConnectionManager.createConnection(configuration); + // created from the same configuration, yet they are different + assertTrue(c1 != c2); 
+ assertTrue(c1.getConfiguration() == c2.getConfiguration()); + // make sure these were not cached + HConnection c3 = HConnectionManager.getConnection(configuration); + assertTrue(c1 != c3); + assertTrue(c2 != c3); + } } Index: src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (revision 1202802) +++ src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (working copy) @@ -79,7 +79,7 @@ HConnectionImplementation connection = HConnectionManager.HBASE_INSTANCES.get(connectionKey); if (connection == null) { - connection = Mockito.spy(new HConnectionImplementation(conf)); + connection = Mockito.spy(new HConnectionImplementation(conf, true)); HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection); } return connection; Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1202802) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -177,7 +177,7 @@ synchronized (HBASE_INSTANCES) { HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { - connection = new HConnectionImplementation(conf); + connection = new HConnectionImplementation(conf, true); HBASE_INSTANCES.put(connectionKey, connection); } connection.incCount(); @@ -186,6 +186,21 @@ } /** + * Create a new HConnection instance using the passed conf + * instance. + * Note: This bypasses the usual HConnection life cycle management! + * Use this with caution, your application must manage the + * connection's life cycle explicitly. 
+ * @param conf configuration + * @return a new, uncached HConnection for conf; the caller must close it + * @throws ZooKeeperConnectionException if thrown by the connection's constructor + */ + public static HConnection createConnection(Configuration conf) + throws ZooKeeperConnectionException { + return new HConnectionImplementation(conf, false); + } + + /** * Delete connection information for the instance specified by configuration. * If there are no more references to it, this will then close connection to * the zookeeper ensemble and let go of all resources. @@ -483,15 +498,17 @@ private boolean stopProxy; private int refCount; - + // true if this connection's life cycle is managed by HConnectionManager + private final boolean isManaged; /** * constructor * @param conf Configuration object */ @SuppressWarnings("unchecked") - public HConnectionImplementation(Configuration conf) + public HConnectionImplementation(Configuration conf, boolean isManaged) throws ZooKeeperConnectionException { this.conf = conf; + this.isManaged = isManaged; String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS, HConstants.DEFAULT_REGION_SERVER_CLASS); this.closed = false; @@ -1639,6 +1656,11 @@ this.aborted = true; this.closed = true; } + + @Override + public boolean isClosed() { + return this.closed; + } @Override public boolean isAborted(){ @@ -1712,7 +1734,11 @@ } public void close() { - HConnectionManager.deleteConnection((HConnection)this, stopProxy, false); + if (isManaged) { + HConnectionManager.deleteConnection((HConnection)this, stopProxy, false); + } else { + close(true); + } LOG.debug("The connection to " + this.zooKeeper + " has been closed."); } Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1202802) +++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -369,4 +369,8 @@ public HTableDescriptor[] getHTableDescriptors(List tableNames) throws IOException; + 
/** + * @return true if this connection is closed + */ + public boolean isClosed(); } Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1202802) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -111,7 +111,7 @@ private static final Log LOG = LogFactory.getLog(HTable.class); private HConnection connection; private final byte [] tableName; - protected final int scannerTimeout; + protected int scannerTimeout; private volatile Configuration configuration; private final ArrayList writeBuffer = new ArrayList(); private long writeBufferSize; @@ -125,6 +125,7 @@ private boolean closed; private int operationTimeout; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. + private final boolean cleanupOnClose; // if true, close() also shuts down the pool and closes the connection /** * Creates an object to access a HBase table. @@ -155,36 +156,19 @@ public HTable(Configuration conf, final byte [] tableName) throws IOException { this.tableName = tableName; + this.cleanupOnClose = true; if (conf == null) { this.scannerTimeout = 0; this.connection = null; return; } this.connection = HConnectionManager.getConnection(conf); - this.scannerTimeout = - (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); - this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? 
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT - : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.configuration = conf; - this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); - this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); - this.clearBufferOnFail = true; - this.autoFlush = true; - this.currentWriteBufferSize = 0; - this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); - int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } - // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means @@ -194,6 +178,59 @@ new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + + this.finishSetup(); + } + + /** + * Creates an object to access a HBase table. + * Shares zookeeper connection and other resources with other HTable instances + * created with the same connection instance. + * Use this constructor when the ExecutorService and HConnection instance are + * externally managed; this HTable will not shut them down when it is closed. + * @param tableName Name of the table. + * @param connection HConnection to be used; must not be null. + * @param pool ExecutorService to be used; must not be null. 
+ * @throws IOException if a remote or network exception occurs + */ + public HTable(final byte[] tableName, final HConnection connection, + final ExecutorService pool) throws IOException { + if (connection == null || pool == null) { throw new NullPointerException("connection and pool must not be null"); } + + this.tableName = tableName; + this.cleanupOnClose = false; + this.connection = connection; + this.configuration = connection.getConfiguration(); + this.pool = pool; + + this.finishSetup(); + } + + /** + * Locates the region for the empty start row, then reads this HTable's settings from its configuration. + * @throws IOException if thrown while locating the region + */ + private void finishSetup() throws IOException { + this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); + this.scannerTimeout = (int) this.configuration.getLong( + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT + : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.writeBufferSize = this.configuration.getLong( + "hbase.client.write.buffer", 2097152); + this.clearBufferOnFail = true; + this.autoFlush = true; + this.currentWriteBufferSize = 0; + this.scannerCaching = this.configuration.getInt( + "hbase.client.scanner.caching", 1); + + this.maxScannerResultSize = this.configuration.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxKeyValueSize = this.configuration.getInt( + "hbase.client.keyvalue.maxsize", -1); this.closed = false; } @@ -886,9 +923,11 @@ return; } flushCommits(); - this.pool.shutdown(); - if (this.connection != null) { - this.connection.close(); + if (cleanupOnClose) { + this.pool.shutdown(); + if (this.connection != null) { + this.connection.close(); + } } this.closed = true; }