Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java	(revision 1092780)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -54,8 +54,6 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Used to communicate with a single HBase table.
@@ -180,18 +178,18 @@
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
 
-    int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
-    if (nrThreads == 0) {
-      nrThreads = 1; // is there a better default?
+    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+    if (maxThreads == 0) {
+      maxThreads = 1; // is there a better default?
     }
-    // Unfortunately Executors.newCachedThreadPool does not allow us to
-    // set the maximum size of the pool, so we have to do it ourselves.
-    // Must also set set corethreadpool size as with a LinkedBlockingQueue,
-    // a new thread will not be started until the queue is full
-    this.pool = new ThreadPoolExecutor(nrThreads, nrThreads,
+    // Using the "direct handoff" approach, new threads are only created
+    // when necessary, and the pool can grow unbounded. This could be bad, but
+    // in HCM we only create as many Runnables as there are region servers, so
+    // the pool also scales when new region servers are added.
+    this.pool = new ThreadPoolExecutor(1, maxThreads,
         60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
+        new SynchronousQueue<Runnable>(),
         new DaemonThreadFactory());
     ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
   }
@@ -201,21 +199,6 @@
   }
 
   /**
-   * @return the number of region servers that are currently running
-   * @throws IOException if a remote or network exception occurs
-   */
-  public int getCurrentNrHRS() throws IOException {
-    try {
-      // We go to zk rather than to master to get count of regions to avoid
-      // HTable having a Master dependency. See HBase-2828
-      return ZKUtil.getNumberOfChildren(this.connection.getZooKeeperWatcher(),
-        this.connection.getZooKeeperWatcher().rsZNode);
-    } catch (KeeperException ke) {
-      throw new IOException("Unexpected ZooKeeper exception", ke);
-    }
-  }
-
-  /**
    * Tells whether or not a table is enabled or not.
    * Warning: use {@link HBaseAdmin#isTableEnabled(byte[])} instead.
    * @param tableName Name of table to check.
@@ -1259,6 +1242,14 @@
     }
   }
 
+  /**
+   * The pool is used for multi requests for this HTable
+   * @return the pool used for multi
+   */
+  ExecutorService getPool() {
+    return this.pool;
+  }
+
   static class DaemonThreadFactory implements ThreadFactory {
     static final AtomicInteger poolNumber = new AtomicInteger(1);
     final ThreadGroup group;
Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(revision 1092780)
+++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(working copy)
@@ -34,9 +34,12 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.UUID;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -512,7 +515,7 @@
     assertEquals(count, 10);
     scanner.close();
   }
-  
+
   /**
    * Test simple table and non-existent row cases.
    */
@@ -3970,5 +3973,65 @@
       assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
     }
   }
+
+  /**
+   * This test demonstrates how we use the ThreadPoolExecutor.
+   * It needs to show that we only use as many threads in the pool as we have
+   * region servers. To do this, instead of doing real requests, we use a
+   * SynchronousQueue where each put must wait for a take (and vice versa),
+   * so that we have full control of the number of active threads.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testPoolBehavior() throws IOException, InterruptedException {
+    byte[] someBytes = Bytes.toBytes("pool");
+    HTable table = TEST_UTIL.createTable(someBytes, someBytes);
+    ThreadPoolExecutor pool = (ThreadPoolExecutor)table.getPool();
+
+    // Make sure that the TPE starts with a core pool size of one and zero
+    // initialized worker threads
+    assertEquals(1, pool.getCorePoolSize());
+    assertEquals(0, pool.getPoolSize());
+
+    // Build a SynchronousQueue that we use for thread coordination
+    final SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
+    List<Thread> threads = new ArrayList<Thread>(5);
+    for (int i = 0; i < 5; i++) {
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            // The thread blocks here until we decide to let it go
+            queue.take();
+          } catch (InterruptedException ie) { }
+        }
+      });
+    }
+    // First, add two threads and make sure the pool size follows
+    pool.submit(threads.get(0));
+    assertEquals(1, pool.getPoolSize());
+    pool.submit(threads.get(1));
+    assertEquals(2, pool.getPoolSize());
+
+    // Next, terminate those threads and then make sure the pool is still the
+    // same size
+    queue.put(new Object());
+    threads.get(0).join();
+    queue.put(new Object());
+    threads.get(1).join();
+    assertEquals(2, pool.getPoolSize());
+
+    // Now let's simulate adding a RS, meaning that we'll go up to three
+    // concurrent threads. The pool should not grow larger than three.
+    pool.submit(threads.get(2));
+    pool.submit(threads.get(3));
+    pool.submit(threads.get(4));
+    assertEquals(3, pool.getPoolSize());
+    queue.put(new Object());
+    queue.put(new Object());
+    queue.put(new Object());
+  }
+
 }
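
For reference, the "direct handoff" configuration adopted by the patch can be demonstrated outside of HBase with plain java.util.concurrent code. The sketch below is not part of the patch; the class name DirectHandoffDemo and the use of a CountDownLatch (instead of the SynchronousQueue the test uses for coordination) are illustrative choices only. It builds a ThreadPoolExecutor the same way the new HTable constructor does, namely a core size of one, an effectively unbounded maximum, a 60-second keep-alive with allowCoreThreadTimeOut(true), and a SynchronousQueue for handoff, and then shows that the pool only grows to match the number of concurrently running tasks.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectHandoffDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as the pool built in the patched HTable constructor: one
    // core thread, an effectively unbounded maximum, a 60 second keep-alive,
    // and a SynchronousQueue so every submit is handed directly to a thread.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
        60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    pool.allowCoreThreadTimeOut(true);

    // No tasks have been submitted yet, so no worker threads exist.
    System.out.println("initial pool size: " + pool.getPoolSize()); // 0

    final CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = new Runnable() {
      public void run() {
        try {
          // Hold the worker thread until the demo releases it, standing in
          // for an in-flight request to one region server.
          release.await();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    };

    // Three concurrent tasks force three workers; nothing is ever queued
    // because the SynchronousQueue has no capacity.
    pool.submit(blocker);
    pool.submit(blocker);
    pool.submit(blocker);
    System.out.println("pool size with 3 running tasks: " + pool.getPoolSize()); // expected: 3

    release.countDown();                       // let all three tasks finish
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}

Because each submit is handed directly to a thread rather than queued, the pool ends up with one worker per concurrently running multi request, which is why the patch can drop the getCurrentNrHRS() ZooKeeper lookup that previously sized the pool up front.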