Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1509530) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -70,6 +70,25 @@ */ Configuration getConfiguration(); + /** + * @param tableName + * @return an HTable to use for interactions with this table + */ + public HTableInterface getTable(byte[] tableName) throws IOException; + + /** + * @param tableName + * @param pool The thread pool to use for batch operations. + * @return an HTable to use for interactions with this table + */ + public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException; + + /** + * @param tableName + * @return an HTable to use for interactions with this table + */ + public HTableInterface getTable(String tableName) throws IOException; + /** @return - true if the master server is running */ boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException; Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1509530) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -36,6 +36,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -137,6 +140,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.SoftValueSortedMap; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -263,18 +267,49 @@ /** * Create a new HConnection instance using the passed conf instance. *

Note: This bypasses the usual HConnection life cycle management done by - * {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for + * {@link #getConnection(Configuration)}. The caller is responsible for * calling {@link HConnection#close()} on the returned connection instance. + * + * This is the recommended way to create HConnections. + * {@code + * HConnection connection = HConnectionManager.creareConnection(conf); + * HTableInterface table = connection.getTable("mytable"); + * table.get(...); + * ... + * table.close(); + * connection.close(); + * } + * * @param conf configuration * @return HConnection object for conf * @throws ZooKeeperConnectionException */ public static HConnection createConnection(Configuration conf) throws IOException { - return createConnection(conf, false); + return createConnection(conf, false, null); } + /** + * Create a new HConnection instance using the passed conf instance. + *

Note: This bypasses the usual HConnection life cycle management done by + * {@link #getConnection(Configuration)}. The caller is responsible for + * calling {@link HConnection#close()} on the returned connection instance. + * @param conf configuration + * @param pool the thread pool to use for batch operation in HTables used via this HConnection + * @return HConnection object for conf + * @throws ZooKeeperConnectionException + */ + public static HConnection createConnection(Configuration conf, ExecutorService pool) + throws IOException { + return createConnection(conf, false, pool); + } + static HConnection createConnection(final Configuration conf, final boolean managed) + throws IOException { + return createConnection(conf, managed, null); + } + + static HConnection createConnection(final Configuration conf, final boolean managed, final ExecutorService pool) throws IOException { String className = conf.get("hbase.client.connection.impl", HConnectionManager.HConnectionImplementation.class.getName()); @@ -289,7 +324,7 @@ Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, boolean.class); constructor.setAccessible(true); - return (HConnection) constructor.newInstance(conf, managed); + return (HConnection) constructor.newInstance(conf, managed, pool); } catch (Exception e) { throw new IOException(e); } @@ -452,6 +487,10 @@ private final DelayedClosing delayedClosing = DelayedClosing.createAndStart(this); + // thread executor shared by all HTableInterface instances created + // by this connection + private volatile ExecutorService batchPool = null; + private volatile boolean cleanupPool = false; private final Configuration conf; @@ -487,6 +526,10 @@ */ Registry registry; + HConnectionImplementation(Configuration conf, boolean managed) throws IOException { + this(conf, managed, null); + } + /** * constructor * @param conf Configuration object @@ -498,8 +541,9 @@ * are shared, we have reference counting going on and will only do full cleanup when no more * users of an HConnectionImplementation instance. */ - HConnectionImplementation(Configuration conf, boolean managed) throws IOException { + HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws IOException { this(conf); + this.batchPool = pool; this.managed = managed; this.registry = setupRegistry(); retrieveClusterId(); @@ -545,6 +589,78 @@ } /** + * @param tableName + * @return an HTable to use for interactions with this table + */ + @Override + public HTableInterface getTable(byte[] tableName) throws IOException { + return getTable(tableName, getBatchPool()); + } + + /** + * @param tableName + * @param pool The thread pool to use for batch operations. + * @return an HTable to use for interactions with this table + */ + @Override + public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { + if (managed) { + throw new IOException("The connection has to be unmanaged."); + } + return new HTable(tableName, this, pool); + } + + /** + * @param tableName + * @return an HTable to use for interactions with this table + */ + @Override + public HTableInterface getTable(String tableName) throws IOException { + return getTable(Bytes.toBytes(tableName)); + } + + private ExecutorService getBatchPool() { + if (batchPool == null) { + // shared HTable thread executor not yet initialized + synchronized (this) { + if (batchPool == null) { + int maxThreads = conf.getInt("hbase.hconnection.threads.max", + Integer.MAX_VALUE); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors(); + } + long keepAliveTime = conf.getLong( + "hbase.hconnection.threads.keepalivetime", 60); + this.batchPool = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + new SynchronousQueue(), + Threads.newDaemonThreadFactory("hbase-connection-shared-executor")); + ((ThreadPoolExecutor) this.batchPool) + .allowCoreThreadTimeOut(true); + } + this.cleanupPool = true; + } + } + return this.batchPool; + } + + private void shutdownBatchPool() { + if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { + this.batchPool.shutdown(); + try { + if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) { + this.batchPool.shutdownNow(); + } + } catch (InterruptedException e) { + this.batchPool.shutdownNow(); + } + } + } + + /** * @return The cluster registry implementation to use. * @throws IOException */ @@ -2255,6 +2371,7 @@ } delayedClosing.stop("Closing connection"); closeMaster(); + shutdownBatchPool(); this.closed = true; closeZooKeeperWatcher(); this.stubs.clear(); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java (revision 1509530) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionWrapper.java (working copy) @@ -60,6 +60,21 @@ } @Override + public HTableInterface getTable(byte[] tableName) throws IOException { + return hconnection.getTable(tableName); + } + + @Override + public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { + return hconnection.getTable(tableName, pool); + } + + @Override + public HTableInterface getTable(String tableName) throws IOException { + return hconnection.getTable(tableName); + } + + @Override public void abort(String why, Throwable e) { hconnection.abort(why, e); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (revision 1509530) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (working copy) @@ -59,6 +59,7 @@ *

* Pool will manage its own connections to the cluster. See * {@link HConnectionManager}. + * @deprecated Use {@see HConnection#getTable(String)} instead. */ @InterfaceAudience.Public @InterfaceStability.Stable