diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java index 5c349e5..40e82f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java @@ -41,12 +41,14 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -820,25 +822,39 @@ public class RegionStates { public Map> getSnapShotOfAssignment( final Collection regions) { final Map> result = new HashMap>(); - for (RegionInfo hri: regions) { - final RegionStateNode node = getRegionStateNode(hri); - if (node == null) continue; - - // TODO: State.OPEN - final ServerName serverName = node.getRegionLocation(); - if (serverName == null) continue; - - List serverRegions = result.get(serverName); - if (serverRegions == null) { - serverRegions = new ArrayList(); - result.put(serverName, serverRegions); + if (regions != null) { + for (RegionInfo hri : regions) { + final RegionStateNode node = getRegionStateNode(hri); + if (node == null) { + continue; + } + createSnapshot(node, result); + } + } else { + for (RegionStateNode node : regionsMap.values()) { + if (node == null) { + 
continue; + } + createSnapshot(node, result); } - - serverRegions.add(node.getRegionInfo()); } return result; } + private void createSnapshot(RegionStateNode node, Map> result) { + final ServerName serverName = node.getRegionLocation(); + if (serverName == null) { + return; + } + + List serverRegions = result.get(serverName); + if (serverRegions == null) { + serverRegions = new ArrayList(); + result.put(serverName, serverRegions); + } + serverRegions.add(node.getRegionInfo()); + } + public Map getRegionAssignments() { final HashMap assignments = new HashMap(); for (RegionStateNode node: regionsMap.values()) { @@ -1127,6 +1143,26 @@ public class RegionStates { return serverNode; } + public boolean isReplicaAvailableForRegion(final RegionInfo info) { + // if the region info itself is a replica return true. + if (!RegionReplicaUtil.isDefaultReplica(info)) { + return true; + } + // iterate the regionsMap for the given region name. If there are replicas it should + // list them in order. + for (RegionStateNode node : regionsMap.tailMap(info.getRegionName()).values()) { + if (!node.getTable().equals(info.getTable()) + || !ServerRegionReplicaUtil.isReplicasForSameRegion(info, node.getRegionInfo())) { + break; + } else if (!RegionReplicaUtil.isDefaultReplica(node.getRegionInfo())) { + // we have replicas + return true; + } + } + // we don't have replicas + return false; + } + public ServerStateNode removeRegionFromServer(final ServerName serverName, final RegionStateNode regionNode) { ServerStateNode serverNode = getOrCreateServer(serverName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0f6e348..114aee4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -34,6 +34,7 @@ import
java.util.Set; import java.util.TreeMap; import java.util.function.Predicate; import java.util.stream.Collectors; + import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; @@ -51,15 +52,14 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Joiner; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The base class for load balancers. 
It provides the the functions used to by @@ -741,7 +741,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int region = regionsToIndex.get(regionInfo); int primary = regionIndexToPrimaryIndex[region]; - // there is a subset relation for server < host < rack // check server first @@ -1262,7 +1261,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - Cluster cluster = createCluster(servers, regions); + Cluster cluster = createCluster(servers, regions, false); List unassignedRegions = new ArrayList<>(); roundRobinAssignment(cluster, regions, unassignedRegions, @@ -1318,12 +1317,19 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return assignments; } - protected Cluster createCluster(List servers, Collection regions) { + protected Cluster createCluster(List servers, Collection regions, + boolean hasRegionReplica) { // Get the snapshot of the current assignments for the regions in question, and then create // a cluster out of it. Note that we might have replicas already assigned to some servers // earlier. So we want to get the snapshot to see those assignments, but this will only contain // replicas of the regions that are passed (for performance). - Map> clusterState = getRegionAssignmentsByServer(regions); + Map> clusterState = null; + if (!hasRegionReplica) { + clusterState = getRegionAssignmentsByServer(regions); + } else { + // for the case where we have region replica it is better we get the entire cluster's snapshot + clusterState = getRegionAssignmentsByServer(null); + } for (ServerName server : servers) { if (!clusterState.containsKey(server)) { @@ -1372,7 +1378,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { final List finalServers = idleServers.isEmpty() ? 
servers : idleServers; List regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(finalServers, regions); + Cluster cluster = createCluster(finalServers, regions, false); return randomAssignment(cluster, regionInfo, finalServers); } @@ -1444,10 +1450,18 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; - + boolean hasRegionReplica = false; for (Map.Entry entry : regions.entrySet()) { RegionInfo region = entry.getKey(); ServerName oldServerName = entry.getValue(); + // In the current set of regions even if one has region replica let us go with + // getting the entire snapshot + if (this.services != null && this.services.getAssignmentManager() != null) { // for tests + if (!hasRegionReplica && this.services.getAssignmentManager().getRegionStates() + .isReplicaAvailableForRegion(region)) { + hasRegionReplica = true; + } + } List localServers = new ArrayList<>(); if (oldServerName != null) { localServers = serversByHostname.get(oldServerName.getHostnameLowerCase()); @@ -1487,7 +1501,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // If servers from prior assignment aren't present, then lets do randomAssignment on regions. 
if (randomAssignRegions.size() > 0) { - Cluster cluster = createCluster(servers, regions.keySet()); + Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica); for (Map.Entry> entry : assignments.entrySet()) { ServerName sn = entry.getKey(); for (RegionInfo region : entry.getValue()) { @@ -1497,7 +1511,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { for (RegionInfo region : randomAssignRegions) { ServerName target = randomAssignment(cluster, region, servers); assignments.get(target).add(region); - cluster.doAssignRegion(region, target); numRandomAssignments++; } } @@ -1548,12 +1561,29 @@ public abstract class BaseLoadBalancer implements LoadBalancer { ServerName sn = null; final int maxIterations = numServers * 4; int iterations = 0; - + List usedSNs = new ArrayList<>(servers.size()); do { int i = RANDOM.nextInt(numServers); sn = servers.get(i); + if (!usedSNs.contains(sn)) { + usedSNs.add(sn); + } } while (cluster.wouldLowerAvailability(regionInfo, sn) && iterations++ < maxIterations); + if (iterations >= maxIterations) { + // We have reached the max. Means the servers that we collected is still lowering the + // availability + for (ServerName unusedServer : servers) { + if (!usedSNs.contains(unusedServer)) { + // check if any other unused server is there for us to use. + // If so use it. 
Else we have no other option but to go with one of them + if (!cluster.wouldLowerAvailability(regionInfo, unusedServer)) { + sn = unusedServer; + break; + } + } + } + } cluster.doAssignRegion(regionInfo, sn); return sn; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index efa55fd..0de34a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1345,7 +1345,22 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { */ public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys) throws IOException { - return createTable(tableName, families, splitKeys, new Configuration(getConfiguration())); + return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration())); + } + + /** + * Create a table. + * @param tableName the table name + * @param families the families + * @param splitKeys the split keys + * @param replicaCount the region replica count + * @return A Table instance for the created table. + * @throws IOException throws IOException + */ + public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, + int replicaCount) throws IOException { + return createTable(tableName, families, splitKeys, replicaCount, + new Configuration(getConfiguration())); + } public Table createTable(TableName tableName, byte[][] families, @@ -1436,16 +1451,19 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { /** * Create a table. - * @param tableName - * @param families - * @param splitKeys + * @param tableName the table name + * @param families the families + * @param splitKeys the split keys + * @param replicaCount the replica count * @param c Configuration to use * @return A Table instance for the created table.
* @throws IOException */ public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, - final Configuration c) throws IOException { - return createTable(new HTableDescriptor(tableName), families, splitKeys, c); + int replicaCount, final Configuration c) throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.setRegionReplication(replicaCount); + return createTable(htd, families, splitKeys, c); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java index 9f7fafe..ca645ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.procedure; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -52,7 +54,7 @@ public class TestServerCrashProcedure { private static final Logger LOG = LoggerFactory.getLogger(TestServerCrashProcedure.class); - private HBaseTestingUtility util; + protected HBaseTestingUtility util; private ProcedureMetrics serverCrashProcMetrics; private long serverCrashSubmittedCount = 0; @@ -68,13 +70,17 @@ public class TestServerCrashProcedure { public void setup() throws Exception { this.util = new HBaseTestingUtility(); setupConf(this.util.getConfiguration()); - this.util.startMiniCluster(3); + startMiniCluster(); ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); serverCrashProcMetrics = 
this.util.getHBaseCluster().getMaster().getMasterMetrics() .getServerCrashProcMetrics(); } + protected void startMiniCluster() throws Exception { + this.util.startMiniCluster(3); + } + @After public void tearDown() throws Exception { MiniHBaseCluster cluster = this.util.getHBaseCluster(); @@ -113,11 +119,9 @@ public class TestServerCrashProcedure { */ private void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doubleExecution) throws Exception { - final TableName tableName = TableName.valueOf( - "testRecoveryAndDoubleExecution-carryingMeta-" + carryingMeta); - final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS, - HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - try { + final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution-carryingMeta-" + + carryingMeta + "-doubleExecution-" + doubleExecution); + try (Table t = createTable(tableName)) { // Load the table with a bit of data so some logs to split and some edits in each region. this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]); final int count = util.countRows(t); @@ -151,17 +155,25 @@ public class TestServerCrashProcedure { long procId = getSCPProcId(procExec); ProcedureTestingUtility.waitProcedure(procExec, procId); } - // Assert all data came back. 
+ assertReplicaDistributed(t); assertEquals(count, util.countRows(t)); assertEquals(checksum, util.checksumRows(t)); - } catch(Throwable throwable) { + } catch (Throwable throwable) { LOG.error("Test failed!", throwable); throw throwable; - } finally { - t.close(); } } + protected void assertReplicaDistributed(final Table t) { + return; + } + + protected Table createTable(final TableName tableName) throws IOException { + final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS, + HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + return t; + } + private void collectMasterMetrics() { serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount(); serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount();