commit 27a9dea53103f1ad70c03fd16e4c1b6f0171a197 Author: Karthick Sankarachary Date: Wed Apr 20 16:08:33 2011 -0700 HBASE-3777 Redefine Identity Of HBase Configuration diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 5701639..a2ff900 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -93,9 +93,18 @@ public final class HConstants { /** Name of ZooKeeper config file in conf/ directory. */ public static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg"; - /** default client port that the zookeeper listens on */ + /** Parameter name for the client port that the zookeeper listens on */ + public static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; + + /** Default client port that the zookeeper listens on */ public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181; + /** Parameter name for the wait time for the recoverable zookeeper */ + public static final String ZOOKEEPER_RECOVERABLE_WAITTIME = "hbase.zookeeper.recoverable.waittime"; + + /** Default wait time for the recoverable zookeeper */ + public static final long DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME = 10000; + /** Parameter name for the root dir in ZK for this cluster */ public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent"; @@ -343,6 +352,71 @@ public final class HConstants { */ public static long DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE = Long.MAX_VALUE; + /** + * Parameter name for client pause value, used mostly as value to wait + * before running a retry of a failed get, region lookup, etc. + */ + public static String HBASE_CLIENT_PAUSE = "hbase.client.pause"; + + /** + * Default value of {@link #HBASE_CLIENT_PAUSE}. + */ + public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000; + + /** + * Parameter name for maximum retries, used as maximum for all retryable + * operations such as fetching of the root region from root region server, + * getting a cell's value, starting a row update, etc. + */ + public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number"; + + /** + * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}. + */ + public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10; + + /** + * Parameter name for maximum attempts, used to limit the number of times the + * client will try to obtain the proxy for a given region server. + */ + public static String HBASE_CLIENT_RPC_MAXATTEMPTS = "hbase.client.rpc.maxattempts"; + + /** + * Default value of {@link #HBASE_CLIENT_RPC_MAXATTEMPTS}. + */ + public static int DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS = 1; + + /** + * Parameter name for client prefetch limit, used as the maximum number of regions + * info that will be prefetched. + */ + public static String HBASE_CLIENT_PREFETCH_LIMIT = "hbase.client.prefetch.limit"; + + /** + * Default value of {@link #HBASE_CLIENT_PREFETCH_LIMIT}. + */ + public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10; + + /** + * Parameter name for number of rows that will be fetched when calling next on + * a scanner if it is not served from memory. Higher caching values will + * enable faster scanners but will eat up more memory and some calls of next + * may take longer and longer times when the cache is empty. + */ + public static String HBASE_META_SCANNER_CACHING = "hbase.meta.scanner.caching"; + + /** + * Default value of {@link #HBASE_META_SCANNER_CACHING}. + */ + public static int DEFAULT_HBASE_META_SCANNER_CACHING = 100; + + /** + * Parameter name for unique identifier for this {@link Configuration} + * instance. If there are two or more {@link Configuration} instances that, + * for all intents and purposes, are the same except for their instance ids, + * then they will not be able to share the same {@link Connection} instance. + */ + public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id"; /** * HRegion server lease period in milliseconds. Clients must report in within this period diff --git a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index be31179..4687808 100644 --- a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; @@ -57,7 +58,7 @@ import org.apache.hadoop.ipc.RemoteException; */ public class CatalogTracker { private static final Log LOG = LogFactory.getLog(CatalogTracker.class); - private final HConnection connection; + private HConnection connection; private final ZooKeeperWatcher zookeeper; private final RootRegionTracker rootRegionTracker; private final MetaNodeTracker metaNodeTracker; @@ -146,6 +147,7 @@ public class CatalogTracker { this.stopped = true; this.rootRegionTracker.stop(); this.metaNodeTracker.stop(); + HConnectionManager.putConnection(this.connection); // Call this and it will interrupt any ongoing waits on meta. synchronized (this.metaAvailable) { this.metaAvailable.notifyAll(); diff --git a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index afb666a..2087157 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -63,7 +63,7 @@ import org.apache.hadoop.util.StringUtils; public class HBaseAdmin implements Abortable { private final Log LOG = LogFactory.getLog(this.getClass().getName()); // private final HConnection connection; - final HConnection connection; + private HConnection connection; private volatile Configuration conf; private final long pause; private final int numRetries; @@ -1220,4 +1220,8 @@ public class HBaseAdmin implements Abortable { copyOfConf.setInt("hbase.client.retries.number", 1); new HBaseAdmin(copyOfConf); } + + public void close() { + this.connection.close(); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index 2bb4725..af041a3 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -345,4 +345,10 @@ public interface HConnection extends Abortable { * @throws IOException if a remote or network exception occurs */ public int getCurrentNrHRS() throws IOException; + + /** + * Close the connection and clean up underlying resources if no longer in use. + */ + public void close(); + } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index c348f7a..7aa1455 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -22,8 +22,15 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -37,6 +44,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -50,7 +58,11 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.*; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SoftValueSortedMap; @@ -63,6 +75,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; +import com.google.common.collect.ImmutableMap; + /** * A non-instantiable class that manages {@link HConnection}s. * This class has a static Map of {@link HConnection} instances keyed by @@ -117,13 +131,14 @@ public class HConnectionManager { // A LRU Map of Configuration hashcode -> TableServers. We set instances to 31. // The zk default max connections to the ensemble from the one client is 30 so // should run into zk issues before hit this value of 31. - private static final Map HBASE_INSTANCES = - new LinkedHashMap - ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > MAX_CACHED_HBASE_INSTANCES; - } + private static final Map HBASE_INSTANCES = + new LinkedHashMap( + (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) { + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > MAX_CACHED_HBASE_INSTANCES; + } }; /* @@ -134,6 +149,18 @@ public class HConnectionManager { } /** + * Get the connection that goes with the default HBase configuration. If no + * current connection exists, method creates a new connection for it. + * + * @return HConnection object for the default HBase configuration + * @throws ZooKeeperConnectionException + */ + public static HConnection getConnection() + throws ZooKeeperConnectionException { + return getConnection(HBaseConfiguration.create()); + } + + /** * Get the connection that goes with the passed conf * configuration instance. * If no current connection exists, method creates a new connection for the @@ -144,31 +171,70 @@ public class HConnectionManager { */ public static HConnection getConnection(Configuration conf) throws ZooKeeperConnectionException { - HConnectionImplementation connection; synchronized (HBASE_INSTANCES) { - connection = HBASE_INSTANCES.get(conf); + HConnectionKey connectionKey = new HConnectionKey(conf); + HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { connection = new HConnectionImplementation(conf); - HBASE_INSTANCES.put(conf, connection); + HBASE_INSTANCES.put(connectionKey, connection); + } + connection.incCount(); + return connection; + } + } + + /** + * Decrement the number of references to the given connection. If there are no + * more references, then and only then close the connection to the zookeeper + * ensemble and let go of all resources. + * + * @param connection + * connection, one of whose references is being closed + */ + public static void putConnection(HConnection connection) { + synchronized (HBASE_INSTANCES) { + HConnectionKey connectionKey = null; + for (Map.Entry entry : HBASE_INSTANCES + .entrySet()) { + if (entry.getValue() == connection) { + // remove connection if and only if reference count drops to zero + connectionKey = (entry.getValue().decCount() > 0) ? null : entry + .getKey(); + break; + } + } + if (connectionKey != null) { + HBASE_INSTANCES.remove(connectionKey); } } - return connection; } /** * Delete connection information for the instance specified by configuration. - * This will close connection to the zookeeper ensemble and let go of all - * resources. - * @param conf configuration whose identity is used to find {@link HConnection} - * instance. - * @param stopProxy Shuts down all the proxy's put up to cluster members - * including to cluster HMaster. Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}. + * If there are no more references to it, this will then close connection to + * the zookeeper ensemble and let go of all resources. + * + * @param conf + * configuration whose identity is used to find {@link HConnection} + * instance. + * @param stopProxy + * Shuts down all the proxy's put up to cluster members including to + * cluster HMaster. Calls + * {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)} + * . */ public static void deleteConnection(Configuration conf, boolean stopProxy) { synchronized (HBASE_INSTANCES) { - HConnectionImplementation t = HBASE_INSTANCES.remove(conf); - if (t != null) { - t.close(stopProxy); + HConnectionKey connectionKey = new HConnectionKey(conf); + HConnectionImplementation connection = HBASE_INSTANCES + .get(connectionKey); + if (connection != null) { + if (stopProxy) { + connection.stopProxyOnClose(stopProxy); + } + if (connection.decCount() == 0) { + HBASE_INSTANCES.remove(connectionKey); + } } } } @@ -180,11 +246,14 @@ public class HConnectionManager { */ public static void deleteAllConnections(boolean stopProxy) { synchronized (HBASE_INSTANCES) { - for (HConnectionImplementation t : HBASE_INSTANCES.values()) { - if (t != null) { - t.close(stopProxy); + for (HConnectionImplementation connection : HBASE_INSTANCES.values()) { + if (connection != null) { + if (stopProxy) { + connection.stopProxyOnClose(stopProxy); + } } } + HBASE_INSTANCES.clear(); } } @@ -198,7 +267,11 @@ public class HConnectionManager { byte[] tableName) throws ZooKeeperConnectionException { HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf); - return connection.getNumberOfCachedRegionLocations(tableName); + try { + return connection.getNumberOfCachedRegionLocations(tableName); + } finally { + connection.close(); + } } /** @@ -210,7 +283,81 @@ public class HConnectionManager { static boolean isRegionCached(Configuration conf, byte[] tableName, byte[] row) throws ZooKeeperConnectionException { HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf); - return connection.isRegionCached(tableName, row); + try { + return connection.isRegionCached(tableName, row); + } finally { + connection.close(); + } + } + + static class HConnectionKey { + public static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID }; + + private Map properties; + + public HConnectionKey(Configuration conf) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + builder.put(property, value); + } + } + this.properties = builder.build(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + for (String property : CONNECTION_PROPERTIES) { + String value = properties.get(property); + if (value != null) { + result = prime * result + value.hashCode(); + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HConnectionKey that = (HConnectionKey) obj; + if (this.properties == null) { + if (that.properties != null) { + return false; + } + } else { + if (that.properties == null) { + return false; + } + for (String property : CONNECTION_PROPERTIES) { + String thisValue = this.properties.get(property); + String thatValue = that.properties.get(property); + if (thisValue == thatValue) { + continue; + } + if (thisValue == null || !thisValue.equals(thatValue)) { + return false; + } + } + } + return true; + } } /* Encapsulates connection to zookeeper and regionservers.*/ @@ -258,6 +405,10 @@ public class HConnectionManager { private final Set regionCachePrefetchDisabledTables = new CopyOnWriteArraySet(); + private boolean stopProxy; + private AtomicInteger refCount = new AtomicInteger(0); + + /** * constructor * @param conf Configuration object @@ -277,15 +428,19 @@ public class HConnectionManager { "Unable to find region server interface " + serverClassName, e); } - this.pause = conf.getLong("hbase.client.pause", 1000); - this.numRetries = conf.getInt("hbase.client.retries.number", 10); - this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1); + this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.maxRPCAttempts = conf.getInt( + HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, + HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS); this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - - this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit", - 10); + this.prefetchRegionLimit = conf.getInt( + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); setupZookeeperTrackers(); @@ -1458,5 +1613,42 @@ public class HConnectionManager { throw new IOException("Unexpected ZooKeeper exception", ke); } } + + public void stopProxyOnClose(boolean stopProxy) { + this.stopProxy = stopProxy; + } + + /** + * Increment this client's reference count. + */ + int incCount() { + return refCount.incrementAndGet(); + } + + /** + * Decrement this client's reference count. + */ + int decCount() { + return refCount.decrementAndGet(); + } + + /** + * Return if this client has no reference + * + * @return true if this client has no reference; false otherwise + */ + boolean isZeroReference() { + return refCount.get() == 0; + } + + @Override + protected void finalize() throws Throwable { + close(stopProxy); + } + + @Override + public void close() { + HConnectionManager.putConnection(this); + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index edacf56..f43d0e2 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -89,7 +89,7 @@ import org.apache.hadoop.hbase.util.Writables; */ public class HTable implements HTableInterface { private static final Log LOG = LogFactory.getLog(HTable.class); - private final HConnection connection; + private HConnection connection; private final byte [] tableName; protected final int scannerTimeout; private volatile Configuration configuration; @@ -101,6 +101,7 @@ public class HTable implements HTableInterface { private int maxKeyValueSize; private ExecutorService pool; // For Multi private long maxScannerResultSize; + private boolean closed; /** * Creates an object to access a HBase table. @@ -196,6 +197,7 @@ public class HTable implements HTableInterface { new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + this.closed = false; } public Configuration getConfiguration() { @@ -249,7 +251,12 @@ public class HTable implements HTableInterface { */ public static boolean isTableEnabled(Configuration conf, byte[] tableName) throws IOException { - return HConnectionManager.getConnection(conf).isTableEnabled(tableName); + HConnection connection = HConnectionManager.getConnection(conf); + try { + return connection.isTableEnabled(tableName); + } finally { + connection.close(); + } } /** @@ -825,8 +832,20 @@ public class HTable implements HTableInterface { @Override public void close() throws IOException { - flushCommits(); - this.pool.shutdown(); + if (!this.closed) { + try { + flushCommits(); + this.pool.shutdown(); + } finally { + this.connection.close(); + } + } + this.closed = true; + } + + @Override + protected void finalize() throws Throwable { + close(); } // validate for well-formedness @@ -1289,8 +1308,12 @@ public class HTable implements HTableInterface { */ public static void setRegionCachePrefetch(final byte[] tableName, boolean enable) throws ZooKeeperConnectionException { - HConnectionManager.getConnection(HBaseConfiguration.create()). - setRegionCachePrefetch(tableName, enable); + HConnection connection = HConnectionManager.getConnection(); + try { + connection.setRegionCachePrefetch(tableName, enable); + } finally { + connection.close(); + } } /** @@ -1305,8 +1328,12 @@ public class HTable implements HTableInterface { */ public static void setRegionCachePrefetch(final Configuration conf, final byte[] tableName, boolean enable) throws ZooKeeperConnectionException { - HConnectionManager.getConnection(conf).setRegionCachePrefetch( - tableName, enable); + HConnection connection = HConnectionManager.getConnection(conf); + try { + connection.setRegionCachePrefetch(tableName, enable); + } finally { + connection.close(); + } } /** @@ -1319,8 +1346,12 @@ public class HTable implements HTableInterface { */ public static boolean getRegionCachePrefetch(final Configuration conf, final byte[] tableName) throws ZooKeeperConnectionException { - return HConnectionManager.getConnection(conf).getRegionCachePrefetch( - tableName); + HConnection connection = HConnectionManager.getConnection(conf); + try { + return connection.getRegionCachePrefetch(tableName); + } finally { + connection.close(); + } } /** @@ -1331,9 +1362,13 @@ public class HTable implements HTableInterface { * @throws ZooKeeperConnectionException */ public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException { - return HConnectionManager.getConnection(HBaseConfiguration.create()). - getRegionCachePrefetch(tableName); - } + HConnection connection = HConnectionManager.getConnection(); + try { + return connection.getRegionCachePrefetch(tableName); + } finally { + connection.close(); + } + } /** * Explicitly clears the region cache to fetch the latest value from META. diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 88827a8..099fff3 100755 --- a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -69,7 +69,7 @@ public class HTablePool { final HTableInterfaceFactory tableFactory) { // Make a new configuration instance so I can safely cleanup when // done with the pool. - this.config = config == null? new Configuration(): new Configuration(config); + this.config = config == null? new Configuration(): config; this.maxSize = maxSize; this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory; } diff --git a/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java b/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java index 9e3f4d1..9cec45a 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java @@ -158,8 +158,9 @@ public class MetaScanner { // Scan over each meta region ScannerCallable callable; - int rows = Math.min(rowLimit, - configuration.getInt("hbase.meta.scanner.caching", 100)); + int rows = Math.min(rowLimit, configuration.getInt( + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); do { final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY); if (LOG.isDebugEnabled()) { @@ -201,6 +202,7 @@ public class MetaScanner { connection.getRegionServerWithRetries(callable); } } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0); + connection.close(); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index d76e333..31a8db6 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -169,4 +169,8 @@ public class ReplicationAdmin { ReplicationZookeeper getReplicationZk() { return replicationZk; } + + public void close() { + this.connection.close(); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index ed88bfa..09da00a 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -107,8 +107,9 @@ public class VerifyReplication { scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); } + HConnection conn = null; try { - HConnection conn = HConnectionManager.getConnection(conf); + conn = HConnectionManager.getConnection(conf); ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, conn.getZooKeeperWatcher()); ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); @@ -118,6 +119,10 @@ public class VerifyReplication { replicatedScanner = replicatedTable.getScanner(scan); } catch (KeeperException e) { throw new IOException("Got a ZK exception", e); + } finally { + if (conn != null) { + conn.close(); + } } } Result res = replicatedScanner.next(); @@ -151,8 +156,9 @@ public class VerifyReplication { if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) { throw new IOException("Replication needs to be enabled to verify it."); } + HConnection conn = null; try { - HConnection conn = HConnectionManager.getConnection(conf); + conn = HConnectionManager.getConnection(conf); ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, conn.getZooKeeperWatcher()); // Just verifying it we can connect @@ -164,6 +170,10 @@ public class VerifyReplication { } catch (KeeperException ex) { throw new IOException("Couldn't get access to the slave cluster" + " because: ", ex); + } finally { + if (conn != null) { + conn.close(); + } } conf.set(NAME+".peerId", peerId); conf.set(NAME+".tableName", tableName); diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 78c3b42..d516aca 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -341,6 +341,9 @@ public class ReplicationSource extends Thread shipEdits(); } + if (this.conn != null) { + this.conn.close(); + } LOG.debug("Source exiting " + peerClusterId); } diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 5da5e34..ea00e32 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -229,10 +229,14 @@ public class HBaseFsck { */ private void loadDisabledTables() throws ZooKeeperConnectionException, IOException, KeeperException { - ZooKeeperWatcher zkw = - HConnectionManager.getConnection(conf).getZooKeeperWatcher(); - for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { - disabledTables.add(Bytes.toBytes(tableName)); + HConnection connection = HConnectionManager.getConnection(conf); + try { + ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); + for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) { + disabledTables.add(Bytes.toBytes(tableName)); + } + } finally { + connection.close(); } } diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java index b624d28..b1fa0fc 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -80,29 +81,39 @@ public class HBaseFsckRepair { private static void forceOfflineInZK(Configuration conf, HRegionInfo region) throws ZooKeeperConnectionException, KeeperException, IOException { - ZKAssign.createOrForceNodeOffline( - HConnectionManager.getConnection(conf).getZooKeeperWatcher(), - region, HConstants.HBCK_CODE_NAME); + HConnection connection = HConnectionManager.getConnection(conf); + try { + ZKAssign.createOrForceNodeOffline(connection.getZooKeeperWatcher(), + region, HConstants.HBCK_CODE_NAME); + } finally { + connection.close(); + } } private static void closeRegionSilentlyAndWait(Configuration conf, HServerAddress server, HRegionInfo region) throws IOException, InterruptedException { + HConnection connection = HConnectionManager.getConnection(conf); HRegionInterface rs = - HConnectionManager.getConnection(conf).getHRegionConnection(server); + connection.getHRegionConnection(server); rs.closeRegion(region, false); long timeout = conf.getLong("hbase.hbck.close.timeout", 120000); long expiration = timeout + System.currentTimeMillis(); - while (System.currentTimeMillis() < expiration) { - try { - HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName()); - if (rsRegion == null) throw new NotServingRegionException(); - } catch (Exception e) { - return; + try { + while (System.currentTimeMillis() < expiration) { + try { + HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName()); + if (rsRegion == null) + throw new NotServingRegionException(); + } catch (Exception e) { + return; + } + Thread.sleep(1000); } - Thread.sleep(1000); + throw new IOException("Region " + region + " failed to close within" + + " timeout " + timeout); + } finally { + connection.close(); } - throw new IOException("Region " + region + " failed to close within" + - " timeout " + timeout); } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 7f5b377..684da7c 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -169,7 +169,7 @@ public class ZKUtil { "[\\t\\n\\x0B\\f\\r]", "")); StringBuilder builder = new StringBuilder(ensemble); builder.append(":"); - builder.append(conf.get("hbase.zookeeper.property.clientPort")); + builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); builder.append(":"); builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); if (name != null && !name.isEmpty()) { diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index dc471c4..d2bbc7b 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -129,7 +129,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { // Apparently this is recoverable. Retry a while. // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling // TODO: Generalize out in ZKUtil. - long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000); + long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME); long finished = System.currentTimeMillis() + wait; KeeperException ke = null; do { diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index b01a2d2..f296cc5 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.util.Bytes; @@ -42,6 +43,7 @@ import org.apache.commons.logging.LogFactory; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * This class is for testing HCM features @@ -106,9 +108,9 @@ public class TestHCM { + getValidKeyCount()); Thread.sleep(100); } - Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES, + Assert.assertEquals(1, getHConnectionManagerCacheSize()); - Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES, + Assert.assertEquals(1, getValidKeyCount()); } @@ -156,4 +158,69 @@ public class TestHCM { HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW); assertNull("What is this location?? " + rl, rl); } + + /** + * Make sure that {@link HConfiguration} instances that are essentially the + * same map to the same {@link HConnection} instance. + */ + @Test + public void testConnectionSameness() throws Exception { + HConnection previousConnection = null; + for (int i = 0; i < 2; i++) { + // set random key to differentiate the connection from previous ones + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set("some_key", String.valueOf(_randy.nextInt())); + LOG.info("The hash code of the current configuration is: " + + configuration.hashCode()); + HConnection currentConnection = HConnectionManager + .getConnection(configuration); + if (previousConnection != null) { + assertTrue( + "Did not get the same connection even though its key didn't change", + previousConnection == currentConnection); + } + previousConnection = currentConnection; + // change the configuration, so that it is no longer reachable from the + // client's perspective. However, since its part of the LRU doubly linked + // list, it will eventually get thrown out, at which time it should also + // close the corresponding {@link HConnection}. + configuration.set("other_key", String.valueOf(_randy.nextInt())); + } + } + + /** + * Makes sure that there is no leaking of + * {@link HConnectionManager.TableServers} in the {@link HConnectionManager} + * class. + */ + @Test + public void testConnectionUniqueness() throws Exception { + HConnection previousConnection = null; + for (int i = 0; i < HConnectionManager.MAX_CACHED_HBASE_INSTANCES + 10; i++) { + // set random key to differentiate the connection from previous ones + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set("some_key", String.valueOf(_randy.nextInt())); + configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, + String.valueOf(_randy.nextInt())); + LOG.info("The hash code of the current configuration is: " + + configuration.hashCode()); + HConnection currentConnection = HConnectionManager + .getConnection(configuration); + if (previousConnection != null) { + assertTrue("Got the same connection even though its key changed!", + previousConnection != currentConnection); + } + // change the configuration, so that it is no longer reachable from the + // client's perspective. However, since its part of the LRU doubly linked + // list, it will eventually get thrown out, at which time it should also + // close the corresponding {@link HConnection}. + configuration.set("other_key", String.valueOf(_randy.nextInt())); + + previousConnection = currentConnection; + LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: " + + getHConnectionManagerCacheSize() + + ", and the number of valid keys is: " + getValidKeyCount()); + Thread.sleep(50); + } + } }