Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1202802) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -111,7 +111,7 @@ private static final Log LOG = LogFactory.getLog(HTable.class); private HConnection connection; private final byte [] tableName; - protected final int scannerTimeout; + protected int scannerTimeout; private volatile Configuration configuration; private final ArrayList writeBuffer = new ArrayList(); private long writeBufferSize; @@ -125,6 +125,7 @@ private boolean closed; private int operationTimeout; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. + private final boolean cleanupOnClose; // close the connection in close() /** * Creates an object to access a HBase table. @@ -155,36 +156,18 @@ public HTable(Configuration conf, final byte [] tableName) throws IOException { this.tableName = tableName; + this.cleanupOnClose = true; if (conf == null) { this.scannerTimeout = 0; this.connection = null; return; } this.connection = HConnectionManager.getConnection(conf); - this.scannerTimeout = - (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); - this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT - : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.configuration = conf; - this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); - this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); - this.clearBufferOnFail = true; - this.autoFlush = true; - this.currentWriteBufferSize = 0; - this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); - int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } - // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means @@ -194,6 +177,58 @@ new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + + this.finishSetup(); + } + + /** + * Creates an object to access a HBase table. + * Shares zookeeper connection and other resources with other HTable instances + * created with the same connection instance. + * Use this constructor when the ExecutorService and HConnection instance are + * externally managed. + * @param tableName Name of the table. + * @param connection HConnection to be used. + * @param pool ExecutorService to be used. + * @throws IOException if a remote or network exception occurs + */ + public HTable(final byte[] tableName, final HConnection connection, + final ExecutorService pool) throws IOException { + assert connection != null && pool != null; + + this.tableName = tableName; + this.cleanupOnClose = false; + this.connection = connection; + this.pool = pool; + + this.finishSetup(); + } + + /** + * setup this HTable's parameter based on the passed configuration + * @param conf + */ + private void finishSetup() throws IOException { + Configuration conf = connection.getConfiguration(); + this.configuration = conf; + this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); + + this.scannerTimeout = (int) conf.getLong( + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT + : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); + this.clearBufferOnFail = true; + this.autoFlush = true; + this.currentWriteBufferSize = 0; + this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); + + this.maxScannerResultSize = conf.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); this.closed = false; } @@ -886,9 +921,11 @@ return; } flushCommits(); - this.pool.shutdown(); - if (this.connection != null) { - this.connection.close(); + if (cleanupOnClose) { + this.pool.shutdown(); + if (this.connection != null) { + this.connection.close(); + } } this.closed = true; }