commit f8809109dc069dd5c11153f56c1bb3f226d18194 Author: Karthick Sankarachary Date: Fri Apr 22 14:06:44 2011 -0700 HBASE-3777 Redefine Identity Of HBase Configuration diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 5701639..c3ef5ef 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -93,14 +93,29 @@ public final class HConstants { /** Name of ZooKeeper config file in conf/ directory. */ public static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg"; - /** default client port that the zookeeper listens on */ + /** Parameter name for the client port that the zookeeper listens on */ + public static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; + + /** Default client port that the zookeeper listens on */ public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181; + /** Parameter name for the wait time for the recoverable zookeeper */ + public static final String ZOOKEEPER_RECOVERABLE_WAITTIME = "hbase.zookeeper.recoverable.waittime"; + + /** Default wait time for the recoverable zookeeper */ + public static final long DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME = 10000; + /** Parameter name for the root dir in ZK for this cluster */ public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent"; public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase"; + /** Parameter name for the limit on concurrent client-side zookeeper connections */ + public static final String ZOOKEEPER_MAX_CLIENT_CNXNS = "hbase.zookeeper.property.maxClientCnxns"; + + /** Default wait time for the recoverable zookeeper */ + public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 30; + /** Parameter name for port region server listens on. */ public static final String REGIONSERVER_PORT = "hbase.regionserver.port"; @@ -343,6 +358,74 @@ public final class HConstants { */ public static long DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE = Long.MAX_VALUE; + /** + * Parameter name for client pause value, used mostly as value to wait + * before running a retry of a failed get, region lookup, etc. + */ + public static String HBASE_CLIENT_PAUSE = "hbase.client.pause"; + + /** + * Default value of {@link #HBASE_CLIENT_PAUSE}. + */ + public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000; + + /** + * Parameter name for maximum retries, used as maximum for all retryable + * operations such as fetching of the root region from root region server, + * getting a cell's value, starting a row update, etc. + */ + public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number"; + + /** + * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}. + */ + public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10; + + /** + * Parameter name for maximum attempts, used to limit the number of times the + * client will try to obtain the proxy for a given region server. + */ + public static String HBASE_CLIENT_RPC_MAXATTEMPTS = "hbase.client.rpc.maxattempts"; + + /** + * Default value of {@link #HBASE_CLIENT_RPC_MAXATTEMPTS}. + */ + public static int DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS = 1; + + /** + * Parameter name for client prefetch limit, used as the maximum number of regions + * info that will be prefetched. + */ + public static String HBASE_CLIENT_PREFETCH_LIMIT = "hbase.client.prefetch.limit"; + + /** + * Default value of {@link #HBASE_CLIENT_PREFETCH_LIMIT}. + */ + public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10; + + /** + * Parameter name for number of rows that will be fetched when calling next on + * a scanner if it is not served from memory. Higher caching values will + * enable faster scanners but will eat up more memory and some calls of next + * may take longer and longer times when the cache is empty. + */ + public static String HBASE_META_SCANNER_CACHING = "hbase.meta.scanner.caching"; + + /** + * Default value of {@link #HBASE_META_SCANNER_CACHING}. + */ + public static int DEFAULT_HBASE_META_SCANNER_CACHING = 100; + + /** + * Parameter name for unique identifier for this {@link Configuration} + * instance. If there are two or more {@link Configuration} instances that, + * for all intents and purposes, are the same except for their instance ids, + * then they will not be able to share the same {@link Connection} instance. + * On the other hand, even if the instance ids are the same, it could result + * in non-shared {@link Connection} instances if some of the other connection + * parameters differ. + */ + public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id"; /** * HRegion server lease period in milliseconds. Clients must report in within this period @@ -351,7 +434,6 @@ public final class HConstants { public static String HBASE_REGIONSERVER_LEASE_PERIOD_KEY = "hbase.regionserver.lease.period"; - /** * Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}. */ diff --git a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index be31179..d3ab5c0 100644 --- a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -28,12 +28,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; @@ -57,6 +59,7 @@ import org.apache.hadoop.ipc.RemoteException; */ public class CatalogTracker { private static final Log LOG = LogFactory.getLog(CatalogTracker.class); + private final Configuration conf; private final HConnection connection; private final ZooKeeperWatcher zookeeper; private final RootRegionTracker rootRegionTracker; @@ -78,15 +81,18 @@ public class CatalogTracker { HRegionInfo.FIRST_META_REGIONINFO.getRegionName(); /** - * Constructs a catalog tracker. Find current state of catalog tables and - * begin active tracking by executing {@link #start()} post construction. - * Does not timeout. - * @param connection Server connection; if problem, this connections - * {@link HConnection#abort(String, Throwable)} will be called. - * @throws IOException + * Constructs a catalog tracker. Find current state of catalog tables and + * begin active tracking by executing {@link #start()} post construction. Does + * not timeout. + * + * @param conf + * the {@link Configuration} from which a {@link HConnection} will be + * obtained; if problem, this connections + * {@link HConnection#abort(String, Throwable)} will be called. + * @throws IOException */ - public CatalogTracker(final HConnection connection) throws IOException { - this(connection.getZooKeeperWatcher(), connection, connection); + public CatalogTracker(final Configuration conf) throws IOException { + this(null, conf, null); } /** @@ -98,10 +104,10 @@ public class CatalogTracker { * @param abortable if fatal exception * @throws IOException */ - public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection, + public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, final Abortable abortable) throws IOException { - this(zk, connection, abortable, 0); + this(zk, conf, abortable, 0); } /** @@ -114,11 +120,21 @@ public class CatalogTracker { * ({@link Object#wait(long)} when passed a 0 waits for ever). * @throws IOException */ - public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection, - final Abortable abortable, final int defaultTimeout) + public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, + Abortable abortable, final int defaultTimeout) throws IOException { - this.zookeeper = zk; + this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout); + } + + CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, + HConnection connection, Abortable abortable, final int defaultTimeout) + throws IOException { + this.conf = conf; this.connection = connection; + this.zookeeper = (zk == null) ? this.connection.getZooKeeperWatcher() : zk; + if (abortable == null) { + abortable = this.connection; + } this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable); this.metaNodeTracker = new MetaNodeTracker(zookeeper, this, abortable); this.defaultTimeout = defaultTimeout; @@ -142,13 +158,16 @@ public class CatalogTracker { * Interrupts any ongoing waits. */ public void stop() { - LOG.debug("Stopping catalog tracker " + this); - this.stopped = true; - this.rootRegionTracker.stop(); - this.metaNodeTracker.stop(); - // Call this and it will interrupt any ongoing waits on meta. - synchronized (this.metaAvailable) { - this.metaAvailable.notifyAll(); + if (!this.stopped) { + LOG.debug("Stopping catalog tracker " + this); + this.stopped = true; + this.rootRegionTracker.stop(); + this.metaNodeTracker.stop(); + HConnectionManager.deleteConnection(this.conf, false); + // Call this and it will interrupt any ongoing waits on meta. + synchronized (this.metaAvailable) { + this.metaAvailable.notifyAll(); + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index afb666a..6d5cf3f 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -63,7 +63,7 @@ import org.apache.hadoop.util.StringUtils; public class HBaseAdmin implements Abortable { private final Log LOG = LogFactory.getLog(this.getClass().getName()); // private final HConnection connection; - final HConnection connection; + private final HConnection connection; private volatile Configuration conf; private final long pause; private final int numRetries; @@ -100,9 +100,7 @@ public class HBaseAdmin implements Abortable { throws ZooKeeperConnectionException, IOException { CatalogTracker ct = null; try { - HConnection connection = - HConnectionManager.getConnection(this.conf); - ct = new CatalogTracker(connection); + ct = new CatalogTracker(this.conf); ct.start(); } catch (InterruptedException e) { // Let it out as an IOE for now until we redo all so tolerate IEs @@ -1220,4 +1218,8 @@ public class HBaseAdmin implements Abortable { copyOfConf.setInt("hbase.client.retries.number", 1); new HBaseAdmin(copyOfConf); } + + public void close() { + HConnectionManager.deleteConnection(this.conf, false); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index c348f7a..8fefabc 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -22,8 +22,16 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -37,6 +45,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -50,7 +59,11 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.*; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SoftValueSortedMap; @@ -63,6 +76,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; +import com.google.common.collect.ImmutableMap; + /** * A non-instantiable class that manages {@link HConnection}s. * This class has a static Map of {@link HConnection} instances keyed by @@ -112,19 +127,28 @@ import org.apache.zookeeper.KeeperException; */ @SuppressWarnings("serial") public class HConnectionManager { - static final int MAX_CACHED_HBASE_INSTANCES = 31; - - // A LRU Map of Configuration hashcode -> TableServers. We set instances to 31. - // The zk default max connections to the ensemble from the one client is 30 so - // should run into zk issues before hit this value of 31. - private static final Map HBASE_INSTANCES = - new LinkedHashMap - ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) { + // A LRU Map of Configuration hashcode -> TableServers. + private static final Map HBASE_INSTANCES; + + public static final int MAX_CACHED_HBASE_INSTANCES; + + static { + // We set instances to one more than the value specified for {@link + // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max + // connections to the ensemble from the one client is 30, so in that case we + // should run into zk issues before the LRU hit this value of 31. + MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt( + HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, + HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1; + HBASE_INSTANCES = new LinkedHashMap( + (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) { @Override - protected boolean removeEldestEntry(Map.Entry eldest) { + protected boolean removeEldestEntry( + Map.Entry eldest) { return size() > MAX_CACHED_HBASE_INSTANCES; } - }; + }; + } /* * Non-instantiable. @@ -144,33 +168,34 @@ public class HConnectionManager { */ public static HConnection getConnection(Configuration conf) throws ZooKeeperConnectionException { - HConnectionImplementation connection; + HConnectionKey connectionKey = new HConnectionKey(conf); synchronized (HBASE_INSTANCES) { - connection = HBASE_INSTANCES.get(conf); + HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { connection = new HConnectionImplementation(conf); - HBASE_INSTANCES.put(conf, connection); + HBASE_INSTANCES.put(connectionKey, connection); } + connection.incCount(); + return connection; } - return connection; } /** * Delete connection information for the instance specified by configuration. - * This will close connection to the zookeeper ensemble and let go of all - * resources. - * @param conf configuration whose identity is used to find {@link HConnection} - * instance. - * @param stopProxy Shuts down all the proxy's put up to cluster members - * including to cluster HMaster. Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}. + * If there are no more references to it, this will then close connection to + * the zookeeper ensemble and let go of all resources. + * + * @param conf + * configuration whose identity is used to find {@link HConnection} + * instance. + * @param stopProxy + * Shuts down all the proxy's put up to cluster members including to + * cluster HMaster. Calls + * {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)} + * . */ public static void deleteConnection(Configuration conf, boolean stopProxy) { - synchronized (HBASE_INSTANCES) { - HConnectionImplementation t = HBASE_INSTANCES.remove(conf); - if (t != null) { - t.close(stopProxy); - } - } + deleteConnection(new HConnectionKey(conf), stopProxy); } /** @@ -180,9 +205,41 @@ public class HConnectionManager { */ public static void deleteAllConnections(boolean stopProxy) { synchronized (HBASE_INSTANCES) { - for (HConnectionImplementation t : HBASE_INSTANCES.values()) { - if (t != null) { - t.close(stopProxy); + Set connectionKeys = new HashSet(); + connectionKeys.addAll(HBASE_INSTANCES.keySet()); + for (HConnectionKey connectionKey : connectionKeys) { + deleteConnection(connectionKey, stopProxy); + } + HBASE_INSTANCES.clear(); + } + } + + private static void deleteConnection(HConnection connection, boolean stopProxy) { + synchronized (HBASE_INSTANCES) { + HConnectionKey connectionKey = null; + for (Entry connectionEntry : HBASE_INSTANCES.entrySet()) { + if (connectionEntry.getValue() == connection) { + connectionKey = connectionEntry.getKey(); + } + } + if (connectionKey != null) { + HBASE_INSTANCES.remove(connectionKey); + ((HConnectionImplementation) connection).close(stopProxy); + } + } + } + + private static void deleteConnection(HConnectionKey connectionKey, boolean stopProxy) { + synchronized (HBASE_INSTANCES) { + HConnectionImplementation connection = HBASE_INSTANCES + .get(connectionKey); + if (connection != null) { + connection.decCount(); + if (connection.isZeroReference()) { + HBASE_INSTANCES.remove(connectionKey); + connection.close(stopProxy); + } else if (stopProxy) { + connection.stopProxyOnClose(stopProxy); } } } @@ -198,7 +255,9 @@ public class HConnectionManager { byte[] tableName) throws ZooKeeperConnectionException { HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf); - return connection.getNumberOfCachedRegionLocations(tableName); + int cachedRegionCount = connection.getNumberOfCachedRegionLocations(tableName); + HConnectionManager.deleteConnection(conf, false); + return cachedRegionCount; } /** @@ -210,7 +269,91 @@ public class HConnectionManager { static boolean isRegionCached(Configuration conf, byte[] tableName, byte[] row) throws ZooKeeperConnectionException { HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf); - return connection.isRegionCached(tableName, row); + boolean regionCached = connection.isRegionCached(tableName, row); + deleteConnection(conf, false); + return regionCached; + } + + /** + * Denotes a unique key to a {@link HConnection} instance. + * + * In essence, this class captures the properties in {@link Configuration} + * that may be used in the process of establishing a connection. In light of + * that, if any new such properties are introduced into the mix, they must be + * added to the {@link HConnectionKey#properties} list. + * + */ + static class HConnectionKey { + public static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID }; + + private Map properties; + + public HConnectionKey(Configuration conf) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + if (conf != null) { + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + builder.put(property, value); + } + } + } + this.properties = builder.build(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + for (String property : CONNECTION_PROPERTIES) { + String value = properties.get(property); + if (value != null) { + result = prime * result + value.hashCode(); + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HConnectionKey that = (HConnectionKey) obj; + if (this.properties == null) { + if (that.properties != null) { + return false; + } + } else { + if (that.properties == null) { + return false; + } + for (String property : CONNECTION_PROPERTIES) { + String thisValue = this.properties.get(property); + String thatValue = that.properties.get(property); + if (thisValue == thatValue) { + continue; + } + if (thisValue == null || !thisValue.equals(thatValue)) { + return false; + } + } + } + return true; + } } /* Encapsulates connection to zookeeper and regionservers.*/ @@ -258,6 +401,10 @@ public class HConnectionManager { private final Set regionCachePrefetchDisabledTables = new CopyOnWriteArraySet(); + private boolean stopProxy; + private AtomicInteger refCount = new AtomicInteger(0); + + /** * constructor * @param conf Configuration object @@ -277,15 +424,19 @@ public class HConnectionManager { "Unable to find region server interface " + serverClassName, e); } - this.pause = conf.getLong("hbase.client.pause", 1000); - this.numRetries = conf.getInt("hbase.client.retries.number", 10); - this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1); + this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.maxRPCAttempts = conf.getInt( + HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, + HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - - this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit", - 10); + this.prefetchRegionLimit = conf.getInt( + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); setupZookeeperTrackers(); @@ -1044,24 +1195,27 @@ public class HConnectionManager { } void close(boolean stopProxy) { - if (master != null) { + if (!this.closed) { + if (master != null) { + if (stopProxy) { + HBaseRPC.stopProxy(master); + } + master = null; + masterChecked = false; + } if (stopProxy) { - HBaseRPC.stopProxy(master); + for (HRegionInterface i: servers.values()) { + HBaseRPC.stopProxy(i); + } } - master = null; - masterChecked = false; - } - if (stopProxy) { - for (HRegionInterface i: servers.values()) { - HBaseRPC.stopProxy(i); + this.servers.clear(); + if (this.zooKeeper != null) { + LOG.info("Closed zookeeper sessionid=0x" + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + this.zooKeeper.close(); + this.zooKeeper = null; } } - if (this.zooKeeper != null) { - LOG.info("Closed zookeeper sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); - this.zooKeeper.close(); - this.zooKeeper = null; - } this.closed = true; } @@ -1458,5 +1612,53 @@ public class HConnectionManager { throw new IOException("Unexpected ZooKeeper exception", ke); } } + + public void stopProxyOnClose(boolean stopProxy) { + this.stopProxy = stopProxy; + } + + /** + * Increment this client's reference count. + */ + void incCount() { + refCount.incrementAndGet(); + } + + /** + * Decrement this client's reference count. + */ + void decCount() { + refCount.decrementAndGet(); + } + + /** + * Return if this client has no reference + * + * @return true if this client has no reference; false otherwise + */ + boolean isZeroReference() { + return refCount.get() == 0; + } + + /** + * Close the connection for real, regardless of what the value of + * {@link #refCount} is. Ideally, {@link refCount} should be zero at this + * point, which would imply that the connection was promptly cleaned up by + * its consumers. However, on the off chance that one of its consumer did + * not get around to calling + * {@link HConnectionManager#deleteConnection(Configuration, boolean)}, + * possibly because its clean up malfunctioned prematurely or was not even + * invoked, the method below will ensure that this {@link Connection} + * instance is cleaned up. Caveat: The JVM may take an unknown amount of + * time before it decides to call finalize on an unreachable object, so our + * hope is that every consumer of this {@link Connection} instance cleans up + * after itself, like any good citizen. + */ + @Override + protected void finalize() throws Throwable { + HConnectionManager.deleteConnection(this, stopProxy); + LOG.debug("The connection to " + this.zooKeeper + + " was closed in its finalize method."); + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index edacf56..32f4c64 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -82,14 +83,18 @@ import org.apache.hadoop.hbase.util.Writables; * will go unnoticed. To run with changed values, make a new * {@link HTable} passing a new {@link Configuration} instance that has the * new configuration. + * + *

By design, this class implements the {@link Closeable} interface. When a + * HTable instance is no longer required, it *should* be closed in order to ensure + * that the underlying resources are promptly released. * * @see HBaseAdmin for create, drop, list, enable and disable of tables. * @see HConnection * @see HConnectionManager */ -public class HTable implements HTableInterface { +public class HTable implements HTableInterface, Closeable { private static final Log LOG = LogFactory.getLog(HTable.class); - private final HConnection connection; + private HConnection connection; private final byte [] tableName; protected final int scannerTimeout; private volatile Configuration configuration; @@ -101,6 +106,7 @@ public class HTable implements HTableInterface { private int maxKeyValueSize; private ExecutorService pool; // For Multi private long maxScannerResultSize; + private boolean closed; /** * Creates an object to access a HBase table. @@ -196,6 +202,7 @@ public class HTable implements HTableInterface { new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + this.closed = false; } public Configuration getConfiguration() { @@ -249,7 +256,10 @@ public class HTable implements HTableInterface { */ public static boolean isTableEnabled(Configuration conf, byte[] tableName) throws IOException { - return HConnectionManager.getConnection(conf).isTableEnabled(tableName); + HConnection connection = HConnectionManager.getConnection(conf); + boolean tableEnabled = connection.isTableEnabled(tableName); + HConnectionManager.deleteConnection(conf, false); + return tableEnabled; } /** @@ -825,10 +835,19 @@ public class HTable implements HTableInterface { @Override public void close() throws IOException { - flushCommits(); - this.pool.shutdown(); + if (!this.closed) { + flushCommits(); + this.pool.shutdown(); + HConnectionManager.deleteConnection(this.configuration, false); + } + this.closed = true; } +// @Override +// protected void finalize() throws Throwable { +// close(); +// } + // validate for well-formedness private void validatePut(final Put put) throws IllegalArgumentException{ if (put.isEmpty()) { @@ -1289,8 +1308,10 @@ public class HTable implements HTableInterface { */ public static void setRegionCachePrefetch(final byte[] tableName, boolean enable) throws ZooKeeperConnectionException { - HConnectionManager.getConnection(HBaseConfiguration.create()). - setRegionCachePrefetch(tableName, enable); + Configuration configuration = HBaseConfiguration.create(); + HConnection connection = HConnectionManager.getConnection(configuration); + connection.setRegionCachePrefetch(tableName, enable); + HConnectionManager.deleteConnection(configuration, false); } /** @@ -1305,8 +1326,9 @@ public class HTable implements HTableInterface { */ public static void setRegionCachePrefetch(final Configuration conf, final byte[] tableName, boolean enable) throws ZooKeeperConnectionException { - HConnectionManager.getConnection(conf).setRegionCachePrefetch( - tableName, enable); + HConnection connection = HConnectionManager.getConnection(conf); + connection.setRegionCachePrefetch(tableName, enable); + HConnectionManager.deleteConnection(conf, false); } /** @@ -1319,8 +1341,10 @@ public class HTable implements HTableInterface { */ public static boolean getRegionCachePrefetch(final Configuration conf, final byte[] tableName) throws ZooKeeperConnectionException { - return HConnectionManager.getConnection(conf).getRegionCachePrefetch( - tableName); + HConnection connection = HConnectionManager.getConnection(conf); + boolean regionCachePrefetch = connection.getRegionCachePrefetch(tableName); + HConnectionManager.deleteConnection(conf, false); + return regionCachePrefetch; } /** @@ -1331,9 +1355,12 @@ public class HTable implements HTableInterface { * @throws ZooKeeperConnectionException */ public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException { - return HConnectionManager.getConnection(HBaseConfiguration.create()). - getRegionCachePrefetch(tableName); - } + Configuration configuration = HBaseConfiguration.create(); + HConnection connection = HConnectionManager.getConnection(configuration); + boolean regionCachePrefetch = connection.getRegionCachePrefetch(tableName); + HConnectionManager.deleteConnection(configuration, false); + return regionCachePrefetch; + } /** * Explicitly clears the region cache to fetch the latest value from META. diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 88827a8..5cc137c 100755 --- a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -69,7 +69,7 @@ public class HTablePool { final HTableInterfaceFactory tableFactory) { // Make a new configuration instance so I can safely cleanup when // done with the pool. - this.config = config == null? new Configuration(): new Configuration(config); + this.config = config == null? new Configuration(): config; this.maxSize = maxSize; this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory; } @@ -147,7 +147,7 @@ public class HTablePool { table = queue.poll(); } } - HConnectionManager.deleteConnection(this.config, true); +// HConnectionManager.deleteConnection(this.config, true); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java b/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java index 9e3f4d1..476923e 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java @@ -158,8 +158,9 @@ public class MetaScanner { // Scan over each meta region ScannerCallable callable; - int rows = Math.min(rowLimit, - configuration.getInt("hbase.meta.scanner.caching", 100)); + int rows = Math.min(rowLimit, configuration.getInt( + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); do { final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY); if (LOG.isDebugEnabled()) { @@ -201,6 +202,8 @@ public class MetaScanner { connection.getRegionServerWithRetries(callable); } } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0); + + HConnectionManager.deleteConnection(configuration, false); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index d76e333..39a4f0f 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -66,6 +66,7 @@ import org.apache.zookeeper.KeeperException; public class ReplicationAdmin { private final ReplicationZookeeper replicationZk; + private final Configuration configuration; private final HConnection connection; /** @@ -79,6 +80,7 @@ public class ReplicationAdmin { throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } + this.configuration = conf; this.connection = HConnectionManager.getConnection(conf); ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); try { @@ -169,4 +171,8 @@ public class ReplicationAdmin { ReplicationZookeeper getReplicationZk() { return replicationZk; } + + public void close() { + HConnectionManager.deleteConnection(this.configuration, false); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index ed88bfa..26ecb59 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -107,8 +107,9 @@ public class VerifyReplication { scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); } + HConnection conn = null; try { - HConnection conn = HConnectionManager.getConnection(conf); + conn = HConnectionManager.getConnection(conf); ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, conn.getZooKeeperWatcher()); ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); @@ -116,6 +117,7 @@ public class VerifyReplication { conf.get(NAME+".tableName")); scan.setStartRow(value.getRow()); replicatedScanner = replicatedTable.getScanner(scan); + HConnectionManager.deleteConnection(conf, false); } catch (KeeperException e) { throw new IOException("Got a ZK exception", e); } @@ -151,8 +153,9 @@ public class VerifyReplication { if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { throw new IOException("Replication needs to be enabled to verify it."); } + HConnection conn = null; try { - HConnection conn = HConnectionManager.getConnection(conf); + conn = HConnectionManager.getConnection(conf); ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, conn.getZooKeeperWatcher()); // Just verifying it we can connect @@ -161,6 +164,7 @@ public class VerifyReplication { throw new IOException("Couldn't get access to the slave cluster," + "please see the log"); } + HConnectionManager.deleteConnection(conf, false); } catch (KeeperException ex) { throw new IOException("Couldn't get access to the slave cluster" + " because: ", ex); diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 79a48ba..5fc55f3 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -138,8 +138,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // file system manager for the master FS operations private MasterFileSystem fileSystemManager; - private HConnection connection; - // server manager to deal with region server info private ServerManager serverManager; @@ -352,12 +350,11 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { ClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId()); - this.connection = HConnectionManager.getConnection(conf); this.executorService = new ExecutorService(getServerName()); this.serverManager = new ServerManager(this, this, metrics); - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d0a1e11..0ab52e6 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -171,7 +171,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, protected HServerInfo serverInfo; protected final Configuration conf; - private final HConnection connection; protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false); private FileSystem fs; private Path rootDir; @@ -288,7 +287,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public HRegionServer(Configuration conf) throws IOException, InterruptedException { this.fsOk = true; this.conf = conf; - this.connection = HConnectionManager.getConnection(conf); this.isOnline = false; // check to see if the codec list is available: @@ -510,7 +508,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, blockAndCheckIfStopped(this.clusterStatusTracker); // Create the catalog tracker and start it; - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 78c3b42..0e25c7a 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -341,6 +341,7 @@ public class ReplicationSource extends Thread shipEdits(); } + HConnectionManager.deleteConnection(this.conf, false); LOG.debug("Source exiting " + peerClusterId); } diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 5da5e34..8edaf1a 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -229,11 +229,12 @@ public class HBaseFsck { */ private void loadDisabledTables() throws ZooKeeperConnectionException, IOException, KeeperException { - ZooKeeperWatcher zkw = - HConnectionManager.getConnection(conf).getZooKeeperWatcher(); + HConnection connection = HConnectionManager.getConnection(conf); + ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { disabledTables.add(Bytes.toBytes(tableName)); } + HConnectionManager.deleteConnection(this.conf, false); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java index b624d28..cd50c76 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -80,29 +81,33 @@ public class HBaseFsckRepair { private static void forceOfflineInZK(Configuration conf, HRegionInfo region) throws ZooKeeperConnectionException, KeeperException, IOException { - ZKAssign.createOrForceNodeOffline( - HConnectionManager.getConnection(conf).getZooKeeperWatcher(), - region, HConstants.HBCK_CODE_NAME); + HConnection connection = HConnectionManager.getConnection(conf); + ZKAssign.createOrForceNodeOffline(connection.getZooKeeperWatcher(), region, + HConstants.HBCK_CODE_NAME); + HConnectionManager.deleteConnection(conf, false); } private static void closeRegionSilentlyAndWait(Configuration conf, HServerAddress server, HRegionInfo region) throws IOException, InterruptedException { + HConnection connection = HConnectionManager.getConnection(conf); HRegionInterface rs = - HConnectionManager.getConnection(conf).getHRegionConnection(server); + connection.getHRegionConnection(server); rs.closeRegion(region, false); long timeout = conf.getLong("hbase.hbck.close.timeout", 120000); long expiration = timeout + System.currentTimeMillis(); while (System.currentTimeMillis() < expiration) { try { HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName()); - if (rsRegion == null) throw new NotServingRegionException(); + if (rsRegion == null) + throw new NotServingRegionException(); } catch (Exception e) { + HConnectionManager.deleteConnection(conf, false); return; } Thread.sleep(1000); } - throw new IOException("Region " + region + " failed to close within" + - " timeout " + timeout); + throw new IOException("Region " + region + " failed to close within" + + " timeout " + timeout); } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 7f5b377..684da7c 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -169,7 +169,7 @@ public class ZKUtil { "[\\t\\n\\x0B\\f\\r]", "")); StringBuilder builder = new StringBuilder(ensemble); builder.append(":"); - builder.append(conf.get("hbase.zookeeper.property.clientPort")); + builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); builder.append(":"); builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); if (name != null && !name.isEmpty()) { diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index dc471c4..d2bbc7b 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -129,7 +129,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { // Apparently this is recoverable. Retry a while. // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling // TODO: Generalize out in ZKUtil. - long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000); + long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME); long finished = System.currentTimeMillis() + wait; KeeperException ke = null; do { diff --git a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java index e25184e..fd883f2 100644 --- a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java +++ b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java @@ -98,7 +98,8 @@ public class TestCatalogTracker { private CatalogTracker constructAndStartCatalogTracker(final HConnection c) throws IOException, InterruptedException { - CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable); + CatalogTracker ct = new CatalogTracker(this.watcher, null, c, + this.abortable, 0); ct.start(); return ct; } diff --git a/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java b/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java index 60320a3..dc81ec5 100644 --- a/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java +++ b/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java @@ -70,8 +70,7 @@ public class TestMetaReaderEditor { @Before public void setup() throws IOException, InterruptedException { Configuration c = new Configuration(UTIL.getConfiguration()); zkw = new ZooKeeperWatcher(c, "TestMetaReaderEditor", ABORTABLE); - HConnection connection = HConnectionManager.getConnection(c); - ct = new CatalogTracker(zkw, connection, ABORTABLE); + ct = new CatalogTracker(zkw, c, ABORTABLE); ct.start(); } diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index b01a2d2..f296cc5 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.util.Bytes; @@ -42,6 +43,7 @@ import org.apache.commons.logging.LogFactory; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * This class is for testing HCM features @@ -106,9 +108,9 @@ public class TestHCM { + getValidKeyCount()); Thread.sleep(100); } - Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES, + Assert.assertEquals(1, getHConnectionManagerCacheSize()); - Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES, + Assert.assertEquals(1, getValidKeyCount()); } @@ -156,4 +158,69 @@ public class TestHCM { HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW); assertNull("What is this location?? " + rl, rl); } + + /** + * Make sure that {@link HConfiguration} instances that are essentially the + * same map to the same {@link HConnection} instance. + */ + @Test + public void testConnectionSameness() throws Exception { + HConnection previousConnection = null; + for (int i = 0; i < 2; i++) { + // set random key to differentiate the connection from previous ones + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set("some_key", String.valueOf(_randy.nextInt())); + LOG.info("The hash code of the current configuration is: " + + configuration.hashCode()); + HConnection currentConnection = HConnectionManager + .getConnection(configuration); + if (previousConnection != null) { + assertTrue( + "Did not get the same connection even though its key didn't change", + previousConnection == currentConnection); + } + previousConnection = currentConnection; + // change the configuration, so that it is no longer reachable from the + // client's perspective. However, since its part of the LRU doubly linked + // list, it will eventually get thrown out, at which time it should also + // close the corresponding {@link HConnection}. + configuration.set("other_key", String.valueOf(_randy.nextInt())); + } + } + + /** + * Makes sure that there is no leaking of + * {@link HConnectionManager.TableServers} in the {@link HConnectionManager} + * class. + */ + @Test + public void testConnectionUniqueness() throws Exception { + HConnection previousConnection = null; + for (int i = 0; i < HConnectionManager.MAX_CACHED_HBASE_INSTANCES + 10; i++) { + // set random key to differentiate the connection from previous ones + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set("some_key", String.valueOf(_randy.nextInt())); + configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, + String.valueOf(_randy.nextInt())); + LOG.info("The hash code of the current configuration is: " + + configuration.hashCode()); + HConnection currentConnection = HConnectionManager + .getConnection(configuration); + if (previousConnection != null) { + assertTrue("Got the same connection even though its key changed!", + previousConnection != currentConnection); + } + // change the configuration, so that it is no longer reachable from the + // client's perspective. However, since its part of the LRU doubly linked + // list, it will eventually get thrown out, at which time it should also + // close the corresponding {@link HConnection}. + configuration.set("other_key", String.valueOf(_randy.nextInt())); + + previousConnection = currentConnection; + LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: " + + getHConnectionManagerCacheSize() + + ", and the number of valid keys is: " + getValidKeyCount()); + Thread.sleep(50); + } + } } diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java index 624f4a8..70b21cf 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java @@ -144,6 +144,7 @@ public class TestTableMapReduce extends MultiRegionTable { FileUtil.fullyDelete( new File(job.getConfiguration().get("hadoop.tmp.dir"))); } + table.close(); } } @@ -170,6 +171,7 @@ public class TestTableMapReduce extends MultiRegionTable { } } assertTrue(verified); + table.close(); } /** diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java b/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java index 8992dbb..3039df2 100644 --- a/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java +++ b/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java @@ -112,8 +112,7 @@ public class TestMergeTable { LOG.info("Starting mini hbase cluster"); UTIL.startMiniHBaseCluster(1, 1); Configuration c = new Configuration(UTIL.getConfiguration()); - HConnection connection = HConnectionManager.getConnection(c); - CatalogTracker ct = new CatalogTracker(connection); + CatalogTracker ct = new CatalogTracker(c); ct.start(); List originalTableRegions = MetaReader.getTableRegions(ct, desc.getName());