Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1510351) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -70,6 +70,46 @@ */ Configuration getConfiguration(); + /** + * Retrieve an HTableInterface implementation for access to a table. + * The returned HTableInterface is not thread safe, a new instance should + * be created for each using thread. + * This is a lightweight operation, pooling or caching of the returned HTableInterface + * is neither required nor desired. + * Note that the HConnection needs to be unmanaged + * (created with {@link HConnectionManager#createConnection(Configuration)}). + * @param tableName + * @return an HTable to use for interactions with this table + */ + public HTableInterface getTable(byte[] tableName) throws IOException; + + /** + * Retrieve an HTableInterface implementation for access to a table. + * The returned HTableInterface is not thread safe, a new instance should + * be created for each using thread. + * This is a lightweight operation, pooling or caching of the returned HTableInterface + * is neither required nor desired. + * Note that the HConnection needs to be unmanaged + * (created with {@link HConnectionManager#createConnection(Configuration)}). + * @param tableName + * @param pool The thread pool to use for batch operations, null to use a default pool. + * @return an HTable to use for interactions with this table + */ + public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException; + + /** + * Retrieve an HTableInterface implementation for access to a table. + * The returned HTableInterface is not thread safe, a new instance should + * be created for each using thread. 
+ * This is a lightweight operation, pooling or caching of the returned HTableInterface + * is neither required nor desired. + * Note that the HConnection needs to be unmanaged + * (created with {@link HConnectionManager#createConnection(Configuration)}). + * @param tableName + * @return an HTable to use for interactions with this table + */ + public HTableInterface getTable(String tableName) throws IOException; + /** @return - true if the master server is running */ boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException; Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1510351) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -36,6 +36,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -137,6 +140,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.SoftValueSortedMap; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -148,8 +152,22 @@ import com.google.protobuf.ServiceException; /** - * A non-instantiable class that manages {@link HConnection}s. - * This class has a static Map of {@link HConnection} instances keyed by + * A non-instantiable class that manages creation of {@link HConnection}s. + *

The simplest way to use this class is by using {@link #createConnection(Configuration)}. + * This creates a new {@link HConnection} that is managed by the caller. + * From this {@link HConnection} {@link HTableInterface} implementations are retrieved + * with {@link HConnection#getTable(byte[])}. Example: + *

+ * {@code
+ * HConnection connection = HConnectionManager.createConnection(config);
+ * HTableInterface table = connection.getTable("table1");
+ * // use the table as needed, for a single operation and a single thread
+ * table.close();
+ * connection.close();
+ * }
+ * 
+ *

The following logic and API will be removed in the future: + *

This class has a static Map of {@link HConnection} instances keyed by * {@link Configuration}; all invocations of {@link #getConnection(Configuration)} * that pass the same {@link Configuration} instance will be returned the same * {@link HConnection} instance (Adding properties to a Configuration @@ -241,6 +259,7 @@ * @return HConnection object for conf * @throws ZooKeeperConnectionException */ + @Deprecated @SuppressWarnings("resource") public static HConnection getConnection(final Configuration conf) throws IOException { @@ -263,18 +282,61 @@ /** * Create a new HConnection instance using the passed conf instance. *

Note: This bypasses the usual HConnection life cycle management done by - * {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for + * {@link #getConnection(Configuration)}. The caller is responsible for * calling {@link HConnection#close()} on the returned connection instance. + * + * This is the recommended way to create HConnections. + * {@code + * HConnection connection = HConnectionManager.createConnection(conf); + * HTableInterface table = connection.getTable("mytable"); + * table.get(...); + * ... + * table.close(); + * connection.close(); + * } + * * @param conf configuration * @return HConnection object for conf * @throws ZooKeeperConnectionException */ public static HConnection createConnection(Configuration conf) throws IOException { - return createConnection(conf, false); + return createConnection(conf, false, null); } + /** + * Create a new HConnection instance using the passed conf instance. + *

Note: This bypasses the usual HConnection life cycle management done by + * {@link #getConnection(Configuration)}. The caller is responsible for + * calling {@link HConnection#close()} on the returned connection instance. + * This is the recommended way to create HConnections. + * {@code + * ExecutorService pool = ...; + * HConnection connection = HConnectionManager.createConnection(conf, pool); + * HTableInterface table = connection.getTable("mytable"); + * table.get(...); + * ... + * table.close(); + * connection.close(); + * } + * @param conf configuration + * @param pool the thread pool to use for batch operation in HTables used via this HConnection + * @return HConnection object for conf + * @throws ZooKeeperConnectionException + */ + public static HConnection createConnection(Configuration conf, ExecutorService pool) + throws IOException { + return createConnection(conf, false, pool); + } + + @Deprecated static HConnection createConnection(final Configuration conf, final boolean managed) + throws IOException { + return createConnection(conf, managed, null); + } + + @Deprecated + static HConnection createConnection(final Configuration conf, final boolean managed, final ExecutorService pool) throws IOException { String className = conf.get("hbase.client.connection.impl", HConnectionManager.HConnectionImplementation.class.getName()); @@ -287,9 +349,9 @@ try { // Default HCM#HCI is not accessible; make it so before invoking. Constructor constructor = - clazz.getDeclaredConstructor(Configuration.class, boolean.class); + clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class); constructor.setAccessible(true); - return (HConnection) constructor.newInstance(conf, managed); + return (HConnection) constructor.newInstance(conf, managed, pool); } catch (Exception e) { throw new IOException(e); } @@ -301,6 +363,7 @@ * then close connection to the zookeeper ensemble and let go of all associated resources. 
* * @param conf configuration whose identity is used to find {@link HConnection} instance. + * @deprecated */ public static void deleteConnection(Configuration conf) { deleteConnection(new HConnectionKey(conf), false); @@ -311,6 +374,7 @@ * This will then close connection to the zookeeper ensemble and let go of all resources. * * @param connection + * @deprecated */ public static void deleteStaleConnection(HConnection connection) { deleteConnection(connection, true); @@ -320,6 +384,7 @@ * Delete information for all connections. Close or not the connection, depending on the * staleConnection boolean and the ref count. By default, you should use it with * staleConnection to true. + * @deprecated */ public static void deleteAllConnections(boolean staleConnection) { synchronized (CONNECTION_INSTANCES) { @@ -342,6 +407,7 @@ } + @Deprecated private static void deleteConnection(HConnection connection, boolean staleConnection) { synchronized (CONNECTION_INSTANCES) { for (Entry e: CONNECTION_INSTANCES.entrySet()) { @@ -353,6 +419,7 @@ } } + @Deprecated private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) { synchronized (CONNECTION_INSTANCES) { HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); @@ -464,6 +531,10 @@ private final DelayedClosing delayedClosing = DelayedClosing.createAndStart(this); + // thread executor shared by all HTableInterface instances created + // by this connection + private volatile ExecutorService batchPool = null; + private volatile boolean cleanupPool = false; private final Configuration conf; @@ -499,6 +570,10 @@ */ Registry registry; + HConnectionImplementation(Configuration conf, boolean managed) throws IOException { + this(conf, managed, null); + } + /** * constructor * @param conf Configuration object @@ -510,8 +585,9 @@ * are shared, we have reference counting going on and will only do full cleanup when no more * users of an HConnectionImplementation instance. 
*/ - HConnectionImplementation(Configuration conf, boolean managed) throws IOException { + HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws IOException { this(conf); + this.batchPool = pool; this.managed = managed; this.registry = setupRegistry(); retrieveClusterId(); @@ -556,6 +632,65 @@ HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); } + @Override + public HTableInterface getTable(byte[] tableName) throws IOException { + return getTable(tableName, getBatchPool()); + } + + @Override + public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { + if (managed) { + throw new IOException("The connection has to be unmanaged."); + } + return new HTable(tableName, this, pool); + } + + @Override + public HTableInterface getTable(String tableName) throws IOException { + return getTable(Bytes.toBytes(tableName)); + } + + private ExecutorService getBatchPool() { + if (batchPool == null) { + // shared HTable thread executor not yet initialized + synchronized (this) { + if (batchPool == null) { + int maxThreads = conf.getInt("hbase.hconnection.threads.max", + Integer.MAX_VALUE); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors(); + } + long keepAliveTime = conf.getLong( + "hbase.hconnection.threads.keepalivetime", 60); + this.batchPool = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + new SynchronousQueue(), + Threads.newDaemonThreadFactory("hbase-connection-shared-executor")); + ((ThreadPoolExecutor) this.batchPool) + .allowCoreThreadTimeOut(true); + } + this.cleanupPool = true; + } + } + return this.batchPool; + } + + private void shutdownBatchPool() { + if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { + this.batchPool.shutdown(); + try { + if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) { + this.batchPool.shutdownNow(); + } + } catch 
(InterruptedException e) { + this.batchPool.shutdownNow(); + } + } + } + /** * @return The cluster registry implementation to use. * @throws IOException @@ -2267,6 +2402,7 @@ } delayedClosing.stop("Closing connection"); closeMaster(); + shutdownBatchPool(); this.closed = true; closeZooKeeperWatcher(); this.stubs.clear(); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java (revision 1510351) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java (working copy) @@ -60,6 +60,21 @@ } @Override + public HTableInterface getTable(byte[] tableName) throws IOException { + return hconnection.getTable(tableName); + } + + @Override + public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { + return hconnection.getTable(tableName, pool); + } + + @Override + public HTableInterface getTable(String tableName) throws IOException { + return hconnection.getTable(tableName); + } + + @Override public void abort(String why, Throwable e) { hconnection.abort(why, e); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (revision 1510351) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (working copy) @@ -59,6 +59,7 @@ *

* Pool will manage its own connections to the cluster. See * {@link HConnectionManager}. + * @deprecated Use {@see HConnection#getTable(String)} instead. */ @InterfaceAudience.Public @InterfaceStability.Stable Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1510351) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -4097,15 +4097,10 @@ * @return the created HTable object * @throws IOException */ - HTable createUnmangedHConnectionHTable(final byte [] tableName) throws IOException { + HTableInterface createUnmangedHConnectionHTable(final byte [] tableName) throws IOException { TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY); HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration()); - ExecutorService pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, - 60, TimeUnit.SECONDS, - new SynchronousQueue(), - Threads.newDaemonThreadFactory("test-from-client")); - ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true); - return new HTable(tableName, conn, pool); + return conn.getTable(tableName); } /** @@ -4117,10 +4112,15 @@ @Test public void testUnmanagedHConnection() throws IOException { final byte[] tableName = Bytes.toBytes("testUnmanagedHConnection"); - HTable t = createUnmangedHConnectionHTable(tableName); - HBaseAdmin ha = new HBaseAdmin(t.getConnection()); + HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration()); + HTableInterface t = conn.getTable(tableName); + + HBaseAdmin ha = new HBaseAdmin(conn); assertTrue(ha.tableExists(tableName)); assertTrue(t.get(new Get(ROW)).isEmpty()); + ha.close(); + assertFalse(conn.isClosed()); + conn.close(); } /** @@ -4132,8 +4132,9 @@ @Test public void testUnmanagedHConnectionReconnect() throws Exception { 
final byte[] tableName = Bytes.toBytes("testUnmanagedHConnectionReconnect"); - HTable t = createUnmangedHConnectionHTable(tableName); - HConnection conn = t.getConnection(); + HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration()); + HTableInterface t = conn.getTable(tableName); + HBaseAdmin ha = new HBaseAdmin(conn); assertTrue(ha.tableExists(tableName)); assertTrue(t.get(new Get(ROW)).isEmpty()); @@ -4152,6 +4153,8 @@ HBaseAdmin newAdmin = new HBaseAdmin(conn); assertTrue(newAdmin.tableExists(tableName)); assert(newAdmin.getClusterStatus().getServersSize() == SLAVES); + + conn.close(); } @Test Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (revision 1510351) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (working copy) @@ -459,20 +459,16 @@ public void testConnectionManagement() throws Exception{ TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM); HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration()); - ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, - 60, TimeUnit.SECONDS, - new SynchronousQueue(), - Threads.newDaemonThreadFactory("test-hcm")); - - HTable table = new HTable(TABLE_NAME1, conn, pool); + HTableInterface table = conn.getTable(TABLE_NAME1); + //new HTable(TABLE_NAME1, conn, pool); table.close(); assertFalse(conn.isClosed()); - assertFalse(pool.isShutdown()); - table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME1, pool); + assertFalse(((HTable)table).getPool().isShutdown()); + table = conn.getTable(TABLE_NAME1); table.close(); - assertFalse(pool.isShutdown()); + assertFalse(((HTable)table).getPool().isShutdown()); conn.close(); - pool.shutdownNow(); + assertTrue(((HTable)table).getPool().isShutdown()); } /** Index: src/main/docbkx/book.xml 
=================================================================== --- src/main/docbkx/book.xml (revision 1510351) +++ src/main/docbkx/book.xml (working copy) @@ -1101,11 +1101,14 @@ Another solution is to precreate an HConnection using - HConnectionManager.createConnection(Configuration) as - well as an ExecutorService; then use the - HTable(byte[], HConnection, ExecutorService) - constructor to create HTable instances on demand. - This construction is very lightweight and resources are controlled/shared if you go this route. + // Create a connection to the cluster. +HConnection connection = HConnectionManager.createConnection(conf); +HTableInterface table = connection.getTable("myTable"); +// use table as needed, the table returned is lightweight +table.close(); +// use the connection for other access to the cluster +connection.close(); + Constructing an HTableInterface implementation is very lightweight and resources are controlled/shared if you go this route.