diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1c576209e8..67e4fdf591 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -761,8 +761,6 @@ public class HMaster extends HRegionServer implements MasterServices {
/*
* We are active master now... go initialize components we need to run.
- * Note, there may be dross in zk from previous runs; it'll get addressed
- * below after we determine if cluster startup or failover.
*/
status.setStatus("Initializing Master file system");
@@ -1173,12 +1171,6 @@ public class HMaster extends HRegionServer implements MasterServices {
super.stopServiceThreads();
stopChores();
- // Wait for all the remaining region servers to report in IFF we were
- // running a cluster shutdown AND we were NOT aborting.
- if (!isAborted() && this.serverManager != null &&
- this.serverManager.isClusterShutdown()) {
- this.serverManager.letRegionServersShutdown();
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping service threads");
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 79ffc8a582..d5ef94de30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -321,14 +321,14 @@ public class ServerManager {
* @param sl the server load on the server
* @return true if the server is recorded, otherwise, false
*/
- boolean checkAndRecordNewServer(
- final ServerName serverName, final ServerLoad sl) {
- ServerName existingServer = null;
+ boolean checkAndRecordNewServer(final ServerName serverName, final ServerLoad sl) {
+ ServerName newerEquivalentServer = null;
synchronized (this.onlineServers) {
- existingServer = findServerWithSameHostnamePortWithLock(serverName);
- if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
- LOG.info("Server serverName=" + serverName + " rejected; we already have "
- + existingServer.toString() + " registered with same hostname and port");
+ newerEquivalentServer = getNewerEquivalentServer(serverName);
+ if (newerEquivalentServer != null) {
+ LOG.info("ServerName=" + serverName + " rejected; we already have " +
+ newerEquivalentServer.toString() + " registered with same hostname and port and larger " +
+ "startcode");
return false;
}
recordNewServerWithLock(serverName, sl);
@@ -343,15 +343,27 @@ public class ServerManager {
// Note that we assume that same ts means same server, and don't expire in that case.
// TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
- if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
+ if (newerEquivalentServer != null &&
+ (newerEquivalentServer.getStartcode() < serverName.getStartcode())) {
LOG.info("Triggering server recovery; existingServer " +
- existingServer + " looks stale, new server:" + serverName);
- expireServer(existingServer);
+ newerEquivalentServer + " looks stale, new server:" + serverName);
+ expireServer(newerEquivalentServer);
}
return true;
}
/**
+ * @param oldServerName server to look for a newer instance of
+ * @return ServerName of the new instance of oldServerName, else null. The returned ServerName
+ * has the same hostname and port and a newer startcode.
+ */
+ public ServerName getNewerEquivalentServer(final ServerName oldServerName) {
+ synchronized (this.onlineServers) {
+ ServerName result = findServerWithSameHostnamePortWithLock(oldServerName);
+ return result != null && (result.getStartcode() > oldServerName.getStartcode())? result: null;
+ }
+ }
+
+ /**
* Checks if the clock skew between the server and the master. If the clock skew exceeds the
* configured max, it will throw an exception; if it exceeds the configured warning threshold,
* it will log a warning but start normally.
@@ -951,7 +963,6 @@ public class ServerManager {
String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
LOG.info(statusStr);
this.clusterShutdown.set(true);
- this.master.stop(statusStr);
}
public boolean isClusterShutdown() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 694c0e725c..c860086b10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -599,6 +599,27 @@ public class AssignmentManager implements ServerListener {
// RegionTransition procedures helpers
// ============================================================================================
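+ /**
+ * Create assign procedures for the passed-in regions. If a new instance of the server that was
+ * carrying them (oldLocation) has checked in -- same hostname and port, newer startcode --
+ * prefer it as the target so we retain locality; otherwise fall back to round-robin assignment.
+ */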
+ public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris,
+ ServerName oldLocation) {
+ if (hris == null || hris.size() <= 0) {
+ return EMPTY_ASSIGN_PROCEDURE_ARRAY;
+ }
+ ServerName newerEquivalentServer =
+ this.master.getServerManager().getNewerEquivalentServer(oldLocation);
+ if (newerEquivalentServer != null) {
+ // Assign all these regions to the new instance of the server so we retain locality; so
+ // we put the regions back on the server with same port and hostname as oldLocation.
+ // TODO: Add a stickyness facility to LoadBalancer where you pass in old location and
+ // balancer tries to keep assignments on new version of old location.
+ Map<ServerName, List<RegionInfo>> map = new HashMap<>();
+ map.put(newerEquivalentServer, hris);
+ return createAssignProcedures(map, hris.size());
+ } else {
+ // Fall back to default roundrobin.
+ return createAssignProcedures(hris);
+ }
+ }
+
public AssignProcedure[] createAssignProcedures(final List<RegionInfo> hris) {
if (hris.isEmpty()) return null;
try {
@@ -606,7 +627,7 @@ public class AssignmentManager implements ServerListener {
// a better job if it has all the assignments in the one lump.
Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
this.master.getServerManager().createDestinationServersList(null));
- return createAssignProcedure(assignments, hris.size());
+ return createAssignProcedures(assignments, hris.size());
} catch (HBaseIOException hioe) {
LOG.warn("Failed roundRobinAssignment", hioe);
}
@@ -622,14 +643,20 @@ public class AssignmentManager implements ServerListener {
// Make this static for the method below where we use it typing the AssignProcedure array we
// return as result.
- private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE = new AssignProcedure[] {};
+ private static final AssignProcedure [] EMPTY_ASSIGN_PROCEDURE_ARRAY = new AssignProcedure[] {};
+ private static final AssignProcedure [] ASSIGN_PROCEDURE_ARRAY_TYPE =
+ EMPTY_ASSIGN_PROCEDURE_ARRAY;
/**
* @param assignments Map of assignments from which we produce an array of assignments.
* @param size Count of assignments to make (the caller may know the total count)
* @return Assignments made from the passed in assignments
*/
- AssignProcedure[] createAssignProcedure(Map<ServerName, List<RegionInfo>> assignments, int size) {
+ AssignProcedure[] createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments,
+ int size) {
+ if (assignments == null || assignments.size() <= 0) {
+ return EMPTY_ASSIGN_PROCEDURE_ARRAY;
+ }
List<AssignProcedure> procedures =
new ArrayList<AssignProcedure>(size > 0? size: 8/*Choose an arbitrary size*/);
for (Map.Entry<ServerName, List<RegionInfo>> e: assignments.entrySet()) {
@@ -1181,20 +1208,16 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================
public void joinCluster() throws IOException {
final long startTime = System.currentTimeMillis();
-
- LOG.info("Joining the cluster...");
-
+ LOG.info("Joining cluster...");
// Scan hbase:meta to build list of existing regions, servers, and assignment
loadMeta();
-
for (int i = 0; master.getServerManager().countOfRegionServers() < 1; ++i) {
- LOG.info("waiting for RS to join");
+ LOG.info("Waiting for RegionServers to join; current count=" +
+ master.getServerManager().countOfRegionServers());
Threads.sleep(250);
}
- LOG.info("RS joined. Num RS = " + master.getServerManager().countOfRegionServers());
+ LOG.info("Number of RegionServers=" + master.getServerManager().countOfRegionServers());
- // This method will assign all user regions if a clean server startup or
- // it will reconstruct master state and cleanup any leftovers from previous master process.
boolean failover = processofflineServersWithOnlineRegions();
// Start the RIT chore
@@ -1249,49 +1272,47 @@ public class AssignmentManager implements ServerListener {
// they will be handled by the SSH that are put in the ServerManager "queue".
// we can integrate this a bit better.
private boolean processofflineServersWithOnlineRegions() {
- boolean failover = !master.getServerManager().getDeadServers().isEmpty();
-
- final Set<ServerName> offlineServersWithOnlineRegions = new HashSet<ServerName>();
- final ArrayList<RegionInfo> regionsToAssign = new ArrayList<RegionInfo>();
- long st, et;
-
- st = System.currentTimeMillis();
+ boolean deadServers = !master.getServerManager().getDeadServers().isEmpty();
+ final Set<ServerName> offlineServersWithOnlineRegions = new HashSet<>();
+ int size = regionStates.getRegionStateNodes().size();
+ final List<RegionInfo> onlineRegionsToAssign = new ArrayList<>(size);
+ final List<RegionInfo> offlineRegionsToAssign = new ArrayList<>(size);
+ long startTime = System.currentTimeMillis();
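+ // Dead servers on record means this is not a clean cluster startup, so treat it as failover.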
+ boolean failover = deadServers;
for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {
+ // Region State can be OPEN even if we did controlled cluster shutdown; Master does not close
+ // the regions in this case. The RegionServer does.
if (regionNode.getState() == State.OPEN) {
final ServerName serverName = regionNode.getRegionLocation();
if (!master.getServerManager().isServerOnline(serverName)) {
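+ // The hosting server is gone; these regions get reassigned when ServerCrashProcedure runs
+ // against the dead server below.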
+ onlineRegionsToAssign.add(regionNode.getRegionInfo());
offlineServersWithOnlineRegions.add(serverName);
+ } else {
+ // Server is online. For sure this is failover then, a Master starting up into an
+ // already-running cluster.
+ failover = true;
}
} else if (regionNode.getState() == State.OFFLINE) {
if (isTableEnabled(regionNode.getTable())) {
- regionsToAssign.add(regionNode.getRegionInfo());
+ offlineRegionsToAssign.add(regionNode.getRegionInfo());
}
}
}
- et = System.currentTimeMillis();
- LOG.info("[STEP-1] " + StringUtils.humanTimeDiff(et - st));
-
- // kill servers with online regions
- st = System.currentTimeMillis();
+ // Kill servers with online regions just-in-case. Runs ServerCrashProcedure.
+ startTime = System.currentTimeMillis();
for (ServerName serverName: offlineServersWithOnlineRegions) {
if (!master.getServerManager().isServerOnline(serverName)) {
- LOG.info("KILL RS hosting regions but not online " + serverName +
- " (master=" + master.getServerName() + ")");
killRegionServer(serverName);
}
}
- et = System.currentTimeMillis();
- LOG.info("[STEP-2] " + StringUtils.humanTimeDiff(et - st));
-
setFailoverCleanupDone(true);
// Assign offline regions
- st = System.currentTimeMillis();
- master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager().
- createAssignProcedures(regionsToAssign));
- et = System.currentTimeMillis();
- LOG.info("[STEP-3] " + StringUtils.humanTimeDiff(et - st));
-
+ startTime = System.currentTimeMillis();
+ if (offlineRegionsToAssign.size() > 0) {
+ master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager().
+ createAssignProcedures(offlineRegionsToAssign));
+ }
return failover;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index e1a29f5930..cc149fa57d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -165,7 +165,7 @@ implements ServerProcedureInterface {
AssignmentManager am = env.getAssignmentManager();
// forceNewPlan is set to false. Balancer is expected to find most suitable target
// server if retention is not possible.
- addChildProcedure(am.createAssignProcedures(regionsOnCrashedServer));
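+ // Pass the crashed server's name so the AssignmentManager can retain these regions on a
+ // restarted instance of the same host and port, if one has registered.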
+ addChildProcedure(am.createAssignProcedures(regionsOnCrashedServer, getServerName()));
}
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index cb0632d765..28c015a01d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2275,7 +2275,8 @@ public class HRegionServer extends HasThread implements
ReportRegionStateTransitionRequest request = builder.build();
int tries = 0;
long pauseTime = INIT_PAUSE_TIME_MS;
- while (keepLooping()) {
+ // Keep looping until we succeed or get an error response; we want to keep reporting even
+ // though the server is going down.
+ while (true) {
RegionServerStatusService.BlockingInterface rss = rssStub;
try {
if (rss == null) {
@@ -2286,8 +2287,7 @@ public class HRegionServer extends HasThread implements
rss.reportRegionStateTransition(null, request);
if (response.hasErrorMessage()) {
LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
- // NOTE: Return mid-method!!!
- return false;
+ break;
}
// Log if we had to retry else don't log unless TRACE. We want to
// know if were successful after an attempt showed in logs as failed.
@@ -2319,7 +2319,6 @@ public class HRegionServer extends HasThread implements
}
}
}
- LOG.info("TRANSITION NOT REPORTED " + request);
return false;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 2488d20eb0..bee90dd26f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -967,13 +967,18 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
}
// Start the MiniHBaseCluster
- return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
+ return startMiniHBaseCluster(numMasters, numSlaves, null, masterClass,
regionserverClass, create, withWALDir);
}
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
- throws IOException, InterruptedException{
- return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false);
+ throws IOException, InterruptedException {
+ return startMiniHBaseCluster(numMasters, numSlaves, null);
+ }
+
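+ /**
+ * @param rsports Ports the RegionServers should bind to, e.g. the ports a previous cluster
+ * instance used, so a restarted cluster comes back on the same host:port pairs; null keeps the
+ * default behaviour.
+ */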
+ public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves,
+ List<Integer> rsports) throws IOException, InterruptedException {
+ return startMiniHBaseCluster(numMasters, numSlaves, rsports, null, null, false, false);
}
/**
@@ -990,7 +995,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* @see {@link #startMiniCluster()}
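+ * @param rsports Ports the RegionServers should use; null to pick defaults.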
*/
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
- final int numSlaves, Class<? extends HMaster> masterClass,
+ final int numSlaves, List<Integer> rsports, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
boolean create, boolean withWALDir)
throws IOException, InterruptedException {
@@ -1015,7 +1020,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c);
this.hbaseCluster =
- new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
+ new MiniHBaseCluster(c, numMasters, numSlaves, rsports, masterClass, regionserverClass);
// Don't leave here till we've done a successful scan of the hbase:meta
Table t = getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index e02347d3c3..3939050eee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -77,10 +77,11 @@ public class MiniHBaseCluster extends HBaseCluster {
*/
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
throws IOException, InterruptedException {
- this(conf, numMasters, numRegionServers, null, null);
+ this(conf, numMasters, numRegionServers, null, null, null);
}
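+ /**
+ * @param rsports Ports the RegionServers in this mini cluster should bind to; null for defaults.
+ */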
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
+ List<Integer> rsports,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
@@ -93,7 +94,7 @@ public class MiniHBaseCluster extends HBaseCluster {
// Hadoop 2
CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
- init(numMasters, numRegionServers, masterClass, regionserverClass);
+ init(numMasters, numRegionServers, rsports, masterClass, regionserverClass);
this.initialClusterStatus = getClusterStatus();
}
@@ -207,7 +208,7 @@ public class MiniHBaseCluster extends HBaseCluster {
}
}
- private void init(final int nMasterNodes, final int nRegionNodes,
+ private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsports,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
@@ -224,8 +225,11 @@ public class MiniHBaseCluster extends HBaseCluster {
masterClass, regionserverClass);
// manually add the regionservers as other users
- for (int i=0; i<nRegionNodes; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterOperationsForRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterOperationsForRegionReplicas.java
+ List<Integer> rsports = new ArrayList<>();
+ for (JVMClusterUtil.RegionServerThread rst:
+ TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()) {
+ rsports.add(rst.getRegionServer().getRpcServer().getListenerAddress().getPort());
+ }
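+ // Restart the cluster binding the RegionServers to the ports they used before, so the new
+ // Master can hand regions back to their previous hosts.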
TEST_UTIL.shutdownMiniHBaseCluster();
- TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
+ TEST_UTIL.startMiniHBaseCluster(1, numSlaves, rsports);
TEST_UTIL.waitTableEnabled(tableName);
validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
ADMIN.getConnection());
@@ -203,10 +212,10 @@ public class TestMasterOperationsForRegionReplicas {
ADMIN.enableTable(tableName);
LOG.info(ADMIN.getTableDescriptor(tableName).toString());
assert(ADMIN.isTableEnabled(tableName));
- List<RegionInfo> regions = TEST_UTIL.getMiniHBaseCluster().getMaster()
- .getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
- assertTrue("regions.size=" + regions.size() + ", numRegions=" + numRegions + ", numReplica=" + numReplica,
- regions.size() == numRegions * (numReplica + 1));
+ List<RegionInfo> regions = TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+ assertTrue("regions.size=" + regions.size() + ", numRegions=" + numRegions +
+ ", numReplica=" + numReplica, regions.size() == numRegions * (numReplica + 1));
//decrease the replica(earlier, table was modified to have a replica count of numReplica + 1)
ADMIN.disableTable(tableName);
@@ -233,7 +242,6 @@ public class TestMasterOperationsForRegionReplicas {
assert(defaultReplicas.size() == numRegions);
Collection<Integer> counts = new HashSet<>(defaultReplicas.values());
assert(counts.size() == 1 && counts.contains(new Integer(numReplica)));
- */
} finally {
ADMIN.disableTable(tableName);
ADMIN.deleteTable(tableName);
@@ -342,14 +350,14 @@ public class TestMasterOperationsForRegionReplicas {
connection);
snapshot.initialize();
Map<RegionInfo, ServerName> regionToServerMap = snapshot.getRegionToRegionServerMap();
- assertEquals(regionToServerMap.size(), numRegions * numReplica + 1); //'1' for the namespace
+ assertEquals(regionToServerMap.size(), numRegions * numReplica + 1);
Map<ServerName, List<RegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap();
- assertEquals(serverToRegionMap.keySet().size(), 2); // 1 rs + 1 master
+ assertEquals("One Region Only", 1, serverToRegionMap.keySet().size());
for (Map.Entry<ServerName, List<RegionInfo>> entry : serverToRegionMap.entrySet()) {
if (entry.getKey().equals(TEST_UTIL.getHBaseCluster().getMaster().getServerName())) {
continue;
}
- assertEquals(entry.getValue().size(), numRegions * numReplica);
+ assertEquals(entry.getValue().size(), numRegions * numReplica + 1);
}
}
}