diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 1c576209e8..67e4fdf591 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -761,8 +761,6 @@ public class HMaster extends HRegionServer implements MasterServices { /* * We are active master now... go initialize components we need to run. - * Note, there may be dross in zk from previous runs; it'll get addressed - * below after we determine if cluster startup or failover. */ status.setStatus("Initializing Master file system"); @@ -1173,12 +1171,6 @@ public class HMaster extends HRegionServer implements MasterServices { super.stopServiceThreads(); stopChores(); - // Wait for all the remaining region servers to report in IFF we were - // running a cluster shutdown AND we were NOT aborting. - if (!isAborted() && this.serverManager != null && - this.serverManager.isClusterShutdown()) { - this.serverManager.letRegionServersShutdown(); - } if (LOG.isDebugEnabled()) { LOG.debug("Stopping service threads"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 79ffc8a582..d5ef94de30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -321,14 +321,14 @@ public class ServerManager { * @param sl the server load on the server * @return true if the server is recorded, otherwise, false */ - boolean checkAndRecordNewServer( - final ServerName serverName, final ServerLoad sl) { - ServerName existingServer = null; + boolean checkAndRecordNewServer(final ServerName serverName, final ServerLoad sl) { + ServerName newerEquivalentServer = null; synchronized (this.onlineServers) { - existingServer = findServerWithSameHostnamePortWithLock(serverName); - if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) { - LOG.info("Server serverName=" + serverName + " rejected; we already have " - + existingServer.toString() + " registered with same hostname and port"); + newerEquivalentServer = getNewerEquivalentServer(serverName); + if (newerEquivalentServer != null) { + LOG.info("ServerName=" + serverName + " rejected; we already have " + + newerEquivalentServer.toString() + " registered with same hostname and port and larger" + + "startcode"); return false; } recordNewServerWithLock(serverName, sl); @@ -343,15 +343,27 @@ public class ServerManager { // Note that we assume that same ts means same server, and don't expire in that case. // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky. - if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) { + if (newerEquivalentServer != null && + (newerEquivalentServer.getStartcode() < serverName.getStartcode())) { LOG.info("Triggering server recovery; existingServer " + - existingServer + " looks stale, new server:" + serverName); - expireServer(existingServer); + newerEquivalentServer + " looks stale, new server:" + serverName); + expireServer(newerEquivalentServer); } return true; } /** + * @return Name of the new instance of the oldServerName else null. Returned + * ServerName has same hostname and port and a newer startcode. + */ + public ServerName getNewerEquivalentServer(final ServerName oldServerName) { + synchronized (this.onlineServers) { + ServerName result = findServerWithSameHostnamePortWithLock(oldServerName); + return result != null && (result.getStartcode() > oldServerName.getStartcode())? result: null; + } + } + + /** * Checks if the clock skew between the server and the master. If the clock skew exceeds the * configured max, it will throw an exception; if it exceeds the configured warning threshold, * it will log a warning but start normally. @@ -951,7 +963,6 @@ public class ServerManager { String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName(); LOG.info(statusStr); this.clusterShutdown.set(true); - this.master.stop(statusStr); } public boolean isClusterShutdown() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 694c0e725c..c860086b10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -599,6 +599,27 @@ public class AssignmentManager implements ServerListener { // RegionTransition procedures helpers // ============================================================================================ + public AssignProcedure[] createAssignProcedures(final List hris, + ServerName oldLocation) { + if (hris == null || hris.size() <= 0) { + return EMPTY_ASSIGN_PROCEDURE_ARRAY; + } + ServerName newerEquivalentServer = + this.master.getServerManager().getNewerEquivalentServer(oldLocation); + if (newerEquivalentServer != null) { + // Assign all these regions to the new instance of the server so we retain locality; so + // we put the regions back on the server with same port and hostname as oldLocation. + // TODO: Add a stickyness facility to LoadBalancer where you pass in old location and + // balancer tries to keep assignments on new version of old location. + Map> map = new HashMap<>(); + map.put(newerEquivalentServer, hris); + return createAssignProcedures(map, hris.size()); + } else { + // Fall back to default roundrobin. + return createAssignProcedures(hris); + } + } + public AssignProcedure[] createAssignProcedures(final List hris) { if (hris.isEmpty()) return null; try { @@ -606,7 +627,7 @@ public class AssignmentManager implements ServerListener { // a better job if it has all the assignments in the one lump. Map> assignments = getBalancer().roundRobinAssignment(hris, this.master.getServerManager().createDestinationServersList(null)); - return createAssignProcedure(assignments, hris.size()); + return createAssignProcedures(assignments, hris.size()); } catch (HBaseIOException hioe) { LOG.warn("Failed roundRobinAssignment", hioe); } @@ -622,14 +643,20 @@ public class AssignmentManager implements ServerListener { // Make this static for the method below where we use it typing the AssignProcedure array we // return as result. - private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {}; + private static final AssignProcedure [] EMPTY_ASSIGN_PROCEDURE_ARRAY = new AssignProcedure[] {}; + private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = + EMPTY_ASSIGN_PROCEDURE_ARRAY; /** * @param assignments Map of assignments from which we produce an array of assignments. * @param size Count of assignments to make (the caller may know the total count) * @return Assignments made from the passed in assignments */ - AssignProcedure[] createAssignProcedure(Map> assignments, int size) { + AssignProcedure[] createAssignProcedures(Map> assignments, + int size) { + if (assignments == null || assignments.size() <= 0) { + return EMPTY_ASSIGN_PROCEDURE_ARRAY; + } List procedures = new ArrayList(size > 0? size: 8/*Choose an arbitrary size*/); for (Map.Entry> e: assignments.entrySet()) { @@ -1181,20 +1208,16 @@ public class AssignmentManager implements ServerListener { // ============================================================================================ public void joinCluster() throws IOException { final long startTime = System.currentTimeMillis(); - - LOG.info("Joining the cluster..."); - + LOG.info("Joining cluster..."); // Scan hbase:meta to build list of existing regions, servers, and assignment loadMeta(); - for (int i = 0; master.getServerManager().countOfRegionServers() < 1; ++i) { - LOG.info("waiting for RS to join"); + LOG.info("Waiting for RegionServers to join; current count=" + + master.getServerManager().countOfRegionServers()); Threads.sleep(250); } - LOG.info("RS joined. Num RS = " + master.getServerManager().countOfRegionServers()); + LOG.info("Number of RegionServers=" + master.getServerManager().countOfRegionServers()); - // This method will assign all user regions if a clean server startup or - // it will reconstruct master state and cleanup any leftovers from previous master process. boolean failover = processofflineServersWithOnlineRegions(); // Start the RIT chore @@ -1249,49 +1272,47 @@ public class AssignmentManager implements ServerListener { // they will be handled by the SSH that are put in the ServerManager "queue". // we can integrate this a bit better. private boolean processofflineServersWithOnlineRegions() { - boolean failover = !master.getServerManager().getDeadServers().isEmpty(); - - final Set offlineServersWithOnlineRegions = new HashSet(); - final ArrayList regionsToAssign = new ArrayList(); - long st, et; - - st = System.currentTimeMillis(); + boolean deadServers = !master.getServerManager().getDeadServers().isEmpty(); + final Set offlineServersWithOnlineRegions = new HashSet<>(); + int size = regionStates.getRegionStateNodes().size(); + final List onlineRegionsToAssign = new ArrayList<>(size); + final List offlineRegionsToAssign = new ArrayList<>(size); + long startTime = System.currentTimeMillis(); + boolean failover = deadServers; for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) { + // Region State can be OPEN even if we did controlled cluster shutdown; Master does not close + // the regions in this case. The RegionServer does. if (regionNode.getState() == State.OPEN) { final ServerName serverName = regionNode.getRegionLocation(); if (!master.getServerManager().isServerOnline(serverName)) { + onlineRegionsToAssign.add(regionNode.getRegionInfo()); offlineServersWithOnlineRegions.add(serverName); + } else { + // Server is online. For sure this is failover then, a Master starting up into an + // already-running cluster. + failover = true; } } else if (regionNode.getState() == State.OFFLINE) { if (isTableEnabled(regionNode.getTable())) { - regionsToAssign.add(regionNode.getRegionInfo()); + offlineRegionsToAssign.add(regionNode.getRegionInfo()); } } } - et = System.currentTimeMillis(); - LOG.info("[STEP-1] " + StringUtils.humanTimeDiff(et - st)); - - // kill servers with online regions - st = System.currentTimeMillis(); + // Kill servers with online regions just-in-case. Runs ServerCrashProcedure. + startTime = System.currentTimeMillis(); for (ServerName serverName: offlineServersWithOnlineRegions) { if (!master.getServerManager().isServerOnline(serverName)) { - LOG.info("KILL RS hosting regions but not online " + serverName + - " (master=" + master.getServerName() + ")"); killRegionServer(serverName); } } - et = System.currentTimeMillis(); - LOG.info("[STEP-2] " + StringUtils.humanTimeDiff(et - st)); - setFailoverCleanupDone(true); // Assign offline regions - st = System.currentTimeMillis(); - master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager(). - createAssignProcedures(regionsToAssign)); - et = System.currentTimeMillis(); - LOG.info("[STEP-3] " + StringUtils.humanTimeDiff(et - st)); - + startTime = System.currentTimeMillis(); + if (offlineRegionsToAssign.size() > 0) { + master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager(). + createAssignProcedures(offlineRegionsToAssign)); + } return failover; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index e1a29f5930..cc149fa57d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -165,7 +165,7 @@ implements ServerProcedureInterface { AssignmentManager am = env.getAssignmentManager(); // forceNewPlan is set to false. Balancer is expected to find most suitable target // server if retention is not possible. - addChildProcedure(am.createAssignProcedures(regionsOnCrashedServer)); + addChildProcedure(am.createAssignProcedures(regionsOnCrashedServer, getServerName())); } setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cb0632d765..28c015a01d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2275,7 +2275,8 @@ public class HRegionServer extends HasThread implements ReportRegionStateTransitionRequest request = builder.build(); int tries = 0; long pauseTime = INIT_PAUSE_TIME_MS; - while (keepLooping()) { + // Keep looping till we get an error. We want to send reports even though server is going down. + while (true) { RegionServerStatusService.BlockingInterface rss = rssStub; try { if (rss == null) { @@ -2286,8 +2287,7 @@ public class HRegionServer extends HasThread implements rss.reportRegionStateTransition(null, request); if (response.hasErrorMessage()) { LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); - // NOTE: Return mid-method!!! - return false; + break; } // Log if we had to retry else don't log unless TRACE. We want to // know if were successful after an attempt showed in logs as failed. @@ -2319,7 +2319,6 @@ public class HRegionServer extends HasThread implements } } } - LOG.info("TRANSITION NOT REPORTED " + request); return false; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 2488d20eb0..bee90dd26f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -967,13 +967,18 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { } // Start the MiniHBaseCluster - return startMiniHBaseCluster(numMasters, numSlaves, masterClass, + return startMiniHBaseCluster(numMasters, numSlaves, null, masterClass, regionserverClass, create, withWALDir); } public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves) - throws IOException, InterruptedException{ - return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false); + throws IOException, InterruptedException { + return startMiniHBaseCluster(numMasters, numSlaves, null); + } + + public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves, + List rsports) throws IOException, InterruptedException { + return startMiniHBaseCluster(numMasters, numSlaves, rsports, null, null, false, false); } /** @@ -990,7 +995,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { * @see {@link #startMiniCluster()} */ public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, - final int numSlaves, Class masterClass, + final int numSlaves, List rsports, Class masterClass, Class regionserverClass, boolean create, boolean withWALDir) throws IOException, InterruptedException { @@ -1015,7 +1020,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { Configuration c = new Configuration(this.conf); TraceUtil.initTracer(c); this.hbaseCluster = - new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass); + new MiniHBaseCluster(c, numMasters, numSlaves, rsports, masterClass, regionserverClass); // Don't leave here till we've done a successful scan of the hbase:meta Table t = getConnection().getTable(TableName.META_TABLE_NAME); ResultScanner s = t.getScanner(new Scan()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index e02347d3c3..3939050eee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -77,10 +77,11 @@ public class MiniHBaseCluster extends HBaseCluster { */ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) throws IOException, InterruptedException { - this(conf, numMasters, numRegionServers, null, null); + this(conf, numMasters, numRegionServers, null, null, null); } public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, + List rsports, Class masterClass, Class regionserverClass) throws IOException, InterruptedException { @@ -93,7 +94,7 @@ public class MiniHBaseCluster extends HBaseCluster { // Hadoop 2 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); - init(numMasters, numRegionServers, masterClass, regionserverClass); + init(numMasters, numRegionServers, rsports, masterClass, regionserverClass); this.initialClusterStatus = getClusterStatus(); } @@ -207,7 +208,7 @@ public class MiniHBaseCluster extends HBaseCluster { } } - private void init(final int nMasterNodes, final int nRegionNodes, + private void init(final int nMasterNodes, final int nRegionNodes, List rsports, Class masterClass, Class regionserverClass) throws IOException, InterruptedException { @@ -224,8 +225,11 @@ public class MiniHBaseCluster extends HBaseCluster { masterClass, regionserverClass); // manually add the regionservers as other users - for (int i=0; i rsports = new ArrayList<>(); + for (JVMClusterUtil.RegionServerThread rst: + TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()) { + rsports.add(rst.getRegionServer().getRpcServer().getListenerAddress().getPort()); + } TEST_UTIL.shutdownMiniHBaseCluster(); - TEST_UTIL.startMiniHBaseCluster(1, numSlaves); + TEST_UTIL.startMiniHBaseCluster(1, numSlaves, rsports); TEST_UTIL.waitTableEnabled(tableName); validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica, ADMIN.getConnection()); @@ -203,10 +212,10 @@ public class TestMasterOperationsForRegionReplicas { ADMIN.enableTable(tableName); LOG.info(ADMIN.getTableDescriptor(tableName).toString()); assert(ADMIN.isTableEnabled(tableName)); - List regions = TEST_UTIL.getMiniHBaseCluster().getMaster() - .getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); - assertTrue("regions.size=" + regions.size() + ", numRegions=" + numRegions + ", numReplica=" + numReplica, - regions.size() == numRegions * (numReplica + 1)); + List regions = TEST_UTIL.getMiniHBaseCluster().getMaster(). + getAssignmentManager().getRegionStates().getRegionsOfTable(tableName); + assertTrue("regions.size=" + regions.size() + ", numRegions=" + numRegions + + ", numReplica=" + numReplica, regions.size() == numRegions * (numReplica + 1)); //decrease the replica(earlier, table was modified to have a replica count of numReplica + 1) ADMIN.disableTable(tableName); @@ -233,7 +242,6 @@ public class TestMasterOperationsForRegionReplicas { assert(defaultReplicas.size() == numRegions); Collection counts = new HashSet<>(defaultReplicas.values()); assert(counts.size() == 1 && counts.contains(new Integer(numReplica))); - */ } finally { ADMIN.disableTable(tableName); ADMIN.deleteTable(tableName); @@ -342,14 +350,14 @@ public class TestMasterOperationsForRegionReplicas { connection); snapshot.initialize(); Map regionToServerMap = snapshot.getRegionToRegionServerMap(); - assertEquals(regionToServerMap.size(), numRegions * numReplica + 1); //'1' for the namespace + assertEquals(regionToServerMap.size(), numRegions * numReplica + 1); Map> serverToRegionMap = snapshot.getRegionServerToRegionMap(); - assertEquals(serverToRegionMap.keySet().size(), 2); // 1 rs + 1 master + assertEquals("One Region Only", 1, serverToRegionMap.keySet().size()); for (Map.Entry> entry : serverToRegionMap.entrySet()) { if (entry.getKey().equals(TEST_UTIL.getHBaseCluster().getMaster().getServerName())) { continue; } - assertEquals(entry.getValue().size(), numRegions * numReplica); + assertEquals(entry.getValue().size(), numRegions * numReplica + 1); } } }