Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (date 1301171901000) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision ) @@ -25,7 +25,6 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -98,11 +97,11 @@ * run the cluster. All others park themselves in their constructor until * master or cluster shutdown or until the active master loses its lease in * zookeeper. Thereafter, all running master jostle to take over master role. - * + *
*The Master can be asked shutdown the cluster. See {@link #shutdown()}. In * this case it will tell all regionservers to go down and then wait on them * all reporting in that they are down. This master will then shut itself down. - * + *
*You can also shutdown just this master. Call {@link #stopMaster()}. * * @see HMasterInterface @@ -110,7 +109,7 @@ * @see Watcher */ public class HMaster extends Thread -implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { + implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private static final Log LOG = LogFactory.getLog(HMaster.class.getName()); // MASTER is name of the webapp and the attribute name used stuffing this @@ -175,18 +174,19 @@ /** * Initializes the HMaster. The steps are as follows: - *
+ *
*+ *
* Remaining steps of initialization occur in {@link #run()} so that they * run in their own thread rather than within the context of the constructor. + * * @throws InterruptedException */ public HMaster(final Configuration conf) - throws IOException, KeeperException, InterruptedException { + throws IOException, KeeperException, InterruptedException { this.conf = conf; /* @@ -196,12 +196,12 @@ HServerAddress a = new HServerAddress(getMyAddress(this.conf)); int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10); this.rpcServer = HBaseRPC.getServer(this, - new Class>[]{HMasterInterface.class, HMasterRegionInterface.class}, - a.getBindAddress(), a.getPort(), - numHandlers, - 0, // we dont use high priority handlers in master - false, conf, - 0); // this is a DNC w/o high priority handlers + new Class>[]{HMasterInterface.class, HMasterRegionInterface.class}, + a.getBindAddress(), a.getPort(), + numHandlers, + 0, // we dont use high priority handlers in master + false, conf, + 0); // this is a DNC w/o high priority handlers this.address = new HServerAddress(rpcServer.getListenerAddress()); // initialize server principal (if using secure Hadoop) @@ -219,7 +219,7 @@ // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapred.task.id") == null) { this.conf.set("mapred.task.id", "hb_m_" + this.address.toString() + - "_" + System.currentTimeMillis()); + "_" + System.currentTimeMillis()); } this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + @@ -231,25 +231,26 @@ /** * Stall startup if we are designated a backup master; i.e. we want someone * else to become the master before proceeding. 
+ * * @param c * @param amm * @throws InterruptedException */ private static void stallIfBackupMaster(final Configuration c, - final ActiveMasterManager amm) - throws InterruptedException { + final ActiveMasterManager amm) + throws InterruptedException { // If we're a backup master, stall until a primary to writes his address if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP, - HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { + HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { return; } LOG.debug("HMaster started in backup mode. " + - "Stalling until master znode is written."); + "Stalling until master znode is written."); // This will only be a minute or so while the cluster starts up, // so don't worry about setting watches on the parent znode while (!amm.isActiveMaster()) { LOG.debug("Waiting for master address ZNode to be written " + - "(Also watching cluster state node)"); + "(Also watching cluster state node)"); Thread.sleep(c.getInt("zookeeper.session.timeout", 180 * 1000)); } } @@ -276,10 +277,8 @@ * now wait until it dies to try and become the next active master. If we * do not succeed on our first attempt, this is no longer a cluster startup. */ - this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); - this.zooKeeper.registerListener(activeMasterManager); - stallIfBackupMaster(this.conf, this.activeMasterManager); - this.activeMasterManager.blockUntilBecomingActiveMaster(); + becomeActiveMaster(); + // We are either the active master or we were asked to shutdown if (!this.stopped) { finishInitialization(); @@ -307,6 +306,50 @@ LOG.info("HMaster main thread exiting"); } + /** + * Try becoming active master. 
+ * @return + * @throws InterruptedException + */ + private boolean becomeActiveMaster() throws InterruptedException { + this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); + this.zooKeeper.registerListener(activeMasterManager); + stallIfBackupMaster(this.conf, this.activeMasterManager); + return this.activeMasterManager.blockUntilBecomingActiveMaster(); + } + + /** + * Initilize all ZK based system trackers. + * @throws IOException + * @throws InterruptedException + */ + private void initilizeZKBasedSystemTrackers() throws IOException, InterruptedException, KeeperException { + this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, + this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); + this.catalogTracker.start(); + + this.assignmentManager = new AssignmentManager(this, serverManager, + this.catalogTracker, this.executorService); + this.balancer = new LoadBalancer(conf); + zooKeeper.registerListenerFirst(assignmentManager); + + this.regionServerTracker = new RegionServerTracker(zooKeeper, this, + this.serverManager); + this.regionServerTracker.start(); + + // Set the cluster as up. If new RSs, they'll be waiting on this before + // going ahead with their startup. + this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); + this.clusterStatusTracker.start(); + boolean wasUp = this.clusterStatusTracker.isClusterUp(); + if (!wasUp) this.clusterStatusTracker.setClusterUp(); + + LOG.info("Server active/primary master; " + this.address + + ", sessionid=0x" + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + + ", cluster-up flag was=" + wasUp); + } + private void loop() { // Check if we should stop every second. Sleeper sleeper = new Sleeper(1000, this); @@ -317,12 +360,12 @@ /** * Finish initialization of HMaster after becoming the primary master. - * + * *-ROOT- and .META. are assigned. If not,
* assign them.
+ *
+ * @return Count of regions we assigned.
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
- * @return Count of regions we assigned.
*/
int assignRootAndMeta()
- throws InterruptedException, IOException, KeeperException {
+ throws InterruptedException, IOException, KeeperException {
int assigned = 0;
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
// Work on ROOT region. Is it in zk in transition?
boolean rit = this.assignmentManager.
- processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
+ processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
if (!catalogTracker.verifyRootRegionLocation(timeout)) {
this.assignmentManager.assignRoot();
this.catalogTracker.waitForRoot();
assigned++;
} else {
// Region already assigned. We didnt' assign it. Add to in-memory state.
+ LOG.info("-ROOT- already assigned. Updating in-memory state");
this.assignmentManager.regionOnline(HRegionInfo.ROOT_REGIONINFO,
- this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation()));
+ this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation()));
}
LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit +
- ", location=" + catalogTracker.getRootLocation());
+ ", location=" + catalogTracker.getRootLocation());
// Work on meta region
rit = this.assignmentManager.
- processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
+ processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) {
this.assignmentManager.assignMeta();
this.catalogTracker.waitForMeta();
@@ -457,11 +480,12 @@
assigned++;
} else {
// Region already assigned. We didnt' assign it. Add to in-memory state.
+ LOG.info(".META. already assigned. Updating in-memory state");
this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
- this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation()));
+ this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation()));
}
LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
- ", location=" + catalogTracker.getMetaLocation());
+ ", location=" + catalogTracker.getMetaLocation());
return assigned;
}
@@ -470,16 +494,18 @@
* @throws UnknownHostException
*/
private static String getMyAddress(final Configuration c)
- throws UnknownHostException {
+ throws UnknownHostException {
// Find out our address up in DNS.
- String s = DNS.getDefaultHost(c.get("hbase.master.dns.interface","default"),
+ String s = DNS.getDefaultHost(c.get("hbase.master.dns.interface", "default"),
- c.get("hbase.master.dns.nameserver","default"));
+ c.get("hbase.master.dns.nameserver", "default"));
s += ":" + c.get(HConstants.MASTER_PORT,
Integer.toString(HConstants.DEFAULT_MASTER_PORT));
return s;
}
- /** @return HServerAddress of the master server */
+ /**
+ * @return HServerAddress of the master server
+ */
public HServerAddress getMasterAddress() {
return this.address;
}
@@ -488,7 +514,9 @@
return HMasterInterface.VERSION;
}
- /** @return InfoServer object. Maybe null.*/
+ /**
+ * @return InfoServer object. May be null.
+ */
public InfoServer getInfoServer() {
return this.infoServer;
}
@@ -515,6 +543,7 @@
/**
* Get the ZK wrapper object - needed by master_jsp.java
+ *
* @return the zookeeper wrapper
*/
public ZooKeeperWatcher getZooKeeperWatcher() {
@@ -532,24 +561,26 @@
try {
// Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
- conf.getInt("hbase.master.executor.openregion.threads", 5));
+ conf.getInt("hbase.master.executor.openregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
- conf.getInt("hbase.master.executor.closeregion.threads", 5));
+ conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
- conf.getInt("hbase.master.executor.serverops.threads", 3));
+ conf.getInt("hbase.master.executor.serverops.threads", 3));
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
- conf.getInt("hbase.master.executor.serverops.threads", 5));
+ conf.getInt("hbase.master.executor.serverops.threads", 5));
// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
// tables.
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+ //this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 3);
+
// Start log cleaner thread
String n = Thread.currentThread().getName();
this.logCleaner =
- new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000),
- this, conf, getMasterFileSystem().getFileSystem(),
- getMasterFileSystem().getOldLogDir());
+ new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000),
+ this, conf, getMasterFileSystem().getFileSystem(),
+ getMasterFileSystem().getOldLogDir());
Threads.setDaemonThreadRunning(logCleaner, n + ".oldLogCleaner");
// Put up info server.
@@ -567,7 +598,7 @@
}
} catch (IOException e) {
if (e instanceof RemoteException) {
- e = ((RemoteException)e).unwrapRemoteException();
+ e = ((RemoteException) e).unwrapRemoteException();
}
// Something happened during startup. Shut things down.
abort("Failed startup", e);
@@ -580,7 +611,7 @@
}
if (this.rpcServer != null) this.rpcServer.stop();
// Clean up and close up shop
- if (this.logCleaner!= null) this.logCleaner.interrupt();
+ if (this.logCleaner != null) this.logCleaner.interrupt();
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
@@ -595,7 +626,7 @@
private static Thread getAndStartBalancerChore(final HMaster master) {
String name = master.getServerName() + "-BalancerChore";
int balancerPeriod =
- master.getConfiguration().getInt("hbase.balancer.period", 300000);
+ master.getConfiguration().getInt("hbase.balancer.period", 300000);
// Start up the load balancer chore
Chore chore = new Chore(name, balancerPeriod, master) {
@Override
@@ -617,8 +648,8 @@
@Override
public MapWritable regionServerStartup(final HServerInfo serverInfo,
- final long serverCurrentTime)
- throws IOException {
+ final long serverCurrentTime)
+ throws IOException {
// Set the ip into the passed in serverInfo. Its ip is more than likely
// not the ip that the master sees here. See at end of this method where
// we pass it back to the regionserver by setting "hbase.regionserver.address"
@@ -633,14 +664,14 @@
this.serverManager.regionServerStartup(serverInfo, serverCurrentTime);
// Send back some config info
MapWritable mw = createConfigurationSubset();
- mw.put(new Text("hbase.regionserver.address"),
- serverInfo.getServerAddress());
+ mw.put(new Text("hbase.regionserver.address"),
+ serverInfo.getServerAddress());
return mw;
}
/**
* @return Subset of configuration to pass initializing regionservers: e.g.
- * the filesystem to use and root directory to use.
+ * the filesystem to use and root directory to use.
*/
protected MapWritable createConfigurationSubset() {
MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
@@ -653,22 +684,23 @@
}
@Override
- public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
+ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
- HRegionInfo[] mostLoadedRegions)
- throws IOException {
+ HRegionInfo[] mostLoadedRegions)
+ throws IOException {
return adornRegionServerAnswer(serverInfo,
- this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
+ this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
}
/**
* Override if you'd add messages to return to regionserver hsi
* or to send an exception.
+ *
* @param msgs Messages to add to
* @return Messages to return to
* @throws IOException exceptions that were injected for the region servers
*/
- protected HMsg [] adornRegionServerAnswer(final HServerInfo hsi,
+ protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
- final HMsg [] msgs) throws IOException {
+ final HMsg[] msgs) throws IOException {
return msgs;
}
@@ -681,11 +713,11 @@
*/
private int getBalancerCutoffTime() {
int balancerCutoffTime =
- getConfiguration().getInt("hbase.balancer.max.balancing", -1);
+ getConfiguration().getInt("hbase.balancer.max.balancing", -1);
if (balancerCutoffTime == -1) {
// No time period set so create one -- do half of balancer period.
int balancerPeriod =
- getConfiguration().getInt("hbase.balancer.period", 300000);
+ getConfiguration().getInt("hbase.balancer.period", 300000);
balancerCutoffTime = balancerPeriod / 2;
// If nonsense period, set it to balancerPeriod
if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
@@ -704,15 +736,15 @@
// Only allow one balance run at at time.
if (this.assignmentManager.isRegionsInTransition()) {
LOG.debug("Not running balancer because " +
- this.assignmentManager.getRegionsInTransition().size() +
- " region(s) in transition: " +
- org.apache.commons.lang.StringUtils.
- abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
+ this.assignmentManager.getRegionsInTransition().size() +
+ " region(s) in transition: " +
+ org.apache.commons.lang.StringUtils.
+ abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
return false;
}
if (this.serverManager.areDeadServersInProgress()) {
LOG.debug("Not running balancer because processing dead regionserver(s): " +
- this.serverManager.getDeadServers());
+ this.serverManager.getDeadServers());
return false;
}
@@ -729,9 +761,9 @@
}
Map