Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy)
@@ -30,7 +30,6 @@
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -283,7 +282,7 @@
     switch(event.getState()) {
       case SyncConnected:
         // Update our identifier.  Otherwise ignore.
-        LOG.info(this.identifier + " connected");
+        LOG.debug(this.identifier + " connected");
         // Now, this callback can be invoked before the this.zookeeper is set.
         // Wait a little while.
         long finished = System.currentTimeMillis() +
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy)
@@ -91,7 +91,7 @@
       throw new IOException("Unable to determine ZooKeeper ensemble");
     }
     int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
-    LOG.info(descriptor + " opening connection to ZooKeeper with ensemble (" +
+    LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
         ensemble + ")");
     return new ZooKeeper(ensemble, timeout, watcher);
   }
@@ -194,7 +194,7 @@
       throws KeeperException {
     try {
       Stat s = zkw.getZooKeeper().exists(znode, zkw);
-      LOG.info(zkw.prefix("Set watcher on existing znode " + znode));
+      LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
       return s != null ? true : false;
     } catch (KeeperException e) {
       LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -1243,6 +1243,10 @@
         r.hasReferences()? "Region has references on open" :
           "Region has too many store files");
     }
+
+    // Add to online regions if all above was successful.
+    addToOnlineRegions(r);
+
     // Update ZK, ROOT or META
     if (r.getRegionInfo().isRootRegion()) {
       RootLocationEditor.setRootLocation(getZooKeeper(),
@@ -1257,8 +1261,6 @@
         MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
           getServerInfo());
       }
     }
-    // Add to online regions if all above was successful.
-    addToOnlineRegions(r);
   }
   /**
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy)
@@ -1322,7 +1322,6 @@
       // If bulkAssign in progress, suspend checks
       if (this.bulkAssign) return;
       synchronized (regionsInTransition) {
-        LOG.debug("Checking for timed out RIT");
         // Iterate all regions in transition checking for time outs
         long now = System.currentTimeMillis();
         for (RegionState regionState : regionsInTransition.values()) {
Index: src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (working copy)
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;
 
 /**
  * Interface for the log cleaning function inside the master. By default, three
@@ -30,15 +31,14 @@
  * "hbase.master.logcleaner.plugins", which is a comma-separated list of fully
  * qualified class names. LogsCleaner will add it to the chain.
  *
- * HBase ships with LogsCleaner as the default implementation.
+ * <p>HBase ships with LogsCleaner as the default implementation.
  *
- * This interface extends Configurable, so setConf needs to be called once
+ * <p>This interface extends Configurable, so setConf needs to be called once
  * before using the cleaner.
  * Since LogCleanerDelegates are created in LogsCleaner by reflection. Classes
  * that implements this interface should provide a default constructor.
  */
-public interface LogCleanerDelegate extends Configurable {
-
+public interface LogCleanerDelegate extends Configurable, Stoppable {
  /**
   * Should the master delete the log or keep it?
   * @param filePath full path to log.
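With this change a delegate must supply Stoppable's stop(String) and isStopped() alongside isLogDeletable(Path) and the Configurable methods, plus the default constructor the reflection-based loading requires. A minimal sketch of a conforming delegate (the class and its keep-everything policy are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.master.LogCleanerDelegate;

    // Illustrative delegate, not part of this patch: keeps every log.
    public class KeepAllLogCleaner implements LogCleanerDelegate {
      private Configuration conf;
      private volatile boolean stopped = false;

      @Override
      public boolean isLogDeletable(Path filePath) {
        return false; // never delete anything
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf; // called once by LogsCleaner before first use
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void stop(String why) {
        this.stopped = true; // close any held resources here
      }

      @Override
      public boolean isStopped() {
        return this.stopped;
      }
    }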

Index: src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (working copy)
@@ -178,7 +178,7 @@
 
   /*
    * Version of master that will shutdown the passed zk cluster on its way out.
    */
-  static class LocalHMaster extends HMaster {
+  public static class LocalHMaster extends HMaster {
     private MiniZooKeeperCluster zkcluster = null;
 
     public LocalHMaster(Configuration conf)
Index: src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (working copy)
@@ -153,4 +153,19 @@
       LOG.warn("Error while cleaning the logs", e);
     }
   }
+
+  @Override
+  public void run() {
+    try {
+      super.run();
+    } finally {
+      for (LogCleanerDelegate lc: this.logCleanersChain) {
+        try {
+          lc.stop("Exiting");
+        } catch (Throwable t) {
+          LOG.warn("Stopping", t);
+        }
+      }
+    }
+  }
 }
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java (working copy)
@@ -29,12 +29,11 @@
  * be deleted. By default they are allowed to live for 10 minutes.
  */
 public class TimeToLiveLogCleaner implements LogCleanerDelegate {
-
-  static final Log LOG =
-    LogFactory.getLog(TimeToLiveLogCleaner.class.getName());
+  static final Log LOG = LogFactory.getLog(TimeToLiveLogCleaner.class.getName());
   private Configuration conf;
   // Configured time a log can be kept after it was closed
   private long ttl;
+  private boolean stopped = false;
 
   @Override
   public boolean isLogDeletable(Path filePath) {
@@ -67,4 +66,14 @@
   public Configuration getConf() {
     return conf;
   }
-}
+
+  @Override
+  public void stop(String why) {
+    this.stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+}
\ No newline at end of file
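For reference, delegates are wired into the chain through configuration; a sketch using the property named in the interface comment above and the two cleaner classes touched by this patch:

    Configuration conf = HBaseConfiguration.create();
    // Comma-separated, fully-qualified class names; LogsCleaner instantiates
    // each entry by reflection, hence the default-constructor requirement.
    conf.set("hbase.master.logcleaner.plugins",
        "org.apache.hadoop.hbase.master.TimeToLiveLogCleaner," +
        "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");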
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.io.Writable;
@@ -102,6 +103,12 @@
     public void close(TaskAttemptContext context)
     throws IOException {
       table.flushCommits();
+      // The following call will shutdown all connections to the cluster from
+      // this JVM.  It will close out our zk session; otherwise zk will log
+      // expired sessions rather than closed ones.  If any other HTable instance
+      // is running in this JVM, this next call will damage it.  The presumption
+      // is that the above this.table is the only instance.
+      HConnectionManager.deleteAllConnections(true);
     }
 
     /**
Index: src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy)
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -38,12 +39,11 @@
  * replication before deleting it when its TTL is over.
  */
 public class ReplicationLogCleaner implements LogCleanerDelegate {
-
-  private static final Log LOG =
-    LogFactory.getLog(ReplicationLogCleaner.class);
+  private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private Configuration conf;
   private ReplicationZookeeper zkHelper;
   private Set<String> hlogs = new HashSet<String>();
+  private boolean stopped = false;
 
   /**
    * Instantiates the cleaner, does nothing more.
@@ -105,11 +105,13 @@
 
   @Override
   public void setConf(Configuration conf) {
-    this.conf = conf;
+    // Make my own Configuration.  Then I'll have my own connection to zk that
+    // I can close myself when the time comes.
+    this.conf = new Configuration(conf);
     try {
       ZooKeeperWatcher zkw =
-        new ZooKeeperWatcher(conf, this.getClass().getName(), null);
-      this.zkHelper = new ReplicationZookeeper(conf, zkw);
+        new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
+      this.zkHelper = new ReplicationZookeeper(this.conf, zkw);
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {
@@ -122,4 +124,20 @@
   public Configuration getConf() {
     return conf;
   }
-}
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) return;
+    this.stopped = true;
+    if (this.zkHelper != null) {
+      LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
+      this.zkHelper.getZookeeperWatcher().close();
+    }
+    HConnectionManager.deleteConnection(this.conf, true);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/util/HMerge.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/HMerge.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/util/HMerge.java (working copy)
@@ -106,7 +106,7 @@
       HConnection connection = HConnectionManager.getConnection(conf);
       masterIsRunning = connection.isMasterRunning();
     }
-    HConnectionManager.deleteConnection(conf, false);
+    HConnectionManager.deleteConnection(conf, true);
     if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
       if (masterIsRunning) {
         throw new IllegalStateException(
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -68,21 +68,54 @@
 import org.apache.zookeeper.KeeperException;
 
 /**
- * A non-instantiable class that manages connections to tables.
- * Used by {@link HTable} and {@link HBaseAdmin}
+ * A non-instantiable class that manages {@link HConnection}s.
+ * This class has a static Map of {@link HConnection} instances keyed by
+ * {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
+ * that pass the same {@link Configuration} instance will return the same
+ * {@link HConnection} instance (adding properties to a Configuration
+ * instance does not change its object identity).  Sharing {@link HConnection}
+ * instances is usually what you want; all clients of the {@link HConnection}
+ * instances share the HConnections' cache of Region locations rather than each
+ * having to discover for itself the location of meta, root, etc.  It makes
+ * sense for the likes of the pool of HTables class {@link HTablePool}, for
+ * instance (if concerned that a single {@link HConnection} is insufficient
+ * for sharing amongst clients in, say, a heavily multithreaded environment,
+ * in practice it has not proven to be an issue.  Besides, {@link HConnection} is
+ * implemented atop Hadoop RPC and, as of this writing, Hadoop RPC does a
+ * connection per cluster-member, exclusively).
+ *
+ * <p>But sharing connections
+ * makes cleanup of {@link HConnection} instances a little awkward.  Currently,
+ * clients clean up by calling
+ * {@link #deleteConnection(Configuration, boolean)}.  This will shut down the
+ * zookeeper connection the HConnection was using and clean up all
+ * HConnection resources as well as stopping proxies to servers out on the
+ * cluster.  Not running the cleanup will not end the world; it'll
+ * just stall the closeup some and spew some zookeeper connection failed
+ * messages into the log.  Running the cleanup on a {@link HConnection} that is
+ * subsequently used by another client will cause breakage, so be careful
+ * running cleanup.
+ * <p>To create a {@link HConnection} that is not shared by others, you can
+ * create a new {@link Configuration} instance, pass this new instance to
+ * {@link #getConnection(Configuration)}, and then when done, close it up by
+ * doing something like the following:
+ * <pre>
+ * {@code
+ * Configuration newConfig = new Configuration(originalConf);
+ * HConnection connection = HConnectionManager.getConnection(newConfig);
+ * // Use the connection to your hearts' delight and then when done...
+ * HConnectionManager.deleteConnection(newConfig, true);
+ * }
+ * </pre>
+ *
+ * <p>Cleanup used to be done inside a shutdown hook.  On startup we'd
+ * register a shutdown hook that called {@link #deleteAllConnections(boolean)}
+ * on its way out, but the order in which shutdown hooks run is not defined,
+ * which was problematic for clients of HConnection that wanted to register
+ * their own shutdown hooks, so we removed ours, shifting the onus for
+ * cleanup to the client.
  */
 @SuppressWarnings("serial")
 public class HConnectionManager {
-  // Register a shutdown hook, one that cleans up RPC and closes zk sessions.
-  static {
-    Runtime.getRuntime().addShutdownHook(new Thread("HCM.shutdownHook") {
-      @Override
-      public void run() {
-        HConnectionManager.deleteAllConnections(true);
-      }
-    });
-  }
-
   static final int MAX_CACHED_HBASE_INSTANCES = 31;
 
   // A LRU Map of Configuration hashcode -> TableServers.  We set instances to 31.
@@ -105,7 +138,8 @@
   }
 
   /**
-   * Get the connection that goes with the passed <code>conf</code> configuration.
+   * Get the connection that goes with the passed <code>conf</code>
+   * configuration instance.
    * If no current connection exists, method creates a new connection for the
    * passed <code>conf</code> instance.
    * @param conf configuration
@@ -126,9 +160,13 @@
   }
 
   /**
-   * Delete connection information for the instance specified by configuration
-   * @param conf configuration
-   * @param stopProxy stop the proxy as well
+   * Delete connection information for the instance specified by configuration.
+   * This will close the connection to the zookeeper ensemble and let go of all
+   * resources.
+   * @param conf configuration whose identity is used to find the {@link HConnection}
+   * instance.
+   * @param stopProxy Shuts down all the proxies put up to cluster members
+   * including to the cluster HMaster.  Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}.
    */
   public static void deleteConnection(Configuration conf, boolean stopProxy) {
     synchronized (HBASE_INSTANCES) {
@@ -140,14 +178,6 @@
   }
 
   /**
-   * Delete connection information for the instance
-   * @param connection configuration
-   */
-  public static void deleteConnection(HConnection connection) {
-    deleteConnection(connection.getConfiguration(), false);
-  }
-
-  /**
    * Delete information for all connections.
    * @param stopProxy stop the proxy as well
    * @throws IOException
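A sketch of the lookup contract the rewritten class comment describes (names are illustrative; exception handling elided):

    Configuration conf = HBaseConfiguration.create();
    HConnection c1 = HConnectionManager.getConnection(conf);
    HConnection c2 = HConnectionManager.getConnection(conf);
    // c1 == c2: same Configuration instance, so the same cached HConnection.
    // A copy is a different key and gets a connection of its own:
    Configuration copy = new Configuration(conf);
    HConnection c3 = HConnectionManager.getConnection(copy);
    // ... use c3, then clean up without disturbing c1/c2's shared connection:
    HConnectionManager.deleteConnection(copy, true);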

Index: src/main/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy)
@@ -33,13 +33,27 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
- * Cluster connection.
+ * Cluster connection.  Hosts a connection to the ZooKeeper ensemble and
+ * thereafter into the HBase cluster.  Knows how to locate regions out on the cluster,
+ * keeps a cache of locations and then knows how to recalibrate after they move.
  * {@link HConnectionManager} manages instances of this class.
+ *
+ * <p>HConnections are used by {@link HTable} mostly but also by
+ * {@link HBaseAdmin}, {@link CatalogTracker},
+ * and {@link ZooKeeperWatcher}.  HConnection instances can be shared.  Sharing
+ * is usually what you want because rather than each HConnection instance
+ * having to do its own discovery of regions out on the cluster, instead, all
+ * clients get to share the one cache of locations.  Sharing makes cleanup of
+ * HConnections awkward.  See {@link HConnectionManager} for cleanup
+ * discussion.
+ *
+ * @see HConnectionManager
  */
 public interface HConnection extends Abortable {
   /**
@@ -48,7 +62,7 @@
   public Configuration getConfiguration();
 
   /**
-   * Retrieve ZooKeeperWatcher used by the connection.
+   * Retrieve ZooKeeperWatcher used by this connection.
    * @return ZooKeeperWatcher handle being used by the connection.
    * @throws IOException if a remote or network exception occurs
    */
@@ -302,4 +316,4 @@
    */
   public void prewarmRegionCache(final byte[] tableName,
       final Map<HRegionInfo, HServerAddress> regions);
-}
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (working copy)
@@ -19,18 +19,15 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-
 /**
  * A simple pool of HTable instances.<p>
  *
@@ -38,16 +35,16 @@
  * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
  * Once you are done with it, return it to the pool with {@link #putTable(HTableInterface)}.<p>
  *
- * A pool can be created with a maxSize which defines the most HTable
+ * <p>A pool can be created with a maxSize which defines the most HTable
  * references that will ever be retained for each table. Otherwise the default
- * is {@link Integer#MAX_VALUE}.<p>
+ * is {@link Integer#MAX_VALUE}.
  */
 public class HTablePool {
   private final ConcurrentMap<String, Queue<HTableInterface>> tables =
       new ConcurrentHashMap<String, Queue<HTableInterface>>();
   private final Configuration config;
   private final int maxSize;
-  private HTableInterfaceFactory tableFactory = new HTableFactory();
+  private final HTableInterfaceFactory tableFactory;
 
   /**
    * Default Constructor.  Default HBaseConfiguration and no limit on pool size.
@@ -61,15 +58,17 @@
    * @param config configuration
    * @param maxSize maximum number of references to keep for each table
    */
-  public HTablePool(Configuration config, int maxSize) {
-    this.config = config;
-    this.maxSize = maxSize;
+  public HTablePool(final Configuration config, final int maxSize) {
+    this(config, maxSize, null);
   }
 
-  public HTablePool(Configuration config, int maxSize, HTableInterfaceFactory tableFactory) {
-    this.config = config;
+  public HTablePool(final Configuration config, final int maxSize,
+      final HTableInterfaceFactory tableFactory) {
+    // Make a new configuration instance so I can safely cleanup when
+    // done with the pool.
+    this.config = new Configuration(config);
     this.maxSize = maxSize;
-    this.tableFactory = tableFactory;
+    this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
   }
 
   /**
@@ -146,7 +145,7 @@
         table = queue.poll();
       }
     }
-
+    HConnectionManager.deleteConnection(this.config, true);
   }
 
   /**
@@ -164,4 +163,4 @@
       return queue.size();
     }
   }
-}
+}
\ No newline at end of file
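A usage sketch for the patched pool (table, family, and values are illustrative; exception handling elided). Because the constructor now takes a private Configuration copy, the deleteConnection call added above tears down only the pool's own connection:

    Configuration conf = HBaseConfiguration.create();
    HTablePool pool = new HTablePool(conf, 10);
    HTableInterface table = pool.getTable("mytable");
    try {
      table.put(new Put(Bytes.toBytes("row1"))
          .add(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("v1")));
    } finally {
      pool.putTable(table); // return to the pool rather than closing
    }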

Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy)
@@ -108,7 +108,7 @@
 
   private void cleanupCatalogTracker(final CatalogTracker ct) {
     ct.stop();
-    HConnectionManager.deleteConnection(ct.getConnection());
+    HConnectionManager.deleteConnection(ct.getConnection().getConfiguration(), true);
   }
 
   @Override
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1023967)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -62,11 +62,13 @@
  * be corrupted if multiple threads contend over a single HTable instance.
  *
  * <p>Instances of HTable passed the same {@link Configuration} instance will
- * share connections to master and the zookeeper ensemble as well as caches of
- * region locations.  This happens because they will all share the same
- * {@link HConnection} instance (internally we keep a Map of {@link HConnection}
- * instances keyed by {@link Configuration}).
- * {@link HConnection} will read most of the
+ * share connections to servers out on the cluster and to the zookeeper ensemble
+ * as well as caches of region locations.  This is usually a *good* thing.
+ * This happens because they will all share the same underlying
+ * {@link HConnection} instance.  See {@link HConnectionManager} for more on
+ * how this mechanism works.
+ *
+ * <p>{@link HConnection} will read most of the
  * configuration it needs from the passed {@link Configuration} on initial
  * construction.  Thereafter, for settings such as
  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
@@ -77,6 +79,8 @@
  * new configuration.
  *
  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
+ * @see HConnection
+ * @see HConnectionManager
  */
 public class HTable implements HTableInterface {
   private static final Log LOG = LogFactory.getLog(HTable.class);
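Putting the HTable and HConnectionManager comments together, the two modes of use look something like this (table names are illustrative; exception handling elided):

    Configuration shared = HBaseConfiguration.create();
    HTable t1 = new HTable(shared, "table1");
    HTable t2 = new HTable(shared, "table2"); // same HConnection as t1

    // An unshared connection: key it with a private Configuration copy, then
    // clean up explicitly now that there is no shutdown hook to do it for you.
    Configuration priv = new Configuration(shared);
    HTable t3 = new HTable(priv, "table1");
    // ... use t3, and when done:
    HConnectionManager.deleteConnection(priv, true);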