Index: src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTablePool.java	(revision 1463112)
+++ src/main/java/org/apache/hadoop/hbase/client/HTablePool.java	(working copy)
@@ -24,6 +24,8 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -59,6 +61,7 @@
  */
 public class HTablePool implements Closeable {
   private final PoolMap<String, HTableInterface> tables;
+  private final ConcurrentHashMap<String, AtomicInteger> concurrentUsedTableCounts;
   private final int maxSize;
   private final PoolType poolType;
   private final Configuration config;
@@ -156,8 +159,20 @@
     }
 
     this.tables = new PoolMap<String, HTableInterface>(this.poolType, this.maxSize);
+    this.concurrentUsedTableCounts = new ConcurrentHashMap<String, AtomicInteger>();
   }
 
+  private void increaseConcurrentUsedTableCount(String tableName) {
+    AtomicInteger initCount = new AtomicInteger(0);
+    AtomicInteger previousCount = null;
+    if ((previousCount = concurrentUsedTableCounts.putIfAbsent(tableName, initCount)) == null) {
+      // no count was associated with tableName; initCount is now the registered counter
+      initCount.incrementAndGet();
+    } else {
+      previousCount.incrementAndGet();
+    }
+  }
+
   /**
    * Get a reference to the specified table from the pool.
    *

@@ -172,6 +187,8 @@
   public HTableInterface getTable(String tableName) {
     // call the old getTable implementation renamed to findOrCreateTable
     HTableInterface table = findOrCreateTable(tableName);
+    // increase concurrent using table count after findOrCreateTable success
+    increaseConcurrentUsedTableCount(tableName);
     // return a proxy table so when user closes the proxy, the actual table
     // will be returned to the pool
     return new PooledHTable(table);
@@ -252,12 +269,19 @@
   private void returnTable(HTableInterface table) throws IOException {
     // this is the old putTable method renamed and made private
     String tableName = Bytes.toString(table.getTableName());
+
     if (tables.size(tableName) >= maxSize) {
       // release table instance since we're not reusing it
       this.tables.remove(tableName, table);
       this.tableFactory.releaseHTableInterface(table);
+      // Decrement the in-use count only after releaseHTableInterface returns: it may
+      // throw IOException when closing the HTable fails. In that case the table has
+      // already been removed from HTablePool.tables, and the caller should decide either
+      // to retry until success or to tolerate the IOException.
+      concurrentUsedTableCounts.get(tableName).decrementAndGet();
       return;
     }
+    concurrentUsedTableCounts.get(tableName).decrementAndGet();
     tables.put(tableName, table);
   }
 
@@ -312,6 +336,17 @@
     return tables.size(tableName);
   }
 
+  /**
+   * Get the in-use count recorded for the given table.
+   *
+   * @param tableName the name of the table
+   * @return the current count, or 0 if no count is recorded for the table
+   */
+  public int getConcurrentUsedTableCount(String tableName) {
+    AtomicInteger value = concurrentUsedTableCounts.get(tableName);
+    return value == null ? 0 : value.get();
+  }
+
   /**
    * A proxy class that implements HTableInterface.close method to return the
    * wrapped table back to the table pool