Index: src/test/java/org/apache/hadoop/hbase/client/TestHConnection.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestHConnection.java	(revision 1511156)
+++ src/test/java/org/apache/hadoop/hbase/client/TestHConnection.java	(working copy)
@@ -83,7 +83,7 @@
   class HConnectionRaceTester extends HConnectionImplementation {
     public HConnectionRaceTester(Configuration configuration, boolean managed)
     throws ZooKeeperConnectionException {
-      super(configuration, managed);
+      super(configuration, managed, null);
     }
 
     /**
Index: src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestHCM.java	(revision 1511156)
+++ src/test/java/org/apache/hadoop/hbase/client/TestHCM.java	(working copy)
@@ -25,12 +25,14 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -130,6 +132,61 @@
   }
 
   @Test
+  public void testClusterConnection() throws IOException {
+    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS,
+        new SynchronousQueue(), Threads.newDaemonThreadFactory("test-hcm"));
+
+    HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
+    HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
+    // make sure the internally created ExecutorService is the one passed
+    assertTrue(otherPool == ((HConnectionImplementation) con2).getCurrentBatchPool());
+
+    String tableName = "testClusterConnection";
+    TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+    HTable t = (HTable) con1.getTable(tableName, otherPool);
+    // make sure passing a pool to getTable does not trigger creation of an
+    // internal pool
+    assertNull("Internal Thread pool should be null",
+        ((HConnectionImplementation) con1).getCurrentBatchPool());
+    // table should use the pool passed
+    assertTrue(otherPool == t.getPool());
+    t.close();
+
+    t = (HTable) con2.getTable(tableName);
+    // table should use the connection's internal pool
+    assertTrue(otherPool == t.getPool());
+    t.close();
+
+    t = (HTable) con2.getTable(Bytes.toBytes(tableName));
+    // try other API too
+    assertTrue(otherPool == t.getPool());
+    t.close();
+
+    t = (HTable) con1.getTable(tableName);
+    ExecutorService pool = ((HConnectionImplementation) con1).getCurrentBatchPool();
+    // make sure an internal pool was created
+    assertNotNull("An internal Thread pool should have been created", pool);
+    // and that the table is using it
+    assertTrue(t.getPool() == pool);
+    t.close();
+
+    t = (HTable) con1.getTable(tableName);
+    // still using the *same* internal pool
+    assertTrue(t.getPool() == pool);
+    t.close();
+
+    con1.close();
+    // if the pool was created on demand it should be closed upon connection
+    // close
+    assertTrue(pool.isShutdown());
+
+    con2.close();
+    // if the pool is passed, it is not closed
+    assertFalse(otherPool.isShutdown());
+    otherPool.shutdownNow();
+  }
+
+  @Test
   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
     // Save off current HConnections
     Map oldHBaseInstances =
@@ -180,20 +237,16 @@
   public void testConnectionManagement() throws Exception{
     TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
     HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
-    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
-        60, TimeUnit.SECONDS,
-        new SynchronousQueue(),
-        Threads.newDaemonThreadFactory("test-hcm-table"));
-    HTable table = new HTable(TABLE_NAME1, conn, pool);
+    HTableInterface table = conn.getTable(TABLE_NAME1);
     table.close();
     assertFalse(conn.isClosed());
-    assertFalse(pool.isShutdown());
-    table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME1, pool);
+    assertFalse(((HTable)table).getPool().isShutdown());
+    table = conn.getTable(TABLE_NAME1);
     table.close();
-    assertFalse(pool.isShutdown());
+    assertFalse(((HTable)table).getPool().isShutdown());
     conn.close();
-    pool.shutdownNow();
+    assertTrue(((HTable)table).getPool().isShutdown());
   }
 
   /**
Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(revision 1511156)
+++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java	(working copy)
@@ -3901,12 +3901,7 @@
   HTable createUnmangedHConnectionHTable(final byte [] tableName) throws IOException {
     TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
     HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
-    ExecutorService pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
-        60, TimeUnit.SECONDS,
-        new SynchronousQueue(),
-        Threads.newDaemonThreadFactory("test-from-client-table"));
-    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
-    return new HTable(tableName, conn, pool);
+    return (HTable)conn.getTable(tableName);
   }
 
   /**
Index: src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java	(revision 1511156)
+++ src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java	(working copy)
@@ -129,7 +129,7 @@
       HConnectionImplementation connection =
         HConnectionManager.HBASE_INSTANCES.get(connectionKey);
       if (connection == null) {
-        connection = Mockito.spy(new HConnectionImplementation(conf, true));
+        connection = Mockito.spy(new HConnectionImplementation(conf, true, null));
         HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection);
       }
       return connection;
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java	(revision 1511156)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java	(working copy)
@@ -41,6 +41,9 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -75,6 +78,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@@ -85,8 +89,22 @@
 import org.apache.zookeeper.KeeperException;
 
 /**
- * A non-instantiable class that manages {@link HConnection}s.
- * This class has a static Map of {@link HConnection} instances keyed by
+ * A non-instantiable class that manages creation of {@link HConnection}s.
+ * <p>The simplest way to use this class is by using {@link #createConnection(Configuration)}.
+ * This creates a new {@link HConnection} that is managed by the caller.
+ * From this {@link HConnection} {@link HTableInterface} implementations are retrieved
+ * with {@link HConnection#getTable(byte[])}. Example:
+ * <pre>
+ * {@code
+ * HConnection connection = HConnectionManager.createConnection(config);
+ * HTableInterface table = connection.getTable("table1");
+ * // use the table as needed, for a single operation and a single thread
+ * table.close();
+ * connection.close();
+ * }
+ * </pre>
+ * <p>The following logic and API will be removed in the future:
+ * <p>This class has a static Map of {@link HConnection} instances keyed by
  * {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
  * that pass the same {@link Configuration} instance will be returned the same
  * {@link HConnection} instance (Adding properties to a Configuration
@@ -182,11 +200,11 @@
     synchronized (HBASE_INSTANCES) {
       HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
       if (connection == null) {
-        connection = new HConnectionImplementation(conf, true);
+        connection = new HConnectionImplementation(conf, true, null);
         HBASE_INSTANCES.put(connectionKey, connection);
       } else if (connection.isClosed()) {
         HConnectionManager.deleteConnection(connectionKey, true);
-        connection = new HConnectionImplementation(conf, true);
+        connection = new HConnectionImplementation(conf, true, null);
         HBASE_INSTANCES.put(connectionKey, connection);
       }
       connection.incCount();
@@ -197,18 +215,33 @@
   /**
    * Create a new HConnection instance using the passed conf
    * instance.
-   * Note: This bypasses the usual HConnection life cycle management!
-   * Use this with caution, the caller is responsible for closing the
-   * created connection.
+   * Note: This bypasses the usual HConnection life cycle management.
+   * The caller is responsible for calling {@link HConnection#close()}
+   * on the returned connection instance.
+   *
+   * This is the recommended way to create HConnections.
+   * {@code
+   * HConnection connection = HConnectionManager.createConnection(conf);
+   * HTableInterface table = connection.getTable("mytable");
+   * table.get(...);
+   * ...
+   * table.close();
+   * connection.close();
+   * }
    * @param conf configuration
    * @return HConnection object for conf
    * @throws ZooKeeperConnectionException
    */
   public static HConnection createConnection(Configuration conf)
   throws ZooKeeperConnectionException {
-    return new HConnectionImplementation(conf, false);
+    return new HConnectionImplementation(conf, false, null);
   }
 
+  public static HConnection createConnection(Configuration conf, ExecutorService pool)
+      throws IOException {
+    return new HConnectionImplementation(conf, false, pool);
+  }
+
   /**
    * Delete connection information for the instance specified by configuration.
    * If there are no more references to it, this will then close connection to
@@ -235,6 +268,7 @@
    * configuration whose identity is used to find {@link HConnection}
    * instance.
    */
+  @Deprecated
   public static void deleteConnection(Configuration conf) {
     deleteConnection(new HConnectionKey(conf), false);
   }
@@ -264,6 +298,7 @@
    * Delete information for all connections.
    * @throws IOException
    */
+  @Deprecated
   public static void deleteAllConnections() {
     synchronized (HBASE_INSTANCES) {
       Set connectionKeys = new HashSet();
@@ -275,6 +310,7 @@
       }
     }
   }
 
+  @Deprecated
   private static void deleteConnection(HConnection connection, boolean staleConnection) {
     synchronized (HBASE_INSTANCES) {
       for (Entry connectionEntry : HBASE_INSTANCES
@@ -287,6 +323,7 @@
       }
     }
   }
 
+  @Deprecated
   private static void deleteConnection(HConnectionKey connectionKey,
       boolean staleConnection) {
     synchronized (HBASE_INSTANCES) {
@@ -533,6 +570,11 @@
   private final Object resetLock = new Object();
 
+  // thread executor shared by all HTableInterface instances created
+  // by this connection
+  private volatile ExecutorService batchPool = null;
+  private volatile boolean cleanupPool = false;
+
   private final Configuration conf;
 
   private RpcEngine rpcEngine;
@@ -573,9 +615,10 @@
    * @param conf Configuration object
    */
   @SuppressWarnings("unchecked")
-  public HConnectionImplementation(Configuration conf, boolean managed)
+  public HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool)
   throws ZooKeeperConnectionException {
     this.conf = conf;
+    this.batchPool = pool;
     this.managed = managed;
     String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
         HConstants.DEFAULT_REGION_SERVER_CLASS);
@@ -605,6 +648,67 @@
     this.resetting = false;
   }
 
+  @Override
+  public HTableInterface getTable(String tableName) throws IOException {
+    return getTable(Bytes.toBytes(tableName));
+  }
+
+  @Override
+  public HTableInterface getTable(byte[] tableName) throws IOException {
+    return getTable(tableName, getBatchPool());
+  }
+
+  @Override
+  public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
+    return getTable(Bytes.toBytes(tableName), pool);
+  }
+
+  @Override
+  public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
+    if (managed) {
+      throw new IOException("The connection has to be unmanaged.");
+    }
+    return new HTable(tableName, this, pool);
+  }
+
+  private ExecutorService getBatchPool() {
+    if (batchPool == null) {
+      // shared HTable thread executor not yet initialized
+      synchronized (this) {
+        if (batchPool == null) {
+          int maxThreads = conf.getInt("hbase.hconnection.threads.max", Integer.MAX_VALUE);
+          if (maxThreads == 0) {
+            maxThreads = Runtime.getRuntime().availableProcessors();
+          }
+          long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+          this.batchPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+              maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(),
+              Threads.newDaemonThreadFactory("hbase-connection-shared-executor"));
+          ((ThreadPoolExecutor) this.batchPool).allowCoreThreadTimeOut(true);
+        }
+        this.cleanupPool = true;
+      }
+    }
+    return this.batchPool;
+  }
+
+  protected ExecutorService getCurrentBatchPool() {
+    return batchPool;
+  }
+
+  private void shutdownBatchPool() {
+    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
+      this.batchPool.shutdown();
+      try {
+        if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
+          this.batchPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        this.batchPool.shutdownNow();
+      }
+    }
+  }
+
   private synchronized void ensureZookeeperTrackers()
   throws ZooKeeperConnectionException {
     // initialize zookeeper and master address manager
@@ -1814,6 +1918,7 @@
     if (this.closed) {
       return;
     }
+    shutdownBatchPool();
     master = null;
 
     this.servers.clear();
Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java	(revision 1511156)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java	(working copy)
@@ -64,6 +64,60 @@
   public Configuration getConfiguration();
 
   /**
+   * Retrieve an HTableInterface implementation for access to a table.
+   * The returned HTableInterface is not thread safe; a new instance should
+   * be created for each using thread.
+   * This is a lightweight operation; pooling or caching of the returned HTableInterface
+   * is neither required nor desired.
+   * Note that the HConnection needs to be unmanaged
+   * (created with {@link HConnectionManager#createConnection(Configuration)}).
+   * @param tableName
+   * @return an HTable to use for interactions with this table
+   */
+  public HTableInterface getTable(String tableName) throws IOException;
+
+  /**
+   * Retrieve an HTableInterface implementation for access to a table.
+   * The returned HTableInterface is not thread safe; a new instance should
+   * be created for each using thread.
+   * This is a lightweight operation; pooling or caching of the returned HTableInterface
+   * is neither required nor desired.
+   * Note that the HConnection needs to be unmanaged
+   * (created with {@link HConnectionManager#createConnection(Configuration)}).
+   * @param tableName
+   * @return an HTable to use for interactions with this table
+   */
+  public HTableInterface getTable(byte[] tableName) throws IOException;
+
+  /**
+   * Retrieve an HTableInterface implementation for access to a table.
+   * The returned HTableInterface is not thread safe; a new instance should
+   * be created for each using thread.
+   * This is a lightweight operation; pooling or caching of the returned HTableInterface
+   * is neither required nor desired.
+   * Note that the HConnection needs to be unmanaged
+   * (created with {@link HConnectionManager#createConnection(Configuration)}).
+   * @param tableName
+   * @param pool The thread pool to use for batch operations, null to use a default pool.
+   * @return an HTable to use for interactions with this table
+   */
+  public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException;
+
+  /**
+   * Retrieve an HTableInterface implementation for access to a table.
+   * The returned HTableInterface is not thread safe; a new instance should
+   * be created for each using thread.
+   * This is a lightweight operation; pooling or caching of the returned HTableInterface
+   * is neither required nor desired.
+   * Note that the HConnection needs to be unmanaged
+   * (created with {@link HConnectionManager#createConnection(Configuration)}).
+   * @param tableName
+   * @param pool The thread pool to use for batch operations, null to use a default pool.
+   * @return an HTable to use for interactions with this table
+   */
+  public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException;
+
+  /**
    * Retrieve ZooKeeperWatcher used by this connection.
    * @return ZooKeeperWatcher handle being used by the connection.
    * @throws IOException if a remote or network exception occurs
Index: src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTablePool.java	(revision 1511156)
+++ src/main/java/org/apache/hadoop/hbase/client/HTablePool.java	(working copy)
@@ -56,6 +56,7 @@
  * <p>
  * Pool will manage its own connections to the cluster. See
  * {@link HConnectionManager}.
+ * @deprecated Use {@link HConnection#getTable(String)} instead.
  */
 public class HTablePool implements Closeable {
   private final PoolMap tables;
Index: src/docbkx/book.xml
===================================================================
--- src/docbkx/book.xml	(revision 1511156)
+++ src/docbkx/book.xml	(working copy)
@@ -1516,11 +1516,14 @@
           Another solution is to precreate an HConnection using
-          HConnectionManager.createConnection(Configuration) as
-          well as an ExecutorService; then use the
-          HTable(byte[], HConnection, ExecutorService)
-          constructor to create HTable instances on demand.
-          This construction is very lightweight and resources are controlled/shared if you go this route.
+          // Create a connection to the cluster.
+HConnection connection = HConnectionManager.createConnection(Configuration);
+HTableInterface table = connection.getTable("myTable");
+// use the table as needed; the table returned is lightweight
+table.close();
+// use the connection for other access to the cluster
+connection.close();
+          Constructing an HTableInterface implementation is very lightweight and resources are controlled/shared if you go this route.
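
For illustration only, a minimal, self-contained sketch of the client usage pattern this patch enables: one unmanaged HConnection shared by the application (optionally backed by a caller-supplied ExecutorService) and short-lived HTableInterface instances obtained per unit of work via getTable. The table name "usertable" and column family "f" are hypothetical, and the sketch assumes the 0.94 client API with this patch applied; it is not part of the patch itself.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetTableUsageSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();

    // One unmanaged connection for the whole application; it owns the lazily
    // created shared batch pool and is closed exactly once.
    HConnection connection = HConnectionManager.createConnection(conf);
    try {
      // HTableInterface instances are lightweight and not thread safe:
      // get one per thread / unit of work and close it when done.
      HTableInterface table = connection.getTable("usertable"); // hypothetical table
      try {
        table.put(new Put(Bytes.toBytes("row1"))
            .add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
        Result r = table.get(new Get(Bytes.toBytes("row1")));
        System.out.println(Bytes.toString(
            r.getValue(Bytes.toBytes("f"), Bytes.toBytes("q"))));
      } finally {
        table.close(); // releases the table, not the connection
      }
    } finally {
      connection.close(); // also shuts down the internally created batch pool
    }

    // Variant with a caller-supplied pool: the connection uses it for batch
    // operations but does not shut it down on close().
    ExecutorService myPool = Executors.newFixedThreadPool(4);
    HConnection pooled = HConnectionManager.createConnection(conf, myPool);
    try {
      HTableInterface table = pooled.getTable("usertable");
      try {
        table.get(new Get(Bytes.toBytes("row1")));
      } finally {
        table.close();
      }
    } finally {
      pooled.close();    // myPool keeps running
      myPool.shutdown(); // the caller remains responsible for its own pool
    }
  }
}

Compared with the deprecated HTablePool, the pool and cleanup bookkeeping lives in the connection (the batchPool/cleanupPool fields above), so callers only pair createConnection with close and getTable with close.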