Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (revision 1470804) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (working copy) @@ -37,6 +37,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * A simple pool of HTable instances. @@ -64,6 +66,7 @@ @InterfaceStability.Stable public class HTablePool implements Closeable { private final PoolMap tables; + private final ConcurrentHashMap<String, AtomicInteger> concurrentUsedTableCounts; private final int maxSize; private final PoolType poolType; private final Configuration config; @@ -161,8 +164,20 @@ } this.tables = new PoolMap(this.poolType, this.maxSize); + this.concurrentUsedTableCounts = new ConcurrentHashMap<String, AtomicInteger>(); } + private void increaseConcurrentUsedTableCount(String tableName) { + AtomicInteger initCount = new AtomicInteger(0); + AtomicInteger previousCount = null; + if ((previousCount = concurrentUsedTableCounts.putIfAbsent(tableName, initCount)) == null) { + // no count was associated with tableName yet + initCount.incrementAndGet(); + } else { + previousCount.incrementAndGet(); + } + } + /** * Get a reference to the specified table from the pool. *

@@ -177,6 +192,8 @@ public HTableInterface getTable(String tableName) { // call the old getTable implementation renamed to findOrCreateTable HTableInterface table = findOrCreateTable(tableName); + // increase the concurrently-used table count once findOrCreateTable has succeeded + increaseConcurrentUsedTableCount(tableName); // return a proxy table so when user closes the proxy, the actual table // will be returned to the pool return new PooledHTable(table); @@ -257,12 +274,19 @@ private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = Bytes.toString(table.getTableName()); + if (tables.size(tableName) >= maxSize) { // release table instance since we're not reusing it this.tables.remove(tableName, table); this.tableFactory.releaseHTableInterface(table); + // Decrement the concurrently-used table count only after releaseHTableInterface returns, + // because releaseHTableInterface may throw an IOException when closing the HTable fails. In + // that case the table has already been removed from HTablePool.tables; the caller must + // decide whether to retry until it succeeds or to tolerate the IOException. + concurrentUsedTableCounts.get(tableName).decrementAndGet(); return; } + concurrentUsedTableCounts.get(tableName).decrementAndGet(); tables.put(tableName, table); } @@ -317,6 +341,15 @@ return tables.size(tableName); } + public int getConcurrentUsedTableCount(String tableName) { + AtomicInteger value = concurrentUsedTableCounts.get(tableName); + return value == null ?
0 : value.get(); + } + + public int getConcurrentUsedTableCount(byte[] tableName) { + return getConcurrentUsedTableCount(Bytes.toString(tableName)); + } + /** * A proxy class that implements HTableInterface.close method to return the * wrapped table back to the table pool Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java (revision 1470804) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java (working copy) @@ -182,6 +182,20 @@ Assert.assertTrue("alien table rejected", true); } } + + @Test + public void testConcurrentUsedTableCount() throws IOException { + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 1, getPoolType()); + Assert.assertEquals(0, pool.getConcurrentUsedTableCount(TABLENAME)); + HTableInterface table1 = pool.getTable(TABLENAME); + HTableInterface table2 = pool.getTable(TABLENAME); + Assert.assertEquals(2, pool.getConcurrentUsedTableCount(TABLENAME)); + table1.close(); + Assert.assertEquals(1, pool.getConcurrentUsedTableCount(TABLENAME)); + table2.close(); + Assert.assertEquals(0, pool.getConcurrentUsedTableCount(TABLENAME)); + pool.close(); + } } @Category(MediumTests.class) @@ -256,7 +270,7 @@ Assert.assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - } + } } @Category(MediumTests.class) @@ -332,7 +346,6 @@ Assert.assertEquals(0, pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - } + } } - }