Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1509530)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy)
@@ -70,6 +70,25 @@
*/
Configuration getConfiguration();
+ /**
+ * @param tableName name of the table to interact with
+ * @return an HTable to use for interactions with this table
+ */
+ public HTableInterface getTable(byte[] tableName) throws IOException;
+
+ /**
+ * @param tableName name of the table to interact with
+ * @param pool The thread pool to use for batch operations.
+ * @return an HTable to use for interactions with this table
+ */
+ public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException;
+
+ /**
+ * @param tableName name of the table to interact with
+ * @return an HTable to use for interactions with this table
+ */
+ public HTableInterface getTable(String tableName) throws IOException;
+
/** @return - true if the master server is running */
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1509530)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -36,6 +36,9 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -137,6 +140,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -263,18 +267,49 @@
/**
* Create a new HConnection instance using the passed conf instance.
*
* <p>Note: This bypasses the usual HConnection life cycle management done by
- * {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for
+ * {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
+ *
+ * This is the recommended way to create HConnections.
+ * {@code
+ * HConnection connection = HConnectionManager.createConnection(conf);
+ * HTableInterface table = connection.getTable("mytable");
+ * table.get(...);
+ * ...
+ * table.close();
+ * connection.close();
+ * }
+ *
* @param conf configuration
* @return HConnection object for conf
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf)
throws IOException {
- return createConnection(conf, false);
+ return createConnection(conf, false, null);
}
+ /**
+ * Create a new HConnection instance using the passed conf instance.
+ *
+ * <p>Note: This bypasses the usual HConnection life cycle management done by
+ * {@link #getConnection(Configuration)}. The caller is responsible for
+ * calling {@link HConnection#close()} on the returned connection instance.
+ * @param conf configuration
+ * @param pool the thread pool to use for batch operations in HTables used via this HConnection
+ * @return HConnection object for
* Pool will manage its own connections to the cluster. See
* {@link HConnectionManager}.
+ * @deprecated Use {@link HConnection#getTable(String)} instead.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
conf
+ * @throws ZooKeeperConnectionException
+ */
+ public static HConnection createConnection(Configuration conf, ExecutorService pool)
+ throws IOException {
+ return createConnection(conf, false, pool);
+ }
+
static HConnection createConnection(final Configuration conf, final boolean managed)
+ throws IOException {
+ return createConnection(conf, managed, null);
+ }
+
+ static HConnection createConnection(final Configuration conf, final boolean managed, final ExecutorService pool)
throws IOException {
String className = conf.get("hbase.client.connection.impl",
HConnectionManager.HConnectionImplementation.class.getName());
@@ -289,7 +324,7 @@
Constructor<?> constructor =
- clazz.getDeclaredConstructor(Configuration.class, boolean.class);
+ clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class);
constructor.setAccessible(true);
- return (HConnection) constructor.newInstance(conf, managed);
+ return (HConnection) constructor.newInstance(conf, managed, pool);
} catch (Exception e) {
throw new IOException(e);
}
@@ -452,6 +487,10 @@
private final DelayedClosing delayedClosing =
DelayedClosing.createAndStart(this);
+ // thread executor shared by all HTableInterface instances created
+ // by this connection
+ private volatile ExecutorService batchPool = null;
+ private volatile boolean cleanupPool = false;
private final Configuration conf;
@@ -487,6 +526,10 @@
*/
Registry registry;
+ HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
+ this(conf, managed, null);
+ }
+
/**
* constructor
* @param conf Configuration object
@@ -498,8 +541,9 @@
* are shared, we have reference counting going on and will only do full cleanup when no more
* users of an HConnectionImplementation instance.
*/
- HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
+ HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws IOException {
this(conf);
+ this.batchPool = pool;
this.managed = managed;
this.registry = setupRegistry();
retrieveClusterId();
@@ -545,6 +589,78 @@
}
/**
+ * @param tableName name of the table to interact with
+ * @return an HTable to use for interactions with this table
+ */
+ @Override
+ public HTableInterface getTable(byte[] tableName) throws IOException {
+ return getTable(tableName, getBatchPool());
+ }
+
+ /**
+ * @param tableName name of the table to interact with
+ * @param pool The thread pool to use for batch operations.
+ * @return an HTable to use for interactions with this table
+ */
+ @Override
+ public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
+ if (managed) {
+ throw new IOException("The connection has to be unmanaged.");
+ }
+ return new HTable(tableName, this, pool);
+ }
+
+ /**
+ * @param tableName name of the table to interact with
+ * @return an HTable to use for interactions with this table
+ */
+ @Override
+ public HTableInterface getTable(String tableName) throws IOException {
+ return getTable(Bytes.toBytes(tableName));
+ }
+
+ private ExecutorService getBatchPool() {
+ if (batchPool == null) {
+ // shared HTable thread executor not yet initialized
+ synchronized (this) {
+ if (batchPool == null) {
+ int maxThreads = conf.getInt("hbase.hconnection.threads.max",
+ Integer.MAX_VALUE);
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors();
+ }
+ long keepAliveTime = conf.getLong(
+ "hbase.hconnection.threads.keepalivetime", 60);
+ this.batchPool = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ maxThreads,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ new SynchronousQueue