Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1202802)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -125,6 +125,8 @@
private boolean closed;
private int operationTimeout;
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
+ private final boolean closeConnection; // close the connection in close()
+ private final boolean closePool; // shutdown the ExecutorService in close()
/**
* Creates an object to access a HBase table.
@@ -136,11 +138,23 @@
* @param tableName Name of the table.
* @throws IOException if a remote or network exception occurs
*/
- public HTable(Configuration conf, final String tableName)
- throws IOException {
+ public HTable(Configuration conf, final String tableName) throws IOException {
this(conf, Bytes.toBytes(tableName));
}
+ /**
+ * Creates an object to access a HBase table.
+ * Shares zookeeper connection and other resources with other HTable instances
+ * created with the same conf instance. Uses already-populated
+ * region cache if one is available, populated by any other HTable instances
+ * sharing this conf instance. Recommended.
+ * @param conf Configuration object to use.
+ * @param tableName Name of the table.
+ * @throws IOException if a remote or network exception occurs
+ */
+ public HTable(Configuration conf, final byte[] tableName) throws IOException {
+ this(conf, tableName, null, null);
+ }
/**
* Creates an object to access a HBase table.
@@ -150,17 +164,28 @@
* sharing this conf instance. Recommended.
* @param conf Configuration object to use.
* @param tableName Name of the table.
+ * @param pool Optional (can be null) ExecutorService to be used.
+ * @param connection Optional (can be null) HConnection to be used.
* @throws IOException if a remote or network exception occurs
*/
- public HTable(Configuration conf, final byte [] tableName)
- throws IOException {
+ public HTable(Configuration conf, final byte[] tableName,
+ ExecutorService pool, HConnection connection) throws IOException {
this.tableName = tableName;
if (conf == null) {
this.scannerTimeout = 0;
this.connection = null;
+ this.closeConnection = true;
+ this.closePool = true;
return;
}
- this.connection = HConnectionManager.getConnection(conf);
+ if (connection == null) {
+ this.connection = HConnectionManager.getConnection(conf);
+ this.closeConnection = true;
+ } else {
+ this.connection = connection;
+ this.closeConnection = false;
+ }
+
this.scannerTimeout =
(int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
@@ -180,20 +205,25 @@
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
- int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
- if (maxThreads == 0) {
- maxThreads = 1; // is there a better default?
- }
-
// Using the "direct handoff" approach, new threads will only be created
// if it is necessary and will grow unbounded. This could be bad but in HCM
// we only create as many Runnables as there are region servers. It means
// it also scales when new region servers are added.
- this.pool = new ThreadPoolExecutor(1, maxThreads,
- 60, TimeUnit.SECONDS,
- new SynchronousQueue(),
- new DaemonThreadFactory());
- ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
+ if (pool == null) {
+ int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+ if (maxThreads == 0) {
+ maxThreads = 1; // is there a better default?
+ }
+ this.pool = new ThreadPoolExecutor(1, maxThreads,
+ 60, TimeUnit.SECONDS,
+ new SynchronousQueue(),
+ new DaemonThreadFactory());
+ ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
+ this.closePool = true;
+ } else {
+ this.pool = pool;
+ this.closePool = false;
+ }
this.closed = false;
}
@@ -886,8 +916,10 @@
return;
}
flushCommits();
- this.pool.shutdown();
- if (this.connection != null) {
+ if (closePool) {
+ this.pool.shutdown();
+ }
+ if (this.connection != null && closeConnection) {
this.connection.close();
}
this.closed = true;