Index: src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (revision 1202802)
+++ src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (working copy)
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -203,4 +205,51 @@
Thread.sleep(50);
}
}
+
+ @Test
+ public void testClosing() throws Exception {
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
+ String.valueOf(_randy.nextInt())); // NOTE(review): presumably yields a cache key private to this test - confirm
+
+ HConnection c1 = HConnectionManager.createConnection(configuration); // unmanaged
+ HConnection c2 = HConnectionManager.createConnection(configuration); // unmanaged
+
+ HConnection c3 = HConnectionManager.getConnection(configuration); // managed, cached
+ HConnection c4 = HConnectionManager.getConnection(configuration); // same cached instance
+ assertTrue(c3 == c4);
+
+ c1.close(); // closing one unmanaged connection must not affect the others
+ assertTrue(c1.isClosed());
+ assertFalse(c2.isClosed());
+ assertFalse(c3.isClosed());
+
+ c3.close();
+ // getConnection was called twice, so one reference is still left
+ assertFalse(c3.isClosed());
+ c3.close();
+ assertTrue(c3.isClosed());
+ // the closed c3 must no longer be handed out by the cache
+ assertTrue(HConnectionManager.getConnection(configuration) != c3); // NOTE(review): the connection fetched here is never closed
+
+ assertFalse(c2.isClosed()); // NOTE(review): c2 is never closed by this test
+ }
+
+ /**
+ * Verifies that {@link HConnectionManager#createConnection(Configuration)}
+ * returns a new, uncached connection on every call.
+ */
+ @Test
+ public void testCreateConnection() throws Exception {
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ HConnection c1 = HConnectionManager.createConnection(configuration);
+ HConnection c2 = HConnectionManager.createConnection(configuration);
+ // same configuration object, yet two distinct connection instances
+ assertTrue(c1 != c2);
+ assertTrue(c1.getConfiguration() == c2.getConfiguration());
+ // the cached connection must be neither of the created ones
+ HConnection c3 = HConnectionManager.getConnection(configuration);
+ assertTrue(c1 != c3);
+ assertTrue(c2 != c3); // NOTE(review): c1, c2 and c3 are left open by this test
+ }
}
Index: src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (revision 1202802)
+++ src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (working copy)
@@ -79,7 +79,7 @@
HConnectionImplementation connection =
HConnectionManager.HBASE_INSTANCES.get(connectionKey);
if (connection == null) {
- connection = Mockito.spy(new HConnectionImplementation(conf));
+ connection = Mockito.spy(new HConnectionImplementation(conf, true));
HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection);
}
return connection;
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1202802)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -177,7 +177,7 @@
synchronized (HBASE_INSTANCES) {
HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
if (connection == null) {
- connection = new HConnectionImplementation(conf);
+ connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection);
}
connection.incCount();
@@ -186,6 +186,21 @@
}
/**
+ * Create a new, uncached HConnection instance using the passed conf
+ * instance; every call constructs a separate connection.
+ * Note: This bypasses the usual HConnection life cycle management!
+ * Use this with caution, your application must manage the
+ * connection's life cycle explicitly, i.e. close() it when done.
+ * @param conf configuration
+ * @return a new HConnection object for conf
+ * @throws ZooKeeperConnectionException propagated from the HConnectionImplementation constructor
+ */
+ public static HConnection createConnection(Configuration conf)
+ throws ZooKeeperConnectionException {
+ return new HConnectionImplementation(conf, false); // false: not managed, not put into HBASE_INSTANCES
+ }
+
+ /**
* Delete connection information for the instance specified by configuration.
* If there are no more references to it, this will then close connection to
* the zookeeper ensemble and let go of all resources.
@@ -483,15 +498,17 @@
private boolean stopProxy;
private int refCount;
-
+ // true if HConnectionManager manages this connection's life cycle (see close())
+ private final boolean isManaged;
/**
* constructor
* @param conf Configuration object
*/
@SuppressWarnings("unchecked")
- public HConnectionImplementation(Configuration conf)
+ public HConnectionImplementation(Configuration conf, boolean isManaged)
throws ZooKeeperConnectionException {
this.conf = conf;
+ this.isManaged = isManaged;
String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
HConstants.DEFAULT_REGION_SERVER_CLASS);
this.closed = false;
@@ -1639,6 +1656,11 @@
this.aborted = true;
this.closed = true;
}
+
+ @Override
+ public boolean isClosed() {
+ return this.closed; // also set to true together with 'aborted' in the method just above
+ }
@Override
public boolean isAborted(){
@@ -1712,7 +1734,11 @@
}
public void close() {
- HConnectionManager.deleteConnection((HConnection)this, stopProxy, false);
+ if (isManaged) { // managed: hand the close over to HConnectionManager
+ HConnectionManager.deleteConnection((HConnection)this, stopProxy, false);
+ } else { // unmanaged: created via createConnection, caller owns the life cycle
+ close(true); // testClosing expects isClosed() to be true right after this
+ } // NOTE(review): the debug line below is logged even if a managed connection only lost one reference
LOG.debug("The connection to " + this.zooKeeper + " has been closed.");
}
Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1202802)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy)
@@ -369,4 +369,8 @@
public HTableDescriptor[] getHTableDescriptors(List tableNames)
throws IOException;
+ /**
+ * @return true if this connection is closed
+ */
+ public boolean isClosed();
}
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1202802)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -111,7 +111,7 @@
private static final Log LOG = LogFactory.getLog(HTable.class);
private HConnection connection;
private final byte [] tableName;
- protected final int scannerTimeout;
+ protected int scannerTimeout; // not final: assigned in finishSetup(), outside the constructors
private volatile Configuration configuration;
private final ArrayList writeBuffer = new ArrayList();
private long writeBufferSize;
@@ -125,6 +125,7 @@
private boolean closed;
private int operationTimeout;
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
+ private final boolean cleanupOnClose; // close() shuts down the pool and closes the connection only if true
/**
* Creates an object to access a HBase table.
@@ -155,36 +156,19 @@
public HTable(Configuration conf, final byte [] tableName)
throws IOException {
this.tableName = tableName;
+ this.cleanupOnClose = true;
if (conf == null) {
this.scannerTimeout = 0;
this.connection = null;
return;
}
this.connection = HConnectionManager.getConnection(conf);
- this.scannerTimeout =
- (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
- HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
- this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
- : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.configuration = conf;
- this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
- this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
- this.clearBufferOnFail = true;
- this.autoFlush = true;
- this.currentWriteBufferSize = 0;
- this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
- this.maxScannerResultSize = conf.getLong(
- HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
- this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
-
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
if (maxThreads == 0) {
maxThreads = 1; // is there a better default?
}
-
// Using the "direct handoff" approach, new threads will only be created
// if it is necessary and will grow unbounded. This could be bad but in HCM
// we only create as many Runnables as there are region servers. It means
@@ -194,6 +178,59 @@
new SynchronousQueue(),
new DaemonThreadFactory());
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
+
+ this.finishSetup();
+ }
+
+ /**
+ * Creates an object to access a HBase table.
+ * Shares zookeeper connection and other resources with other HTable instances
+ * created with the same connection instance.
+ * Use this constructor when the ExecutorService and HConnection instance are
+ * externally managed; close() will neither shut down pool nor close connection.
+ * @param tableName Name of the table.
+ * @param connection HConnection to be used; expected to be non-null.
+ * @param pool ExecutorService to be used; expected to be non-null.
+ * @throws IOException if a remote or network exception occurs
+ */
+ public HTable(final byte[] tableName, final HConnection connection,
+ final ExecutorService pool) throws IOException {
+ assert connection != null && pool != null; // NOTE(review): only checked when assertions are enabled (-ea)
+
+ this.tableName = tableName;
+ this.cleanupOnClose = false; // pool and connection belong to the caller
+ this.connection = connection;
+ this.configuration = connection.getConfiguration(); // settings come from the connection's configuration
+ this.pool = pool;
+
+ this.finishSetup();
+ }
+
+ /**
+ * Locates the region for the empty start row, then reads this HTable's
+ * settings from this.configuration; takes no arguments. Shared by the constructors.
+ */
+ private void finishSetup() throws IOException {
+ this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
+ this.scannerTimeout = (int) this.configuration.getLong(
+ HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+ HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); // the lease period is used as the scanner timeout
+ this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
+ : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // meta tables always get the default
+ this.writeBufferSize = this.configuration.getLong(
+ "hbase.client.write.buffer", 2097152);
+ this.clearBufferOnFail = true;
+ this.autoFlush = true;
+ this.currentWriteBufferSize = 0;
+ this.scannerCaching = this.configuration.getInt(
+ "hbase.client.scanner.caching", 1);
+
+ this.maxScannerResultSize = this.configuration.getLong(
+ HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+ this.maxKeyValueSize = this.configuration.getInt(
+ "hbase.client.keyvalue.maxsize", -1);
this.closed = false;
}
@@ -886,9 +923,11 @@
return;
}
flushCommits();
- this.pool.shutdown();
- if (this.connection != null) {
- this.connection.close();
+ if (cleanupOnClose) {
+ this.pool.shutdown();
+ if (this.connection != null) {
+ this.connection.close();
+ }
}
this.closed = true;
}