Index: src/main/java/org/apache/hadoop/hbase/HServerInfo.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HServerInfo.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/HServerInfo.java	(working copy)
@@ -295,4 +295,19 @@
     return inServerName;
   }
 
+  public static HServerInfo getServerInfo(String serverName) {
+    if (serverName != null && serverName.length() > 0) {
+      String[] split = serverName.split(SERVERNAME_SEPARATOR);
+      if (split.length == 3) {
+        String hostName = split[0];
+        int port = Integer.parseInt(split[1]);
+        long startCode = Long.parseLong(split[2]);
+        String hostAndPort = hostName + ":" + port;
+        HServerAddress serverAddress = new HServerAddress(hostAndPort);
+        return new HServerInfo(serverAddress, startCode, port, hostName);
+      }
+    }
+    return null;
+  }
+
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java	(revision 0)
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRSKilledWhenMasterInitializing {
+  private static final Log LOG =
+      LogFactory.getLog(TestRSKilledWhenMasterInitializing.class);
+
+  private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
+  private static final int NUM_MASTERS = 1;
+  private static final int NUM_RS = 4;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set it so that this test runs with our custom master
+    TESTUTIL.getConfiguration().setClass(HConstants.MASTER_IMPL,
+        TestingMaster.class, HMaster.class);
+    // Start up the cluster.
+    TESTUTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (!TESTUTIL.getHBaseCluster().getMaster().isInitialized()) {
+      // Master is not initialized and is stuck waiting for something forever.
+      for (MasterThread mt : TESTUTIL.getHBaseCluster().getLiveMasterThreads()) {
+        mt.interrupt();
+      }
+    }
+    TESTUTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * An HMaster instance used in this test. If 'TestingMaster.sleep' is set in
+   * the Configuration, then we'll sleep after the log is split and we'll also
+   * return a custom RegionServerTracker.
+   */
+  public static class TestingMaster extends HMaster {
+    private boolean isLogSplitted = false;
+
+    public TestingMaster(Configuration conf) throws IOException,
+        KeeperException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    public void splitLogAfterStartup(MasterFileSystem mfs,
+        Set<String> knownServers) {
+      super.splitLogAfterStartup(mfs, knownServers);
+      isLogSplitted = true;
+      // If "TestingMaster.sleep" is set, sleep after the log split.
+      if (getConfiguration().getBoolean("TestingMaster.sleep", false)) {
+        int duration = getConfiguration().getInt(
+            "TestingMaster.sleep.duration", 0);
+        Threads.sleep(duration);
+      }
+    }
+
+    @Override
+    public RegionServerTracker createRegionServerTracker(
+        final ZooKeeperWatcher zkw, final Abortable a, final ServerManager sm) {
+      // If "TestingMaster.sleep", then return our custom RegionServerTracker
+      return getConfiguration().getBoolean("TestingMaster.sleep", false)
+          ? new GatedNodeDeleteRegionServerTracker(zkw, a, sm)
+          : super.createRegionServerTracker(zkw, a, sm);
+    }
+
+    public boolean isLogSplittedAfterStartup() {
+      return isLogSplitted;
+    }
+  }
+
+  /**
+   * A RegionServerTracker whose delete we can stall. On nodeDeleted, it will
+   * block until the data member gate is cleared.
+   */
+  static class GatedNodeDeleteRegionServerTracker extends RegionServerTracker {
+    final AtomicBoolean gate = new AtomicBoolean(true);
+
+    public GatedNodeDeleteRegionServerTracker(ZooKeeperWatcher watcher,
+        Abortable abortable, ServerManager serverManager) {
+      super(watcher, abortable, serverManager);
+    }
+
+    @Override
+    public void nodeDeleted(final String path) {
+      if (path.startsWith(watcher.rsZNode)) {
+        Thread t = new Thread() {
+          public void run() {
+            while (gate.get()) {
+              Threads.sleep(100);
+            }
+            GatedNodeDeleteRegionServerTracker.super.nodeDeleted(path);
+          }
+        };
+        t.start();
+      }
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testCorrectnessWhenMasterFailOver() throws Exception {
+    final byte[] TABLENAME = Bytes.toBytes("testCorrectnessWhenMasterFailOver");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[][] SPLITKEYS = { Bytes.toBytes("b"), Bytes.toBytes("i") };
+
+    MiniHBaseCluster cluster = TESTUTIL.getHBaseCluster();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    HBaseAdmin hbaseAdmin = TESTUTIL.getHBaseAdmin();
+    hbaseAdmin.createTable(desc, SPLITKEYS);
+
+    assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+    HTable table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
+    List<Put> puts = new ArrayList<Put>();
+    Put put1 = new Put(Bytes.toBytes("a"));
+    put1.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    Put put2 = new Put(Bytes.toBytes("h"));
+    put2.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    Put put3 = new Put(Bytes.toBytes("o"));
+    put3.add(FAMILY, Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    puts.add(put1);
+    puts.add(put2);
+    puts.add(put3);
+    table.put(puts);
+    ResultScanner resultScanner = table.getScanner(new Scan());
+    int count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    table.close();
+    assertEquals(3, count);
+
+    /* Starting test */
+    cluster.getConfiguration().setBoolean("TestingMaster.sleep", true);
+    cluster.getConfiguration().setInt("TestingMaster.sleep.duration", 10000);
+
+    /* NO.1 .META. region correctness */
+    // First abort master
+    for (MasterThread mt : cluster.getLiveMasterThreads()) {
+      if (mt.getMaster().isActiveMaster()) {
+        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
+        mt.join();
+        break;
+      }
+    }
+    LOG.debug("Master is aborted");
+    TestingMaster master = (TestingMaster) cluster.startMaster().getMaster();
+    while (!master.isLogSplittedAfterStartup()) {
+      Thread.sleep(1000);
+    }
+
+    LOG.debug("splitted:" + master.isLogSplittedAfterStartup()
+        + ",initialized:" + master.isInitialized());
+
+    // Second kill the meta server
+    int metaServerNum = cluster.getServerWithMeta();
+    int rootServerNum = cluster.getServerWith(HRegionInfo.ROOT_REGIONINFO
+        .getRegionName());
+    HRegionServer metaRS = cluster.getRegionServer(metaServerNum);
+    LOG.debug("Killing metaRS and carryingRoot = "
+        + (metaServerNum == rootServerNum));
+    metaRS.kill();
+    metaRS.join();
+
+    Thread.sleep(10000 * 2);
+    ((GatedNodeDeleteRegionServerTracker) master.getRegionServerTracker()).gate
+        .set(false);
+
+    while (!master.isInitialized()) {
+      Thread.sleep(1000);
+    }
+    LOG.debug("master isInitialized");
+    // Third check whether data is correct in the meta region
+    assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+    /* NO.2 -ROOT- region correctness */
+    if (rootServerNum != metaServerNum) {
+      // First abort master
+      for (MasterThread mt : cluster.getLiveMasterThreads()) {
+        if (mt.getMaster().isActiveMaster()) {
+          mt.getMaster().abort("Aborting for tests",
+              new Exception("Trace info"));
+          mt.join();
+          break;
+        }
+      }
+      LOG.debug("Master is aborted");
+      master = (TestingMaster) cluster.startMaster().getMaster();
+      while (!master.isLogSplittedAfterStartup()) {
+        Thread.sleep(1000);
+      }
+      LOG.debug("splitted:" + master.isLogSplittedAfterStartup()
+          + ",initialized:" + master.isInitialized());
+
+      // Second kill the root server
+      HRegionServer rootRS = cluster.getRegionServer(rootServerNum);
+      LOG.debug("Killing rootRS");
+      rootRS.kill();
+      rootRS.join();
+      Thread.sleep(10000 * 2);
+      ((GatedNodeDeleteRegionServerTracker) master.getRegionServerTracker()).gate
+          .set(false);
+
+      while (!master.isInitialized()) {
+        Thread.sleep(1000);
+      }
+      LOG.debug("master isInitialized");
+      // Third check whether data is correct in the root region
+      assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+    }
+
+    /* NO.3 data region correctness */
+    ServerManager serverManager = cluster.getMaster().getServerManager();
+    while (serverManager.areDeadServersInProgress()) {
+      Thread.sleep(1000);
+    }
+    table = new HTable(TESTUTIL.getConfiguration(), TABLENAME);
+    resultScanner = table.getScanner(new Scan());
+    count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    table.close();
+    assertEquals(3, count);
+  }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/DeadServer.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/master/DeadServer.java	(working copy)
@@ -20,17 +20,19 @@
 package org.apache.hadoop.hbase.master;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.lang.NotImplementedException;
 
 import org.apache.hadoop.hbase.HServerInfo;
 
 /**
- * Class to hold dead servers list and utility querying dead server list.
+ * Class to hold dead servers list, utility querying dead server list and the
+ * dead servers being processed by the ServerShutdownHandler.
  */
 public class DeadServer implements Set<String> {
   /**
@@ -42,6 +44,19 @@
    */
   private final Set<String> deadServers = new HashSet<String>();
 
+  private enum ServerType {
+    NORMAL, // Normal dead server
+    ROOT,   // The dead server carried -ROOT-
+    META    // The dead server carried .META.
+  }
+
+  /**
+   * Dead servers under processing by the ServerShutdownHandler. Map of
+   * ServerType to Set of dead servers being processed.
+   */
+  private final Map<ServerType, Set<String>> deadServersInProgress
+    = new HashMap<ServerType, Set<String>>();
+
   /** Maximum number of dead servers to keep track of */
   private final int maxDeadServers;
 
@@ -52,6 +67,9 @@
     super();
     this.maxDeadServers = maxDeadServers;
     this.numProcessing = 0;
+    for (ServerType svrType : ServerType.values()) {
+      deadServersInProgress.put(svrType, new HashSet<String>());
+    }
   }
 
   /**
@@ -105,19 +123,59 @@
     return numProcessing != 0;
   }
 
+  /**
+   * @return true if root server is being processed as dead.
+   */
+  boolean isDeadRootServerInProgress() {
+    return !deadServersInProgress.get(ServerType.ROOT).isEmpty();
+  }
+
+  /**
+   * @return true if meta server is being processed as dead.
+   */
+  boolean isDeadMetaServerInProgress() {
+    return !deadServersInProgress.get(ServerType.META).isEmpty();
+  }
+
   public synchronized Set<String> clone() {
     Set<String> clone = new HashSet<String>(this.deadServers.size());
     clone.addAll(this.deadServers);
     return clone;
   }
 
+  synchronized Set<String> getDeadServersInProgress() {
+    Set<String> clone = new HashSet<String>();
+    clone.addAll(this.deadServersInProgress.get(ServerType.NORMAL));
+    return clone;
+  }
+
   public synchronized boolean add(String e) {
     this.numProcessing++;
+    deadServersInProgress.get(ServerType.NORMAL).add(e);
     return deadServers.add(e);
   }
 
+  /**
+   * Add server to the set of dead root servers if carryingRoot, or to the set
+   * of dead meta servers if carryingMeta.
+   * We don't need to increment numProcessing because add() has done that.
+   */
+  synchronized void add(String server, boolean carryingRoot,
+      boolean carryingMeta) {
+    if (carryingRoot) {
+      deadServersInProgress.get(ServerType.ROOT).add(server);
+    }
+    if (carryingMeta) {
+      deadServersInProgress.get(ServerType.META).add(server);
+    }
+  }
+
   public synchronized void finish(String e) {
     this.numProcessing--;
+    for (Entry<ServerType, Set<String>> deadServerInProgress :
+        deadServersInProgress.entrySet()) {
+      deadServerInProgress.getValue().remove(e);
+    }
   }
 
   public synchronized int size() {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -664,7 +664,7 @@
     // Interrupt catalog tracker here in case any regions being opened out in
     // handlers are stuck waiting on meta or root.
     if (this.catalogTracker != null) this.catalogTracker.stop();
-    if (this.fsOk)
+    if (!this.killed && this.fsOk)
       waitOnAllRegionsToClose(abortRequested);
       //fsOk flag may be changed when closing region throws exception.
Index: src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java	(working copy)
@@ -20,7 +20,7 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -158,7 +158,7 @@
    * @param onlineServers Map of online servers keyed by
    * {@link HServerInfo#getServerName()}
    */
-  void splitLogAfterStartup(final Map<String, HServerInfo> onlineServers) {
+  void splitLogAfterStartup(final Set<String> onlineServers) {
     Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
     try {
       if (!this.fs.exists(logsDirPath)) {
@@ -179,7 +179,7 @@
     }
     for (FileStatus status : logFolders) {
       String serverName = status.getPath().getName();
-      if (onlineServers.get(serverName) == null) {
+      if (!onlineServers.contains(serverName)) {
         LOG.info("Log folder " + status.getPath() + " doesn't belong " +
           "to a known region server, splitting");
         splitLog(serverName);
@@ -346,4 +346,13 @@
       new Path(rootdir, region.getTableDesc().getNameAsString()),
       region.getEncodedName(), familyName), true);
   }
+
+  /**
+   * Return the path to the server's log dir if it exists, otherwise null.
+   */
+  public Path getLogDirIfExists(String serverName) throws IOException {
+    Path serverLogDir = new Path(this.rootdir,
+        HLog.getHLogDirectoryName(serverName.toString()));
+    return this.fs.exists(serverLogDir) ? serverLogDir : null;
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java	(working copy)
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
@@ -227,6 +226,22 @@
   }
 
   /**
+   * Method used by the master on startup when trying to figure out the state
+   * of the cluster. Returns the current meta location unless it is null. In
+   * this latter case, it has not yet been set, so go check what is up in
+   * -ROOT- and return that.
+   *
+   * @return {@link ServerName} for the server hosting .META. or, if
+   *         null, we'll read the location that is up in the -ROOT-
+   *         table (which could be null or just plain stale).
+   * @throws IOException
+   */
+  public HServerAddress getMetaLocationOrReadLocationFromRoot()
+      throws IOException {
+    HServerAddress sn = getMetaLocation();
+    return sn != null ? sn : MetaReader.getMetaRegionLocation(this);
+  }
+
+  /**
    * Waits indefinitely for availability of -ROOT-.  Used during
    * cluster startup.
    * @throws InterruptedException if interrupted while waiting
Index: src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java	(working copy)
@@ -276,7 +276,22 @@
   }
 
   /**
+   * Gets the location of the .META. region by reading content of
+   * -ROOT-.
+   *
+   * @param ct
+   * @return location of the .META. region as a {@link ServerName} or
+   *         null if not found
+   * @throws IOException
+   */
+  static HServerAddress getMetaRegionLocation(final CatalogTracker ct)
+      throws IOException {
+    return MetaReader.readMetaLocation(ct.waitForRootServerConnectionDefault());
+  }
+
+  /**
    * Reads the location of META from ROOT.
+   *
    * @param metaServer connection to server hosting ROOT
    * @return location of META in ROOT, null if not available
    * @throws IOException
Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java	(working copy)
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -49,7 +50,6 @@
 import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
-import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 
 /**
  * The ServerManager class manages info about region servers - HServerInfo,
@@ -162,7 +162,7 @@
     }
   }
 
-  private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
+  HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
     synchronized (this.onlineServers) {
       for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
         if (e.getValue().getHostnamePort().equals(hostnamePort)) {
@@ -420,11 +420,22 @@
     }
   }
 
+  /**
+   * @return Set of known dead servers.
+   */
   public Set<String> getDeadServers() {
     return this.deadservers.clone();
   }
 
   /**
+   * @return Set of dead servers which are being processed by the
+   *         ServerShutdownHandler.
+   */
+  public Set<String> getDeadServersInProgress() {
+    return this.deadservers.getDeadServersInProgress();
+  }
+
+  /**
    * Checks if any dead servers are currently in progress.
    * @return true if any RS are being processed as dead, false if not
   */
@@ -433,6 +444,39 @@
   }
 
   /**
+   * @return true if root server is being processed as dead.
+   */
+  public boolean isDeadRootServerInProgress() {
+    return this.deadservers.isDeadRootServerInProgress();
+  }
+
+  /**
+   * @return true if meta server is being processed as dead.
+   */
+  public boolean isDeadMetaServerInProgress() {
+    return this.deadservers.isDeadMetaServerInProgress();
+  }
+
+  /**
+   * @param address serverAddress
+   * @return names of dead servers in progress with the given address
+   */
+  Set<String> getDeadServerInProgressByAddress(HServerAddress address) {
+    Set<String> deadServers = getDeadServersInProgress();
+    Set<String> returnServers = new HashSet<String>();
+    String givenServerName = HServerInfo.getServerName(address, 1);
+    String svrNameLessStartCode =
+        HServerInfo.getServerNameLessStartCode(givenServerName);
+    for (String deadServer : deadServers) {
+      if (HServerInfo.getServerNameLessStartCode(deadServer).equals(
+          svrNameLessStartCode)) {
+        returnServers.add(deadServer);
+      }
+    }
+    return returnServers;
+  }
+
+  /**
    * @param hsa
    * @return The HServerInfo whose HServerAddress is hsa or null
   * if nothing found.
@@ -538,6 +582,7 @@
     boolean carryingMeta = address != null && hsi.getServerAddress().equals(address);
     if (carryingRoot || carryingMeta) {
+      this.deadservers.add(serverName, carryingRoot, carryingMeta);
       this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
         this.services, this.deadservers, info, carryingRoot, carryingMeta));
     } else {
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java	(working copy)
@@ -26,13 +26,16 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -41,6 +44,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
@@ -52,7 +56,6 @@
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.Result;
@@ -302,6 +305,19 @@
     LOG.info("HMaster main thread exiting");
   }
 
+  /**
+   * Used for testing.
+   *
+   * @param zkw
+   * @param a
+   * @param sm
+   * @return Instance of RegionServerTracker
+   */
+  public RegionServerTracker createRegionServerTracker(
+      final ZooKeeperWatcher zkw, final Abortable a, final ServerManager sm) {
+    return new RegionServerTracker(zkw, a, sm);
+  }
+
   private void loop() {
     // Check if we should stop every second.
     Sleeper sleeper = new Sleeper(1000, this);
@@ -310,6 +326,35 @@
     }
   }
 
+  // Check zk for regionservers that are up but didn't register
+  private void checkZookeeperForUnregisteredRegionServers()
+      throws KeeperException {
+    for (String sn : this.regionServerTracker.getOnlineServerNames()) {
+      if (!this.serverManager.isServerOnline(sn)) {
+        HServerInfo serverInfo = HServerInfo.getServerInfo(sn);
+        if (serverInfo != null) {
+          HServerInfo existingServer = serverManager
+              .haveServerWithSameHostAndPortAlready(serverInfo
+                  .getHostnamePort());
+          if (existingServer == null) {
+            // Not registered; add it.
+ LOG.info("Registering server found up in zk but who has not yet " + + "reported in: " + sn); + // We set serverLoad with one region, it could differentiate with + // regionserver which is started just now + HServerLoad serverLoad = new HServerLoad(); + serverLoad.setNumberOfRegions(1); + serverInfo.setLoad(serverLoad); + this.serverManager.recordNewServer(serverInfo, true, null); + } + } else { + LOG.warn("Server " + sn + + " found up in zk, but is not a correct server name"); + } + } + } + } + /** * Finish initialization of HMaster after becoming the primary master. * @@ -355,8 +400,8 @@ this.balancer = new LoadBalancer(conf); zooKeeper.registerListenerFirst(assignmentManager); - this.regionServerTracker = new RegionServerTracker(zooKeeper, this, - this.serverManager); + this.regionServerTracker = createRegionServerTracker(this.zooKeeper, this, + this.serverManager); this.regionServerTracker.start(); // Set the cluster as up. If new RSs, they'll be waiting on this before @@ -377,9 +422,18 @@ // Wait for region servers to report in. Returns count of regions. int regionCount = this.serverManager.waitForRegionServers(); + checkZookeeperForUnregisteredRegionServers(); + + Set knownServers = new HashSet(); + knownServers.addAll(serverManager.getOnlineServers().keySet()); + if (this.serverManager.areDeadServersInProgress()) { + // Dead servers are processing, their logs would be split by + // ServerShutdownHandler + knownServers.addAll(serverManager.getDeadServersInProgress()); + } + // TODO: Should do this in background rather than block master startup - this.fileSystemManager. - splitLogAfterStartup(this.serverManager.getOnlineServers()); + splitLogAfterStartup(this.fileSystemManager, knownServers); // Make sure root and meta assigned before proceeding. assignRootAndMeta(); @@ -395,7 +449,7 @@ this.assignmentManager.assignAllUserRegions(); } else { LOG.info("Master startup proceeding: master failover"); - this.assignmentManager.processFailover(); + this.assignmentManager.processFailover(knownServers); } // Fixing up missing daughters if any @@ -410,7 +464,34 @@ LOG.info("Master has completed initialization"); initialized = true; } + + /** + * Used in tests + * @param mfs + * @param sm + */ + public void splitLogAfterStartup(final MasterFileSystem mfs, + final Set knownServers) { + mfs.splitLogAfterStartup(knownServers); + } + /* + * -ROOT- or .META. server may be processed as dead server. Before assigning + * them, we need to wait until its log is split. + * + * @param serverAddress address of -ROOT- or .META. server + */ + private void waitUntilNoLogDir(HServerAddress serverAddress) + throws InterruptedException, IOException { + if (serverAddress != null) { + Set deadServersInProgress = this.serverManager + .getDeadServerInProgressByAddress(serverAddress); + for (String deadServer : deadServersInProgress) { + waitUntilNoLogDir(deadServer); + } + } + } + /** * Check -ROOT- and .META. are assigned. If not, * assign them. @@ -428,9 +509,23 @@ boolean rit = this.assignmentManager. 
       processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
     if (!catalogTracker.verifyRootRegionLocation(timeout)) {
-      this.assignmentManager.assignRoot();
+      HServerAddress rootServerAddress = catalogTracker.getRootLocation();
+      HServerInfo rootServerInfo = this.serverManager
+          .getHServerInfo(rootServerAddress);
+      if (rootServerInfo != null) {
+        HServerLoad rootServerLoad = rootServerInfo.getLoad();
+        if (rootServerLoad != null && rootServerLoad.getNumberOfRegions() > 0) {
+          // If rootServer is online && not started just now, we expire it
+          this.serverManager.expireServer(rootServerInfo);
+        }
+      }
+      waitUntilNoLogDir(rootServerAddress);
+
+      if (!this.serverManager.isDeadRootServerInProgress()) {
+        this.assignmentManager.assignRoot();
+      }
       this.catalogTracker.waitForRoot();
-      //This guarantees that the transition has completed
+      // This guarantees that the transition has completed
       this.assignmentManager.waitForAssignment(HRegionInfo.ROOT_REGIONINFO);
       assigned++;
     }
@@ -444,7 +539,22 @@
     rit = this.assignmentManager.
       processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
     if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) {
-      this.assignmentManager.assignMeta();
+      HServerAddress metaServerAddress = catalogTracker
+          .getMetaLocationOrReadLocationFromRoot();
+      HServerInfo metaServerInfo = this.serverManager
+          .getHServerInfo(metaServerAddress);
+      if (metaServerInfo != null) {
+        HServerLoad metaServerLoad = metaServerInfo.getLoad();
+        if (metaServerLoad != null && metaServerLoad.getNumberOfRegions() > 0
+            && !catalogTracker.getRootLocation().equals(metaServerAddress)) {
+          // If metaServer is online && not started just now, we expire it
+          this.serverManager.expireServer(metaServerInfo);
+        }
+      }
+      waitUntilNoLogDir(metaServerAddress);
+      if (!this.serverManager.isDeadMetaServerInProgress()) {
+        this.assignmentManager.assignMeta();
+      }
       this.catalogTracker.waitForMeta();
       // Above check waits for general meta availability but this does not
       // guarantee that the transition has completed
@@ -496,6 +606,29 @@
     }
   }
 
+  /**
+   * Wait until the server's log dir doesn't exist or we time out.
+   *
+   */
+  private void waitUntilNoLogDir(final String serverName) throws IOException,
+      InterruptedException {
+    LOG.debug("Wait for " + serverName + " log dir to not exist: "
+        + this.fileSystemManager.getLogDirIfExists(serverName));
+    long waitTime = conf.getInt("hbase.master.meta.assignment.timeout", 300000);
+    long start = System.currentTimeMillis();
+    for (; serverName != null; ) {
+      if (this.fileSystemManager.getLogDirIfExists(serverName) == null) {
+        break;
+      }
+      if (System.currentTimeMillis() - start > waitTime) {
+        throw new RuntimeException(
+            "Timed out waiting to finish splitting log for " + serverName);
+      }
+      Thread.sleep(500);
+    }
+    LOG.debug("Spent " + (System.currentTimeMillis() - start) + " ms waiting");
+  }
+
   /*
    * @return This masters' address.
   * @throws UnknownHostException
@@ -520,6 +653,10 @@
     return HBaseRPCProtocolVersion.versionID;
   }
 
+  public RegionServerTracker getRegionServerTracker() {
+    return this.regionServerTracker;
+  }
+
   /** @return InfoServer object. Maybe null.*/
   public InfoServer getInfoServer() {
     return this.infoServer;
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java	(working copy)
@@ -98,4 +98,14 @@
   public List<HServerAddress> getOnlineServers() throws KeeperException {
     return ZKUtil.listChildrenAndGetAsAddresses(watcher, watcher.rsZNode);
   }
+
+  /**
+   * Gets the online server names from zookeeper.
+   *
+   * @return list of online server names from zk
+   * @throws KeeperException
+   */
+  public List<String> getOnlineServerNames() throws KeeperException {
+    return ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1244405)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
 import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
@@ -198,11 +199,14 @@
   /**
    * Handle failover.  Restore state from META and ZK.  Handle any regions in
    * transition.  Presumes .META. and -ROOT- deployed.
+   * @param onlineServers servers that were online when the master started,
+   *          including dead servers in progress
    * @throws KeeperException
    * @throws IOException
   * @throws InterruptedException
   */
-  void processFailover() throws KeeperException, IOException, InterruptedException {
+  void processFailover(final Set<String> onlineServers)
+      throws KeeperException, IOException, InterruptedException {
     // Concurrency note: In the below the accesses on regionsInTransition are
     // outside of a synchronization block where usually all accesses to RIT are
     // synchronized.  The presumption is that in this case it is safe since this
@@ -223,7 +227,7 @@
     // Scan META to build list of existing regions, servers, and assignment
     // Returns servers who have not checked in (assumed dead) and their regions
     Map<String, List<Pair<HRegionInfo, Result>>> deadServers =
-      rebuildUserRegions();
+      rebuildUserRegions(onlineServers);
     // Process list of dead servers; note this will add regions to the RIT.
     // processRegionsInTransition will read them and assign them out.
     processDeadServers(deadServers);
@@ -261,6 +265,10 @@
       LOG.info("No regions in transition in ZK to process on failover");
       return;
     }
+    // Remove regions in RIT; they may be being processed by the SSH.
+    synchronized (regionsInTransition) {
+      nodes.removeAll(regionsInTransition.keySet());
+    }
     LOG.info("Failed-over master needs to process " + nodes.size() +
       " regions in transition");
     for (String encodedRegionName: nodes) {
@@ -1614,12 +1622,15 @@
    * <p>
    * Returns a map of servers that are not found to be online and the regions
    * they were hosting.
+   * @param onlineServers if a region's location belongs to onlineServers, it
+   *          doesn't need to be assigned
    * @return map of servers not online to their assigned regions, as stored
   *         in META
   * @throws IOException
   * @throws KeeperException
   */
-  private Map<String, List<Pair<HRegionInfo, Result>>> rebuildUserRegions()
+  private Map<String, List<Pair<HRegionInfo, Result>>> rebuildUserRegions(
+      final Set<String> onlineServers)
       throws IOException, KeeperException {
     // Region assignment from META
     List<Result> results = MetaReader.fullScanOfResults(catalogTracker);
@@ -1652,7 +1663,7 @@
         // need to enable the table if not disabled or disabling
        // this will be used in rolling restarts
         enableTableIfNotDisabledOrDisabling(disabled, disabling, tableName);
-      } else if (!serverManager.isServerOnline(regionLocation.getServerName())) {
+      } else if (!onlineServers.contains(regionLocation.getServerName())) {
         // Region is located on a server that isn't online
         List<Pair<HRegionInfo, Result>> offlineRegions =
           offlineServers.get(regionLocation.getServerName());
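Reviewer note, not part of the patch above: a minimal, self-contained sketch of the comma-separated "hostname,port,startcode" server-name layout that the new HServerInfo.getServerInfo(String) parses when the master registers region servers it finds in zookeeper but that have not reported in. The class name and the sample server name below are made up for illustration only.

public class ServerNameParseDemo {
  public static void main(String[] args) {
    // Hypothetical znode name of the form "<hostname>,<port>,<startcode>",
    // the same layout getServerInfo(String) splits on its separator.
    String serverName = "rs1.example.org,60020,1328140800000";
    String[] split = serverName.split(",");
    if (split.length == 3) {
      String hostName = split[0];
      int port = Integer.parseInt(split[1]);
      long startCode = Long.parseLong(split[2]);
      // The patch builds an HServerAddress from "host:port" and keeps the
      // startcode so the unregistered server can be recorded by the master.
      System.out.println(hostName + ":" + port + " started at " + startCode);
    } else {
      System.out.println("Not a well-formed server name: " + serverName);
    }
  }
}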