diff --git src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index 5f2f148..58c61ee 100644 --- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -118,7 +118,7 @@ public class CatalogTracker { */ private ServerName metaLocation; - private boolean stopped = false; + private volatile boolean stopped = false; static final byte [] ROOT_REGION_NAME = HRegionInfo.ROOT_REGIONINFO.getRegionName(); diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cf6b4fc..8658c12 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -41,9 +41,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.management.ObjectName; -import com.google.common.collect.ClassToInstanceMap; -import com.google.common.collect.Maps; -import com.google.common.collect.MutableClassToInstanceMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -129,6 +126,10 @@ import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; +import com.google.common.collect.ClassToInstanceMap; +import com.google.common.collect.Maps; +import com.google.common.collect.MutableClassToInstanceMap; + /** * HMaster is the "master server" for HBase. An HBase cluster has one active * master. If many masters are started, all compete. Whichever wins goes on to @@ -253,6 +254,12 @@ Server { /** The health check chore. 
*/ private HealthCheckChore healthCheckChore; + /** flag when true, Master waits for log splitting to complete before starting up */ + private boolean waitingOnLogSplitting = false; + + /** flag used in test cases in order to simulate RS failures during master initialization */ + private volatile boolean initializationBeforeMetaAssignment = false; + /** * Initializes the HMaster. The steps are as follows: *

@@ -336,7 +343,9 @@ Server { if (isHealthCheckerConfigured()) { healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); } + this.shouldSplitMetaSeparately = conf.getBoolean(HLog.SEPARATE_HLOG_FOR_META, false); + waitingOnLogSplitting = this.conf.getBoolean("hbase.master.wait.for.log.splitting", false); } /** @@ -579,14 +588,49 @@ Server { if (!masterRecovery) { this.assignmentManager.startTimeOutMonitor(); } - // TODO: Should do this in background rather than block master startup - status.setStatus("Splitting logs after master startup"); - splitLogAfterStartup(this.fileSystemManager); - // Make sure root and meta assigned before proceeding. - assignRootAndMeta(status); + // get a list for previously failed RS which need recovery work + Set failedServers = this.fileSystemManager.getFailedServersFromLogFolders(); + if (waitingOnLogSplitting) { + List servers = new ArrayList(failedServers); + this.fileSystemManager.splitAllLogs(servers); + failedServers.clear(); + } + + ServerName preRootServer = this.catalogTracker.getRootLocation(); + if (preRootServer != null && failedServers.contains(preRootServer)) { + // create recovered edits file for _ROOT_ server + this.fileSystemManager.splitAllLogs(preRootServer); + failedServers.remove(preRootServer); + } + + this.initializationBeforeMetaAssignment = true; + // Make sure root assigned before proceeding. + assignRoot(status); + + // log splitting for .META. server + ServerName preMetaServer = this.catalogTracker.getMetaLocationOrReadLocationFromRoot(); + if (preMetaServer != null && failedServers.contains(preMetaServer)) { + // create recovered edits file for .META. server + this.fileSystemManager.splitAllLogs(preMetaServer); + failedServers.remove(preMetaServer); + } + + // SSH should be enabled for ROOT before META region assignment + // because META region assignment depends on the ROOT server being online. + this.serverManager.enableSSHForRoot(); + + // Make sure meta assigned before proceeding.
+ assignMeta(status, preRootServer); + enableServerShutdownHandler(); + // handle other dead servers in SSH + status.setStatus("Submit log splitting work of non-meta region servers"); + for (ServerName curServer : failedServers) { + this.serverManager.expireServer(curServer); + } + // Update meta with new HRI if required. i.e migrate all HRI with HTD to // HRI with out HTD in meta and update the status in ROOT. This must happen // before we assign all user regions or else the assignment will fail. @@ -658,22 +702,13 @@ Server { } /** - * Override to change master's splitLogAfterStartup. Used testing - * @param mfs - */ - protected void splitLogAfterStartup(final MasterFileSystem mfs) { - mfs.splitLogAfterStartup(); - } - - /** - * Check -ROOT- and .META. are assigned. If not, - * assign them. + * Check -ROOT- is assigned. If not, assign it. + * @param status MonitoredTask * @throws InterruptedException * @throws IOException * @throws KeeperException - * @return Count of regions we assigned. */ - int assignRootAndMeta(MonitoredTask status) + private void assignRoot(MonitoredTask status) throws InterruptedException, IOException, KeeperException { int assigned = 0; long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); @@ -704,16 +739,32 @@ Server { LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit + ", location=" + catalogTracker.getRootLocation()); - // Work on meta region + status.setStatus("ROOT assigned."); + } + + /** + * Check .META. is assigned. If not, assign it. 
+ * @param status MonitoredTask + * @param previousRootServer ServerName of previous root region server before current start up + * @return + * @throws InterruptedException + * @throws IOException + * @throws KeeperException + */ + private void assignMeta(MonitoredTask status, ServerName previousRootServer) + throws InterruptedException, + IOException, KeeperException { + int assigned = 0; + long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); + status.setStatus("Assigning META region"); - rit = this.assignmentManager. - processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); + boolean rit = + this.assignmentManager + .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout); if (!rit && !metaRegionLocation) { - ServerName currentMetaServer = - this.catalogTracker.getMetaLocationOrReadLocationFromRoot(); - if (currentMetaServer != null - && !currentMetaServer.equals(currentRootServer)) { + ServerName currentMetaServer = this.catalogTracker.getMetaLocationOrReadLocationFromRoot(); + if (currentMetaServer != null && !currentMetaServer.equals(previousRootServer)) { splitLogAndExpireIfOnline(currentMetaServer); } assignmentManager.assignMeta(); @@ -723,15 +774,14 @@ Server { enableSSHandWaitForMeta(); assigned++; } else { - // Region already assigned. We didnt' assign it. Add to in-memory state. + // Region already assigned. We didnt' assign it. Add to in-memory state. this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO, this.catalogTracker.getMetaLocation()); } enableCatalogTables(Bytes.toString(HConstants.META_TABLE_NAME)); - LOG.info(".META. assigned=" + assigned + ", rit=" + rit + - ", location=" + catalogTracker.getMetaLocation()); - status.setStatus("META and ROOT assigned."); - return assigned; + LOG.info(".META. 
assigned=" + assigned + ", rit=" + rit + ", location=" + + catalogTracker.getMetaLocation()); + status.setStatus("META assigned."); } private void enableSSHandWaitForMeta() throws IOException, @@ -790,8 +840,7 @@ Server { } /** - * Split a server's log and expire it if we find it is one of the online - * servers. + * Expire a server if we find it is one of the online servers. * @param sn ServerName to check. * @throws IOException */ @@ -1661,12 +1710,23 @@ Server { } if (this.assignmentManager != null) this.assignmentManager.shutdown(); if (this.serverManager != null) this.serverManager.shutdownCluster(); + try { if (this.clusterStatusTracker != null){ this.clusterStatusTracker.setClusterDown(); } } catch (KeeperException e) { - LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e); + if (e instanceof KeeperException.SessionExpiredException) { + LOG.warn("ZK session expired. Retry a new connection..."); + try { + this.zooKeeper.reconnectAfterExpiration(); + this.clusterStatusTracker.setClusterDown(); + } catch (Exception ex) { + LOG.warn("Retry setClusterDown failed", ex); + } + } else { + LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e); + } } } @@ -1749,6 +1809,14 @@ Server { return this.shouldSplitMetaSeparately; } + /** + * Report whether this master has started initialization and is about to do meta region assignment + * @return true if master is in initialization & about to assign ROOT & META regions + */ + public boolean isInitializationStartsMetaRegoinAssignment() { + return this.initializationBeforeMetaAssignment; + } + @Override @Deprecated public void assign(final byte[] regionName, final boolean force) diff --git src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 31c1e9b..74a37cf 100644 --- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java 
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; @@ -211,30 +212,31 @@ public class MasterFileSystem { } /** - * Inspect the log directory to recover any log file without - * an active region server. + * Inspect the log directory to find dead servers which need log splitting */ - void splitLogAfterStartup() { + Set getFailedServersFromLogFolders() { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", - HLog.SPLIT_SKIP_ERRORS_DEFAULT); + HLog.SPLIT_SKIP_ERRORS_DEFAULT); + + Set serverNames = new HashSet(); Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME); + do { if (master.isStopped()) { - LOG.warn("Master stopped while splitting logs"); + LOG.warn("Master stopped while trying to get failed servers."); break; } - List serverNames = new ArrayList(); try { - if (!this.fs.exists(logsDirPath)) return; + if (!this.fs.exists(logsDirPath)) return serverNames; FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); // Get online servers after getting log folders to avoid log folder deletion of newly // checked in region servers . 
see HBASE-5916 - Set onlineServers = ((HMaster) master).getServerManager().getOnlineServers() - .keySet(); + Set onlineServers = + ((HMaster) master).getServerManager().getOnlineServers().keySet(); if (logFolders == null || logFolders.length == 0) { LOG.debug("No log files to split, proceeding..."); - return; + return serverNames; } for (FileStatus status : logFolders) { String sn = status.getPath().getName(); @@ -248,27 +250,19 @@ public class MasterFileSystem { + "to a known region server, splitting"); serverNames.add(serverName); } else { - LOG.info("Log folder " + status.getPath() - + " belongs to an existing region server"); + LOG.info("Log folder " + status.getPath() + " belongs to an existing region server"); } } - if (services.shouldSplitMetaSeparately()) { - splitLog(serverNames, META_FILTER); - splitLog(serverNames, NON_META_FILTER); - } else { - splitAllLogs(serverNames); - } retrySplitting = false; } catch (IOException ioe) { - LOG.warn("Failed splitting of " + serverNames, ioe); + LOG.warn("Failed getting failed servers to be recovered.", ioe); if (!checkFileSystem()) { LOG.warn("Bad Filesystem, exiting"); Runtime.getRuntime().halt(1); } try { if (retrySplitting) { - Thread.sleep(conf.getInt( - "hbase.hlog.split.failure.retry.interval", 30 * 1000)); + Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000)); } } catch (InterruptedException e) { LOG.warn("Interrupted, aborting since cannot return w/o splitting"); @@ -278,6 +272,8 @@ public class MasterFileSystem { } } } while (retrySplitting); + + return serverNames; } public void splitLog(final ServerName serverName) throws IOException { diff --git src/main/java/org/apache/hadoop/hbase/master/ServerManager.java src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 0397020..20cb122 100644 --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -119,6 +119,12 @@ public class 
ServerManager { private Set deadNotExpiredServers = new HashSet(); /** + * Flag to enable SSH for ROOT region server. It's used in master initialization to enable SSH for + * ROOT before META assignment. + */ + private boolean isSSHForRootEnabled = false; + + /** * Constructor. * @param master * @param services @@ -373,7 +379,8 @@ public class ServerManager { * shutdown processing. */ public synchronized void expireServer(final ServerName serverName) { - if (!services.isServerShutdownHandlerEnabled()) { + boolean carryingRoot = services.getAssignmentManager().isCarryingRoot(serverName); + if (!services.isServerShutdownHandlerEnabled() && (!carryingRoot || !this.isSSHForRootEnabled)) { LOG.info("Master doesn't enable ServerShutdownHandler during initialization, " + "delay expiring server " + serverName); this.deadNotExpiredServers.add(serverName); @@ -382,7 +389,6 @@ public class ServerManager { if (!this.onlineServers.containsKey(serverName)) { LOG.warn("Received expiration of " + serverName + " but server is not currently online"); - return; } if (this.deadservers.contains(serverName)) { // TODO: Can this happen? It shouldn't be online in this case? @@ -410,7 +416,6 @@ public class ServerManager { return; } - boolean carryingRoot = services.getAssignmentManager().isCarryingRoot(serverName); boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName); if (carryingRoot || carryingMeta) { this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master, @@ -440,6 +445,26 @@ public class ServerManager { } } + /** + * Enable SSH for ROOT region server and expire ROOT which died during master's initialization. It + * will be called before Meta assignment. 
+ * @throws IOException + */ + void enableSSHForRoot() throws IOException { + if (this.isSSHForRootEnabled) { + return; + } + this.isSSHForRootEnabled = true; + Iterator serverIterator = deadNotExpiredServers.iterator(); + while (serverIterator.hasNext()) { + ServerName curServerName = serverIterator.next(); + if (services.getAssignmentManager().isCarryingRoot(curServerName)) { + expireServer(curServerName); + serverIterator.remove(); + } + } + } + /* * Remove the server from the drain list. */ diff --git src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java index 198d6f4..b9ebdf4 100644 --- src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java +++ src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java @@ -84,6 +84,12 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler { LOG.info("ROOT has been assigned to otherwhere, skip assigning."); } } + + if(!this.services.isServerShutdownHandlerEnabled()) { + // resubmit in case we're in master initialization and SSH hasn't been enabled yet. + this.services.getExecutorService().submit(this); + this.deadServers.add(serverName); + } // Carrying meta? 
if (isCarryingMeta()) { diff --git src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 2be3a84..41efd27 100644 --- src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.zookeeper.KeeperException; @@ -116,6 +117,10 @@ public class ServerShutdownHandler extends EventHandler { public void process() throws IOException { final ServerName serverName = this.serverName; try { + if (this.server.isStopped()) { + throw new IOException("Server is stopped"); + } + try { if (this.shouldSplitHlog) { LOG.info("Splitting logs for " + serverName); @@ -200,9 +205,15 @@ public class ServerShutdownHandler extends EventHandler { if (!this.services.getAssignmentManager().isRegionAssigned(hri)) { if (!regionsToAssign.contains(hri)) { regionsToAssign.add(hri); + RegionState rit = + services.getAssignmentManager().getRegionsInTransition().get(hri.getEncodedName()); + removeRITsOfRregionInDisablingOrDisabledTables(regionsToAssign, rit, + services.getAssignmentManager(), hri); } } } + + // re-assign regions for (HRegionInfo hri : regionsToAssign) { this.services.getAssignmentManager().assign(hri, true); } @@ -244,12 +255,13 @@ public class ServerShutdownHandler extends EventHandler { } } + AssignmentManager assignmentManager = this.services.getAssignmentManager(); for (Map.Entry e : metaHRIs.entrySet()) { - RegionState rit = services.getAssignmentManager().getRegionsInTransition().get( - e.getKey().getEncodedName()); - AssignmentManager assignmentManager 
= this.services.getAssignmentManager(); + RegionState rit = + assignmentManager.getRegionsInTransition().get(e.getKey().getEncodedName()); + if (processDeadRegion(e.getKey(), e.getValue(), assignmentManager, - this.server.getCatalogTracker())) { + this.server.getCatalogTracker())) { ServerName addressFromAM = assignmentManager.getRegionServerOfRegion(e.getKey()); if (rit != null && !rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting() && !ritsGoingToServer.contains(e.getKey())) { @@ -268,7 +280,7 @@ public class ServerShutdownHandler extends EventHandler { ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), e.getKey()); } catch (KeeperException ke) { this.server.abort("Unexpected ZK exception deleting unassigned node " + e.getKey(), - ke); + ke); return null; } } @@ -291,27 +303,25 @@ public class ServerShutdownHandler extends EventHandler { // CLOSING state. Doing this will have no harm. The rit can be null if region server went // down during master startup. In that case If any znodes' exists for partially disabled // table regions deleting them during startup only. See HBASE-8127. - toAssign = - checkForDisablingOrDisabledTables(ritsGoingToServer, toAssign, rit, e.getKey(), - assignmentManager); + removeRITsOfRregionInDisablingOrDisabledTables(toAssign, rit, assignmentManager, e.getKey()); } + return toAssign; } - private List checkForDisablingOrDisabledTables(Set regionsFromRIT, - List toAssign, RegionState rit, HRegionInfo hri, - AssignmentManager assignmentManager) { - boolean disabled = - assignmentManager.getZKTable().isDisablingOrDisabledTable(hri.getTableNameAsString()); - if (disabled) { - // To avoid region assignment if table is in disabling or disabled state. 
- toAssign.remove(hri); - regionsFromRIT.remove(hri); + private void removeRITsOfRregionInDisablingOrDisabledTables(List toAssign, + RegionState rit, AssignmentManager assignmentManager, HRegionInfo hri) { + + if (!assignmentManager.getZKTable().isDisablingOrDisabledTable(hri.getTableNameAsString())) { + return; } - if (rit != null && disabled) { + + // To avoid region assignment if table is in disabling or disabled state. + toAssign.remove(hri); + + if (rit != null) { assignmentManager.deleteNodeAndOfflineRegion(hri); } - return toAssign; } /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f94e0f1..cf47aa8 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -67,12 +67,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterAddressTracker; import org.apache.hadoop.hbase.NotServingRegionException; @@ -173,7 +173,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; import org.codehaus.jackson.map.ObjectMapper; -import org.joda.time.field.MillisDurationField; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -1782,7 +1781,9 @@ public class HRegionServer implements 
HRegionInterface, HBaseRPCErrorHandler, @Override public void stop(final String msg) { try { - this.rsHost.preStop(msg); + if (this.rsHost != null) { + this.rsHost.preStop(msg); + } this.stopped = true; LOG.info("STOPPED: " + msg); // Wakes run() if it is sleeping diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java index c6e607e..4365f78 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java @@ -45,7 +45,7 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { /** Used to abort if a fatal error occurs */ protected final Abortable abortable; - private boolean stopped = false; + private volatile boolean stopped = false; /** * Constructs a new ZK node tracker. diff --git src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 288bd6c..109d94e 100644 --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.User; @@ -452,7 +453,9 @@ public class MiniHBaseCluster extends HBaseCluster { while (!(mts = getMasterThreads()).isEmpty() && (System.currentTimeMillis() - start) < timeout) { for (JVMClusterUtil.MasterThread mt : mts) { - if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { + ServerManager serverManager = mt.getMaster().getServerManager(); + if 
(mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized() + && !serverManager.areDeadServersInProgress()) { return true; } } diff --git src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 823d7da..83bd4c1 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -19,22 +19,29 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_mgr_wait_for_zk_delete; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_final_transistion_failed; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_preempt_task; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_acquired; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_done; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_err; +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.tot_wkr_task_resigned; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; @@ -43,7 +50,13 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; @@ -54,8 +67,9 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -82,15 +96,21 @@ public class TestDistributedLogSplitting { Configuration conf; HBaseTestingUtility TEST_UTIL; + private void startCluster(int num_rs) throws Exception{ + conf = HBaseConfiguration.create(); + startCluster(NUM_MASTERS, num_rs, conf); + } + + private void startCluster(int num_master, int num_rs, Configuration inConf) throws Exception { ZKSplitLog.Counters.resetCounters(); LOG.info("Starting cluster"); - conf = HBaseConfiguration.create(); + this.conf = inConf; conf.getLong("hbase.splitlog.max.resubmit", 0); // Make the failure test faster conf.setInt("zookeeper.recovery.retry", 0); TEST_UTIL = new 
HBaseTestingUtility(conf); - TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs); + TEST_UTIL.startMiniCluster(num_master, num_rs); cluster = TEST_UTIL.getHBaseCluster(); LOG.info("Waiting for active/ready master"); cluster.waitForActiveAndReadyMaster(); @@ -102,6 +122,10 @@ public class TestDistributedLogSplitting { @After public void after() throws Exception { + for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) { + mt.getMaster().abort("closing...", new Exception("Trace info")); + } + TEST_UTIL.shutdownMiniCluster(); } @@ -205,6 +229,89 @@ public class TestDistributedLogSplitting { assertEquals(NUM_LOG_LINES, count); } + @Test(timeout = 300000) + public void testMasterStartsUpWithLogSplittingWork() throws Exception { + LOG.info("testMasterStartsUpWithLogSplittingWork"); + Configuration curConf = HBaseConfiguration.create(); + curConf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); + startCluster(2, NUM_RS, curConf); + + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "f", NUM_REGIONS_TO_CREATE); + + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = hrs.getOnlineRegions(); + for (HRegionInfo region : regions) { + if (region.isRootRegion() || region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable()) { + it.remove(); + } + } + 
makeHLog(hrs.getWAL(), regions, "table", NUM_LOG_LINES, 100); + + // abort master + abortMaster(cluster); + + // abort RS + int numRS = cluster.getLiveRegionServerThreads().size(); + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for the RS dies + long start = EnvironmentEdgeManager.currentTimeMillis(); + while (cluster.getLiveRegionServerThreads().size() > (numRS - 1)) { + if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) { + assertTrue(false); + } + Thread.sleep(200); + } + + Thread.sleep(2000); + LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size()); + + startMasterTillNoDeadServers(cluster); + + start = EnvironmentEdgeManager.currentTimeMillis(); + while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 2)) { + if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) { + assertTrue("Timedout", false); + } + Thread.sleep(200); + } + + LOG.info("Current Open Regions After Master Node Starts Up:" + + getAllOnlineRegions(cluster).size()); + + assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); + + ht.close(); + } + /** * The original intention of this test was to force an abort of a region * server and to make sure that the failure path in the region servers is @@ -393,33 +500,40 @@ public class TestDistributedLogSplitting { List hris, String tname, int num_edits, int edit_size) throws IOException { + // remove root and meta region + hris.remove(HRegionInfo.ROOT_REGIONINFO); + hris.remove(HRegionInfo.FIRST_META_REGIONINFO); byte[] table = Bytes.toBytes(tname); HTableDescriptor htd = new HTableDescriptor(tname); byte[] value = new byte[edit_size]; for (int i = 0; i < edit_size; i++) { - value[i] = (byte)('a' + (i % 26)); + value[i] = (byte) ('a' + (i % 26)); } int n = hris.size(); int[] counts = new int[n]; - int j = 0; if (n > 0) { for (int i = 0; i < num_edits; i += 1) { WALEdit e = new WALEdit(); - byte [] row = Bytes.toBytes("r" + Integer.toString(i)); - byte 
[] family = Bytes.toBytes("f"); - byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i)); - e.add(new KeyValue(row, family, qualifier, - System.currentTimeMillis(), value)); - j++; - log.append(hris.get(j % n), table, e, System.currentTimeMillis(), htd); - counts[j % n] += 1; + HRegionInfo curRegionInfo = hris.get(i % n); + byte[] startRow = curRegionInfo.getStartKey(); + if (startRow == null || startRow.length == 0) { + startRow = new byte[] { 0, 0, 0, 0, 1 }; + } + byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); + row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because + // HBaseTestingUtility.createMultiRegions use 5 bytes + // key + byte[] family = Bytes.toBytes("f"); + byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); + e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value)); + log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd); + counts[i % n] += 1; } } log.sync(); log.close(); for (int i = 0; i < n; i++) { - LOG.info("region " + hris.get(i).getRegionNameAsString() + - " has " + counts[i] + " edits"); + LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); } return; } @@ -479,6 +593,30 @@ public class TestDistributedLogSplitting { assertTrue(false); } + private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { + for (MasterThread mt : cluster.getLiveMasterThreads()) { + if (mt.getMaster().isActiveMaster()) { + mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); + mt.join(); + break; + } + } + LOG.debug("Master is aborted"); + } + + private void startMasterTillNoDeadServers(MiniHBaseCluster cluster) + throws IOException, InterruptedException { + cluster.startMaster(); + HMaster master = cluster.getMaster(); + while (!master.isInitialized()) { + Thread.sleep(100); + } + ServerManager serverManager = master.getServerManager(); + while (serverManager.areDeadServersInProgress()) { + Thread.sleep(100); + } 
+ } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index a75aeeb..6d1492d 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -611,7 +611,7 @@ public class TestMasterFailover { * * @throws Exception */ - @Test (timeout=180000) + @Test(timeout = 180000) public void testMasterFailoverWithMockedRITOnDeadRS() throws Exception { final int NUM_MASTERS = 1; @@ -1030,6 +1030,141 @@ public class TestMasterFailover { TEST_UTIL.shutdownMiniCluster(); } + @Test(timeout = 180000) + public void testRSKilledWithMockedOpeningRITGoingToDeadRS() throws Exception { + final int NUM_MASTERS = 1; + final int NUM_RS = 2; + + // Create config to use for this cluster + Configuration conf = HBaseConfiguration.create(); + // Need to drop the timeout much lower + conf.setInt("hbase.master.assignment.timeoutmonitor.period", 10000); + conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 30000); + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2); + + // Create and start the cluster + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + log("Cluster started"); + + // Create a ZKW to use in the test + ZooKeeperWatcher zkw = + new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "unittest", new Abortable() { + + @Override + public void abort(String why, Throwable e) { + LOG.error("Fatal ZK Error: " + why, e); + org.junit.Assert.assertFalse("Fatal ZK error", true); + } + + @Override + public boolean isAborted() { + return false; + } + + }); + + // get all the 
master threads
+    List<MasterThread> masterThreads = cluster.getMasterThreads();
+    assertEquals(1, masterThreads.size());
+
+    // only one master thread, let's wait for it to be initialized
+    assertTrue(cluster.waitForActiveAndReadyMaster());
+    HMaster master = masterThreads.get(0).getMaster();
+    assertTrue(master.isActiveMaster());
+    assertTrue(master.isInitialized());
+
+    // disable load balancing on this master
+    master.balanceSwitch(false);
+
+    // create one table ("disabledTable") in META, split into 15 key ranges
+    byte[] FAMILY = Bytes.toBytes("family");
+    byte[][] SPLIT_KEYS =
+        TEST_UTIL.getRegionSplitStartKeys(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 15);
+
+    FileSystem filesystem = FileSystem.get(conf);
+    Path rootdir = filesystem.makeQualified(new Path(conf.get(HConstants.HBASE_DIR)));
+
+    byte[] disabledTable = Bytes.toBytes("disabledTable");
+    HTableDescriptor htdDisabled = new HTableDescriptor(disabledTable);
+    htdDisabled.addFamily(new HColumnDescriptor(FAMILY));
+    // Write the .tableinfo
+    FSTableDescriptors.createTableDescriptor(filesystem, rootdir, htdDisabled);
+    HRegionInfo hriDisabled = new HRegionInfo(htdDisabled.getName(), null, null);
+    createRegion(hriDisabled, rootdir, conf, htdDisabled);
+
+    List<HRegionInfo> tableRegions =
+        TEST_UTIL.createMultiRegionsInMeta(TEST_UTIL.getConfiguration(), htdDisabled, SPLIT_KEYS);
+
+    log("Regions in META have been created");
+
+    // at this point we only expect 2 regions to be assigned out (catalogs)
+    assertEquals(2, cluster.countServedRegions());
+
+    // The first RS will stay online
+    List<RegionServerThread> regionservers = cluster.getRegionServerThreads();
+    HRegionServer hrs = regionservers.get(0).getRegionServer();
+
+    // The second RS is going to be hard-killed
+    RegionServerThread hrsDeadThread = regionservers.get(1);
+    HRegionServer hrsDead = hrsDeadThread.getRegionServer();
+    ServerName deadServerName = hrsDead.getServerName();
+
+    // we'll need some regions to already be assigned out properly on live RS
+    List<HRegionInfo> assignedRegionsOnLiveRS = new ArrayList<HRegionInfo>();
+    
assignedRegionsOnLiveRS.addAll(tableRegions.subList(0, 3));
+    tableRegions.removeAll(assignedRegionsOnLiveRS);
+
+    // now actually assign them
+    for (HRegionInfo hri : assignedRegionsOnLiveRS) {
+      master.assignmentManager.regionPlans.put(hri.getEncodedName(),
+        new RegionPlan(hri, null, hrs.getServerName()));
+      master.assignRegion(hri);
+    }
+
+    log("Waiting for assignment to finish");
+    ZKAssign.blockUntilNoRIT(zkw);
+    master.assignmentManager.waitUntilNoRegionsInTransition(60000);
+    log("Assignment completed");
+
+    // Because master.assignRegion(hri) may fail to place a region on the specified RS,
+    // verify that these regions actually ended up on the expected RS
+    verifyRegionLocation(hrs, assignedRegionsOnLiveRS);
+
+    assertTrue(" Table must be enabled.", master.getAssignmentManager().getZKTable()
+        .isEnabledTable("disabledTable"));
+
+    assertTrue(" Didn't get enough regions of enabledTable on live rs.",
+      assignedRegionsOnLiveRS.size() >= 1);
+
+    // Disable the disabledTable in ZK
+    ZKTable zktable = master.assignmentManager.getZKTable();
+    zktable.setDisablingTable("disabledTable");
+
+    // RS was opening a region of disabled table then died
+    HRegionInfo region = assignedRegionsOnLiveRS.remove(0);
+    master.assignmentManager.regionOffline(region);
+    master.assignmentManager.regionsInTransition.put(region.getEncodedName(), new RegionState(
+        region, RegionState.State.OPENING, System.currentTimeMillis(), deadServerName));
+    ZKAssign.createNodeOffline(zkw, region, deadServerName);
+    ZKAssign.transitionNodeOpening(zkw, region, deadServerName);
+
+    // Kill the RS that had a hard death
+    log("Killing RS " + deadServerName);
+    hrsDead.abort("Killing for unit test");
+    while (hrsDeadThread.isAlive()) {
+      Threads.sleep(10);
+    }
+    log("RS " + deadServerName + " killed");
+
+    log("Waiting for no more RIT");
+    ZKAssign.blockUntilNoRIT(zkw);
+    log("No more RIT in ZK");
+    assertTrue(master.assignmentManager.waitUntilNoRegionsInTransition(120000));
+  }
+
   /**
    * Verify 
regions are on the expected region server */ diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java index 3d1da30..1689238 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenMasterInitializing.java @@ -44,12 +44,10 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.TestMasterFailover; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -103,25 +101,12 @@ public class TestRSKilledWhenMasterInitializing { super(conf); } - @Override - protected void splitLogAfterStartup(MasterFileSystem mfs) { - super.splitLogAfterStartup(mfs); - logSplit = true; - // If "TestingMaster.sleep" is set, sleep after log split. 
- if (getConfiguration().getBoolean("TestingMaster.sleep", false)) { - int duration = getConfiguration().getInt( - "TestingMaster.sleep.duration", 0); - Threads.sleep(duration); - } - } - - public boolean isLogSplitAfterStartup() { return logSplit; } } - @Test(timeout = 120000) + @Test(timeout = 180000) public void testCorrectnessWhenMasterFailOver() throws Exception { final byte[] TABLENAME = Bytes.toBytes("testCorrectnessWhenMasterFailOver"); final byte[] FAMILY = Bytes.toBytes("family"); @@ -164,7 +149,7 @@ public class TestRSKilledWhenMasterInitializing { /* NO.1 .META. region correctness */ // First abort master abortMaster(cluster); - TestingMaster master = startMasterAndWaitUntilLogSplit(cluster); + TestingMaster master = startMasterAndWaitTillMetaRegionAssignment(cluster); // Second kill meta server int metaServerNum = cluster.getServerWithMeta(); @@ -195,7 +180,7 @@ public class TestRSKilledWhenMasterInitializing { if (rootServerNum != metaServerNum) { // First abort master abortMaster(cluster); - master = startMasterAndWaitUntilLogSplit(cluster); + master = startMasterAndWaitTillMetaRegionAssignment(cluster); // Second kill meta server HRegionServer rootRS = cluster.getRegionServer(rootServerNum); @@ -215,6 +200,7 @@ public class TestRSKilledWhenMasterInitializing { assertTrue(hbaseAdmin.isTableAvailable(TABLENAME)); } + /* NO.3 data region correctness */ ServerManager serverManager = cluster.getMaster().getServerManager(); while (serverManager.areDeadServersInProgress()) { @@ -274,7 +260,7 @@ public class TestRSKilledWhenMasterInitializing { // Stop the master abortMaster(cluster); - master = startMasterAndWaitUntilLogSplit(cluster); + master = startMasterAndWaitTillMetaRegionAssignment(cluster); deadRS.kill(); deadRS.join(); waitUntilMasterIsInitialized(master); @@ -302,14 +288,12 @@ public class TestRSKilledWhenMasterInitializing { LOG.debug("Master is aborted"); } - private TestingMaster startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster) + 
private TestingMaster startMasterAndWaitTillMetaRegionAssignment(MiniHBaseCluster cluster) throws IOException, InterruptedException { TestingMaster master = (TestingMaster) cluster.startMaster().getMaster(); - while (!master.isLogSplitAfterStartup()) { + while (!master.isInitializationStartsMetaRegoinAssignment()) { Thread.sleep(100); } - LOG.debug("splitted:" + master.isLogSplitAfterStartup() + ",initialized:" - + master.isInitialized()); return master; } @@ -318,6 +302,9 @@ public class TestRSKilledWhenMasterInitializing { while (!master.isInitialized()) { Thread.sleep(100); } + while (master.getServerManager().areDeadServersInProgress()) { + Thread.sleep(100); + } LOG.debug("master isInitialized"); }