Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (date 1301171901000) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision ) @@ -25,7 +25,6 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -98,11 +97,11 @@ * run the cluster. All others park themselves in their constructor until * master or cluster shutdown or until the active master loses its lease in * zookeeper. Thereafter, all running master jostle to take over master role. - * + *

*

The Master can be asked shutdown the cluster. See {@link #shutdown()}. In * this case it will tell all regionservers to go down and then wait on them * all reporting in that they are down. This master will then shut itself down. - * + *

*

You can also shutdown just this master. Call {@link #stopMaster()}. * * @see HMasterInterface @@ -110,7 +109,7 @@ * @see Watcher */ public class HMaster extends Thread -implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { + implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private static final Log LOG = LogFactory.getLog(HMaster.class.getName()); // MASTER is name of the webapp and the attribute name used stuffing this @@ -175,18 +174,19 @@ /** * Initializes the HMaster. The steps are as follows: - *

+ *

*

    *
  1. Initialize HMaster RPC and address *
  2. Connect to ZooKeeper. *
- *

+ *

* Remaining steps of initialization occur in {@link #run()} so that they * run in their own thread rather than within the context of the constructor. + * * @throws InterruptedException */ public HMaster(final Configuration conf) - throws IOException, KeeperException, InterruptedException { + throws IOException, KeeperException, InterruptedException { this.conf = conf; /* @@ -196,12 +196,12 @@ HServerAddress a = new HServerAddress(getMyAddress(this.conf)); int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10); this.rpcServer = HBaseRPC.getServer(this, - new Class[]{HMasterInterface.class, HMasterRegionInterface.class}, - a.getBindAddress(), a.getPort(), - numHandlers, - 0, // we dont use high priority handlers in master - false, conf, - 0); // this is a DNC w/o high priority handlers + new Class[]{HMasterInterface.class, HMasterRegionInterface.class}, + a.getBindAddress(), a.getPort(), + numHandlers, + 0, // we dont use high priority handlers in master + false, conf, + 0); // this is a DNC w/o high priority handlers this.address = new HServerAddress(rpcServer.getListenerAddress()); // initialize server principal (if using secure Hadoop) @@ -219,7 +219,7 @@ // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapred.task.id") == null) { this.conf.set("mapred.task.id", "hb_m_" + this.address.toString() + - "_" + System.currentTimeMillis()); + "_" + System.currentTimeMillis()); } this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + @@ -231,25 +231,26 @@ /** * Stall startup if we are designated a backup master; i.e. we want someone * else to become the master before proceeding. + * * @param c * @param amm * @throws InterruptedException */ private static void stallIfBackupMaster(final Configuration c, - final ActiveMasterManager amm) - throws InterruptedException { + final ActiveMasterManager amm) + throws InterruptedException { // If we're a backup master, stall until a primary to writes his address if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP, - HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { + HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { return; } LOG.debug("HMaster started in backup mode. " + - "Stalling until master znode is written."); + "Stalling until master znode is written."); // This will only be a minute or so while the cluster starts up, // so don't worry about setting watches on the parent znode while (!amm.isActiveMaster()) { LOG.debug("Waiting for master address ZNode to be written " + - "(Also watching cluster state node)"); + "(Also watching cluster state node)"); Thread.sleep(c.getInt("zookeeper.session.timeout", 180 * 1000)); } } @@ -276,10 +277,8 @@ * now wait until it dies to try and become the next active master. If we * do not succeed on our first attempt, this is no longer a cluster startup. */ - this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); - this.zooKeeper.registerListener(activeMasterManager); - stallIfBackupMaster(this.conf, this.activeMasterManager); - this.activeMasterManager.blockUntilBecomingActiveMaster(); + becomeActiveMaster(); + // We are either the active master or we were asked to shutdown if (!this.stopped) { finishInitialization(); @@ -307,6 +306,50 @@ LOG.info("HMaster main thread exiting"); } + /** + * Try becoming active master. + * @return + * @throws InterruptedException + */ + private boolean becomeActiveMaster() throws InterruptedException { + this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); + this.zooKeeper.registerListener(activeMasterManager); + stallIfBackupMaster(this.conf, this.activeMasterManager); + return this.activeMasterManager.blockUntilBecomingActiveMaster(); + } + + /** + * Initilize all ZK based system trackers. + * @throws IOException + * @throws InterruptedException + */ + private void initilizeZKBasedSystemTrackers() throws IOException, InterruptedException, KeeperException { + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); + this.catalogTracker.start(); + + this.assignmentManager = new AssignmentManager(this, serverManager, + this.catalogTracker, this.executorService); + this.balancer = new LoadBalancer(conf); + zooKeeper.registerListenerFirst(assignmentManager); + + this.regionServerTracker = new RegionServerTracker(zooKeeper, this, + this.serverManager); + this.regionServerTracker.start(); + + // Set the cluster as up. If new RSs, they'll be waiting on this before + // going ahead with their startup. + this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); + this.clusterStatusTracker.start(); + boolean wasUp = this.clusterStatusTracker.isClusterUp(); + if (!wasUp) this.clusterStatusTracker.setClusterUp(); + + LOG.info("Server active/primary master; " + this.address + + ", sessionid=0x" + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + + ", cluster-up flag was=" + wasUp); + } + private void loop() { // Check if we should stop every second. Sleeper sleeper = new Sleeper(1000, this); @@ -317,12 +360,12 @@ /** * Finish initialization of HMaster after becoming the primary master. - * + *

*

    *
  1. Initialize master components - file system manager, server manager, - * assignment manager, region server tracker, catalog tracker, etc
  2. + * assignment manager, region server tracker, catalog tracker, etc *
  3. Start necessary service threads - rpc server, info server, - * executor services, etc
  4. + * executor services, etc *
  5. Set cluster as UP in ZooKeeper
  6. *
  7. Wait for RegionServers to check-in
  8. *
  9. Split logs and perform data recovery, if necessary
  10. @@ -335,7 +378,7 @@ * @throws KeeperException */ private void finishInitialization() - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, KeeperException { isActiveMaster = true; @@ -352,31 +395,8 @@ this.serverManager = new ServerManager(this, this, metrics); - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, - this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); - this.catalogTracker.start(); + initilizeZKBasedSystemTrackers(); - this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.executorService); - this.balancer = new LoadBalancer(conf); - zooKeeper.registerListenerFirst(assignmentManager); - - this.regionServerTracker = new RegionServerTracker(zooKeeper, this, - this.serverManager); - this.regionServerTracker.start(); - - // Set the cluster as up. If new RSs, they'll be waiting on this before - // going ahead with their startup. - this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); - this.clusterStatusTracker.start(); - boolean wasUp = this.clusterStatusTracker.isClusterUp(); - if (!wasUp) this.clusterStatusTracker.setClusterUp(); - - LOG.info("Server active/primary master; " + this.address + - ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + - ", cluster-up flag was=" + wasUp); - // initialize master side coprocessors before we start handling requests this.cpHost = new MasterCoprocessorHost(this, this.conf); @@ -388,7 +408,7 @@ // TODO: Should do this in background rather than block master startup this.fileSystemManager. - splitLogAfterStartup(this.serverManager.getOnlineServers()); + splitLogAfterStartup(this.serverManager.getOnlineServers()); // Make sure root and meta assigned before proceeding. assignRootAndMeta(); @@ -398,6 +418,7 @@ // fresh start. TOOD: Be fancier. If regionsCount == 2, perhaps the // 2 are .META. and -ROOT- and we should fall into the fresh startup // branch below. For now, do processFailover. + LOG.info("********************** My Master ****************************************************"); if (regionCount == 0) { LOG.info("Master startup proceeding: cluster startup"); this.assignmentManager.cleanoutUnassigned(); @@ -411,7 +432,7 @@ // been assigned. this.balancerChore = getAndStartBalancerChore(this); this.catalogJanitorChore = - Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); + Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); LOG.info("Master has completed initialization"); initialized = true; @@ -420,34 +441,36 @@ /** * Check -ROOT- and .META. are assigned. If not, * assign them. + * + * @return Count of regions we assigned. * @throws InterruptedException * @throws IOException * @throws KeeperException - * @return Count of regions we assigned. */ int assignRootAndMeta() - throws InterruptedException, IOException, KeeperException { + throws InterruptedException, IOException, KeeperException { int assigned = 0; long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); // Work on ROOT region. Is it in zk in transition? boolean rit = this.assignmentManager. - processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO); + processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO); if (!catalogTracker.verifyRootRegionLocation(timeout)) { this.assignmentManager.assignRoot(); this.catalogTracker.waitForRoot(); assigned++; } else { // Region already assigned. We didnt' assign it. Add to in-memory state. + LOG.info("-ROOT- already assigned. Updating in-memory state"); this.assignmentManager.regionOnline(HRegionInfo.ROOT_REGIONINFO, - this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation())); + this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation())); } LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit + - ", location=" + catalogTracker.getRootLocation()); + ", location=" + catalogTracker.getRootLocation()); // Work on meta region rit = this.assignmentManager. - processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); + processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) { this.assignmentManager.assignMeta(); this.catalogTracker.waitForMeta(); @@ -457,11 +480,12 @@ assigned++; } else { // Region already assigned. We didnt' assign it. Add to in-memory state. + LOG.info(".META. already assigned. Updating in-memory state"); this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO, - this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation())); + this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation())); } LOG.info(".META. assigned=" + assigned + ", rit=" + rit + - ", location=" + catalogTracker.getMetaLocation()); + ", location=" + catalogTracker.getMetaLocation()); return assigned; } @@ -470,16 +494,18 @@ * @throws UnknownHostException */ private static String getMyAddress(final Configuration c) - throws UnknownHostException { + throws UnknownHostException { // Find out our address up in DNS. - String s = DNS.getDefaultHost(c.get("hbase.master.dns.interface","default"), + String s = DNS.getDefaultHost(c.get("hbase.master.dns.interface", "default"), - c.get("hbase.master.dns.nameserver","default")); + c.get("hbase.master.dns.nameserver", "default")); s += ":" + c.get(HConstants.MASTER_PORT, Integer.toString(HConstants.DEFAULT_MASTER_PORT)); return s; } - /** @return HServerAddress of the master server */ + /** + * @return HServerAddress of the master server + */ public HServerAddress getMasterAddress() { return this.address; } @@ -488,7 +514,9 @@ return HMasterInterface.VERSION; } - /** @return InfoServer object. Maybe null.*/ + /** + * @return InfoServer object. Maybe null. + */ public InfoServer getInfoServer() { return this.infoServer; } @@ -515,6 +543,7 @@ /** * Get the ZK wrapper object - needed by master_jsp.java + * * @return the zookeeper wrapper */ public ZooKeeperWatcher getZooKeeperWatcher() { @@ -532,24 +561,26 @@ try { // Start the executor service pools this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, - conf.getInt("hbase.master.executor.openregion.threads", 5)); + conf.getInt("hbase.master.executor.openregion.threads", 5)); this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, - conf.getInt("hbase.master.executor.closeregion.threads", 5)); + conf.getInt("hbase.master.executor.closeregion.threads", 5)); this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 3)); + conf.getInt("hbase.master.executor.serverops.threads", 3)); this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 5)); + conf.getInt("hbase.master.executor.serverops.threads", 5)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of // tables. this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); + //this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 3); + // Start log cleaner thread String n = Thread.currentThread().getName(); this.logCleaner = - new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000), - this, conf, getMasterFileSystem().getFileSystem(), - getMasterFileSystem().getOldLogDir()); + new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000), + this, conf, getMasterFileSystem().getFileSystem(), + getMasterFileSystem().getOldLogDir()); Threads.setDaemonThreadRunning(logCleaner, n + ".oldLogCleaner"); // Put up info server. @@ -567,7 +598,7 @@ } } catch (IOException e) { if (e instanceof RemoteException) { - e = ((RemoteException)e).unwrapRemoteException(); + e = ((RemoteException) e).unwrapRemoteException(); } // Something happened during startup. Shut things down. abort("Failed startup", e); @@ -580,7 +611,7 @@ } if (this.rpcServer != null) this.rpcServer.stop(); // Clean up and close up shop - if (this.logCleaner!= null) this.logCleaner.interrupt(); + if (this.logCleaner != null) this.logCleaner.interrupt(); if (this.infoServer != null) { LOG.info("Stopping infoServer"); try { @@ -595,7 +626,7 @@ private static Thread getAndStartBalancerChore(final HMaster master) { String name = master.getServerName() + "-BalancerChore"; int balancerPeriod = - master.getConfiguration().getInt("hbase.balancer.period", 300000); + master.getConfiguration().getInt("hbase.balancer.period", 300000); // Start up the load balancer chore Chore chore = new Chore(name, balancerPeriod, master) { @Override @@ -617,8 +648,8 @@ @Override public MapWritable regionServerStartup(final HServerInfo serverInfo, - final long serverCurrentTime) - throws IOException { + final long serverCurrentTime) + throws IOException { // Set the ip into the passed in serverInfo. Its ip is more than likely // not the ip that the master sees here. See at end of this method where // we pass it back to the regionserver by setting "hbase.regionserver.address" @@ -633,14 +664,14 @@ this.serverManager.regionServerStartup(serverInfo, serverCurrentTime); // Send back some config info MapWritable mw = createConfigurationSubset(); - mw.put(new Text("hbase.regionserver.address"), - serverInfo.getServerAddress()); + mw.put(new Text("hbase.regionserver.address"), + serverInfo.getServerAddress()); return mw; } /** * @return Subset of configuration to pass initializing regionservers: e.g. - * the filesystem to use and root directory to use. + * the filesystem to use and root directory to use. */ protected MapWritable createConfigurationSubset() { MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR); @@ -653,22 +684,23 @@ } @Override - public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[], + public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[], - HRegionInfo[] mostLoadedRegions) - throws IOException { + HRegionInfo[] mostLoadedRegions) + throws IOException { return adornRegionServerAnswer(serverInfo, - this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions)); + this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions)); } /** * Override if you'd add messages to return to regionserver hsi * or to send an exception. + * * @param msgs Messages to add to * @return Messages to return to * @throws IOException exceptions that were injected for the region servers */ - protected HMsg [] adornRegionServerAnswer(final HServerInfo hsi, + protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi, - final HMsg [] msgs) throws IOException { + final HMsg[] msgs) throws IOException { return msgs; } @@ -681,11 +713,11 @@ */ private int getBalancerCutoffTime() { int balancerCutoffTime = - getConfiguration().getInt("hbase.balancer.max.balancing", -1); + getConfiguration().getInt("hbase.balancer.max.balancing", -1); if (balancerCutoffTime == -1) { // No time period set so create one -- do half of balancer period. int balancerPeriod = - getConfiguration().getInt("hbase.balancer.period", 300000); + getConfiguration().getInt("hbase.balancer.period", 300000); balancerCutoffTime = balancerPeriod / 2; // If nonsense period, set it to balancerPeriod if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod; @@ -704,15 +736,15 @@ // Only allow one balance run at at time. if (this.assignmentManager.isRegionsInTransition()) { LOG.debug("Not running balancer because " + - this.assignmentManager.getRegionsInTransition().size() + - " region(s) in transition: " + - org.apache.commons.lang.StringUtils. - abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256)); + this.assignmentManager.getRegionsInTransition().size() + + " region(s) in transition: " + + org.apache.commons.lang.StringUtils. + abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256)); return false; } if (this.serverManager.areDeadServersInProgress()) { LOG.debug("Not running balancer because processing dead regionserver(s): " + - this.serverManager.getDeadServers()); + this.serverManager.getDeadServers()); return false; } @@ -729,9 +761,9 @@ } Map> assignments = - this.assignmentManager.getAssignments(); + this.assignmentManager.getAssignments(); // Returned Map from AM does not include mention of servers w/o assignments. - for (Map.Entry e: + for (Map.Entry e : this.serverManager.getOnlineServers().entrySet()) { HServerInfo hsi = e.getValue(); if (!assignments.containsKey(hsi)) { @@ -739,20 +771,20 @@ } } List plans = this.balancer.balanceCluster(assignments); - int rpCount = 0; // number of RegionPlans balanced so far + int rpCount = 0; // number of RegionPlans balanced so far long totalRegPlanExecTime = 0; if (plans != null && !plans.isEmpty()) { - for (RegionPlan plan: plans) { + for (RegionPlan plan : plans) { LOG.info("balance " + plan); long balStartTime = System.currentTimeMillis(); this.assignmentManager.balance(plan); - totalRegPlanExecTime += System.currentTimeMillis()-balStartTime; + totalRegPlanExecTime += System.currentTimeMillis() - balStartTime; rpCount++; if (rpCount < plans.size() && // if performing next balance exceeds cutoff time, exit the loop (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + - maximumBalanceTime); + maximumBalanceTime); break; } } @@ -792,24 +824,25 @@ * Switch for the background {@link CatalogJanitor} thread. * Used for testing. The thread will continue to run. It will just be a noop * if disabled. + * * @param b If false, the catalog janitor won't do anything. */ public void setCatalogJanitorEnabled(final boolean b) { - ((CatalogJanitor)this.catalogJanitorChore).setEnabled(b); + ((CatalogJanitor) this.catalogJanitorChore).setEnabled(b); } @Override public void move(final byte[] encodedRegionName, final byte[] destServerName) - throws UnknownRegionException { + throws UnknownRegionException { Pair p = - this.assignmentManager.getAssignment(encodedRegionName); + this.assignmentManager.getAssignment(encodedRegionName); if (p == null) throw new UnknownRegionException(Bytes.toString(encodedRegionName)); HRegionInfo hri = p.getFirst(); HServerInfo dest = null; if (destServerName == null || destServerName.length == 0) { LOG.info("Passed destination servername is null/empty so " + - "choosing a server at random"); + "choosing a server at random"); this.assignmentManager.clearRegionPlan(hri); // Unassign will reassign it elsewhere choosing random server. this.assignmentManager.unassign(hri); @@ -827,29 +860,29 @@ } } - public void createTable(HTableDescriptor desc, byte [][] splitKeys) + public void createTable(HTableDescriptor desc, byte[][] splitKeys) - throws IOException { + throws IOException { createTable(desc, splitKeys, false); } - public void createTable(HTableDescriptor desc, byte [][] splitKeys, + public void createTable(HTableDescriptor desc, byte[][] splitKeys, - boolean sync) - throws IOException { + boolean sync) + throws IOException { if (!isMasterRunning()) { throw new MasterNotRunningException(); } if (cpHost != null) { cpHost.preCreateTable(desc, splitKeys); } - HRegionInfo [] newRegions = null; + HRegionInfo[] newRegions = null; - if(splitKeys == null || splitKeys.length == 0) { + if (splitKeys == null || splitKeys.length == 0) { - newRegions = new HRegionInfo [] { new HRegionInfo(desc, null, null) }; + newRegions = new HRegionInfo[]{new HRegionInfo(desc, null, null)}; } else { int numRegions = splitKeys.length + 1; newRegions = new HRegionInfo[numRegions]; - byte [] startKey = null; + byte[] startKey = null; - byte [] endKey = null; + byte[] endKey = null; - for(int i=0;i getTableRegionForRow( + Pair getTableRegionForRow( - final byte [] tableName, final byte [] rowKey) + final byte[] tableName, final byte[] rowKey) - throws IOException { + throws IOException { final AtomicReference> result = - new AtomicReference>(null); + new AtomicReference>(null); MetaScannerVisitor visitor = - new MetaScannerVisitor() { - @Override - public boolean processRow(Result data) throws IOException { - if (data == null || data.size() <= 0) { - return true; - } - Pair pair = - MetaReader.metaRowToRegionPair(data); - if (pair == null) { - return false; - } - if (!Bytes.equals(pair.getFirst().getTableDesc().getName(), + new MetaScannerVisitor() { + @Override + public boolean processRow(Result data) throws IOException { + if (data == null || data.size() <= 0) { + return true; + } + Pair pair = + MetaReader.metaRowToRegionPair(data); + if (pair == null) { + return false; + } + if (!Bytes.equals(pair.getFirst().getTableDesc().getName(), tableName)) { - return false; - } - result.set(pair); - return true; - } - }; + return false; + } + result.set(pair); + return true; + } + }; MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1); return result.get(); @@ -1035,7 +1068,7 @@ @Override public void modifyTable(final byte[] tableName, HTableDescriptor htd) - throws IOException { + throws IOException { if (cpHost != null) { cpHost.preModifyTable(tableName, htd); } @@ -1046,8 +1079,8 @@ } @Override - public void checkTableModifiable(final byte [] tableName) + public void checkTableModifiable(final byte[] tableName) - throws IOException { + throws IOException { String tableNameStr = Bytes.toString(tableName); if (isCatalogTable(tableName)) { throw new IOException("Can't modify catalog tables"); @@ -1066,6 +1099,7 @@ this.assignmentManager.clearRegionFromTransition(hri); } } + /** * @return cluster status */ @@ -1080,12 +1114,66 @@ @Override public void abort(final String msg, final Throwable t) { + if (abortNow(msg, t)) { - if (t != null) LOG.fatal(msg, t); - else LOG.fatal(msg); - this.abort = true; - stop("Aborting"); - } + if (t != null) LOG.fatal(msg, t); + else LOG.fatal(msg); + this.abort = true; + stop("Aborting"); + } + } + /** + * We do the following. + * 1. Create a new ZK session. (since our current one is expired) + * 2. Try to become a primary master again + * 3. Initialize all ZK based system trackers. + * 4. Assign root and meta. (they are already assigned, but we need to update our internal memory state to reflect it) + * 5. Process any RIT if any during the process of our recovery. + * + * @return + * @throws InterruptedException + * @throws IOException + */ + private boolean recoverableFromZKSessionExpiry() throws InterruptedException, IOException, KeeperException { + this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + address.getPort(), this); + if (becomeActiveMaster()) { + // Initialize ZK based trackers since we now have a new ZK session. + initilizeZKBasedSystemTrackers(); + // Update in-memory strutures to reflect our earlier Root/Meta assignment. + assignRootAndMeta(); + // process RIT if any + this.assignmentManager.processRegionsInTransition(); + return true; + } + return false; + } + + /** + * Check to see if the current trigger for abort is due to ZooKeeper session expiry, and If yes, whether we can + * recover from ZK session expiry. + * + * @param msg Original abort message + * @param t The cause for current abort request + * @return true if we should proceed with abort operation, false other wise. + */ + private boolean abortNow(final String msg, final Throwable t) { + boolean abortNow = true; + if (!this.isActiveMaster) { + return true; + } + if (t != null && t instanceof KeeperException.SessionExpiredException) { + try { + LOG.info("Primary Master trying to recover from ZooKeeper session expiry."); + return recoverableFromZKSessionExpiry() ? false : true; + } catch (Throwable newT) { + LOG.error("Primary master encountered unexpected exception while trying to recover from ZooKeeper session" + + " expiry. Proceeding with server abort.", newT); + abortNow = true; + } + } + return abortNow; + } + @Override public ZooKeeperWatcher getZooKeeper() { return zooKeeper; @@ -1105,6 +1193,10 @@ return catalogTracker; } + public ClusterStatusTracker getClusterStatusTracker() { + return clusterStatusTracker; + } + @Override public AssignmentManager getAssignmentManager() { return this.assignmentManager; @@ -1157,7 +1249,7 @@ /** * Report whether this master is currently the active master or not. * If not active master, we are parked on ZK waiting to become active. - * + *

    * This method is used for testing. * * @return true if active master, false if not. @@ -1170,7 +1262,7 @@ * Report whether this master has completed with its initialization and is * ready. If ready, the master is also the active master. A standby master * is never ready. - * + *

    * This method is used for testing. * * @return true if master is ready to go, false if not. @@ -1180,15 +1272,15 @@ } @Override - public void assign(final byte [] regionName, final boolean force) + public void assign(final byte[] regionName, final boolean force) - throws IOException { + throws IOException { if (cpHost != null) { if (cpHost.preAssign(regionName, force)) { return; } } Pair pair = - MetaReader.getRegion(this.catalogTracker, regionName); + MetaReader.getRegion(this.catalogTracker, regionName); if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName)); assignRegion(pair.getFirst()); if (cpHost != null) { @@ -1201,15 +1293,15 @@ } @Override - public void unassign(final byte [] regionName, final boolean force) + public void unassign(final byte[] regionName, final boolean force) - throws IOException { + throws IOException { if (cpHost != null) { if (cpHost.preUnassign(regionName, force)) { return; } } Pair pair = - MetaReader.getRegion(this.catalogTracker, regionName); + MetaReader.getRegion(this.catalogTracker, regionName); if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName)); HRegionInfo hri = pair.getFirst(); if (force) this.assignmentManager.clearRegionFromTransition(hri); @@ -1221,26 +1313,27 @@ /** * Utility for constructing an instance of the passed HMaster class. + * * @param masterClass * @param conf * @return HMaster instance. */ public static HMaster constructMaster(Class masterClass, - final Configuration conf) { + final Configuration conf) { try { Constructor c = - masterClass.getConstructor(Configuration.class); + masterClass.getConstructor(Configuration.class); return c.newInstance(conf); } catch (InvocationTargetException ite) { - Throwable target = ite.getTargetException() != null? + Throwable target = ite.getTargetException() != null ? - ite.getTargetException(): ite; + ite.getTargetException() : ite; if (target.getCause() != null) target = target.getCause(); throw new RuntimeException("Failed construction of Master: " + - masterClass.toString(), target); + masterClass.toString(), target); } catch (Exception e) { throw new RuntimeException("Failed construction of Master: " + - masterClass.toString() + ((e.getCause() != null)? + masterClass.toString() + ((e.getCause() != null) ? - e.getCause().getMessage(): ""), e); + e.getCause().getMessage() : ""), e); } } @@ -1248,7 +1341,7 @@ /** * @see org.apache.hadoop.hbase.master.HMasterCommandLine */ - public static void main(String [] args) throws Exception { + public static void main(String[] args) throws Exception { new HMasterCommandLine(HMaster.class).doMain(args); } } Index: src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (date 1301171901000) +++ src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (revision ) @@ -129,8 +129,8 @@ testSanity(); } - //@Test - public void disabledTestMasterSessionExpired() throws Exception { + @Test + public void testMasterSessionExpired() throws Exception { LOG.info("Starting testMasterSessionExpired"); TEST_UTIL.expireMasterSession(); testSanity(); Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (date 1301171901000) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision ) @@ -213,6 +213,10 @@ // Process list of dead servers processDeadServers(deadServers); // Check existing regions in transition + processRegionsInTransition(); + } + + public void processRegionsInTransition() throws KeeperException, IOException { List nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode); if (nodes.isEmpty()) {