diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index 059f07e..5c70ecf 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -93,11 +93,17 @@ @Override public void setConf(Configuration conf) { this.config = conf; + if(internalBalancer != null) { + internalBalancer.setConf(conf); + } } @Override public void setClusterMetrics(ClusterMetrics sm) { this.clusterStatus = sm; + if (internalBalancer != null) { + internalBalancer.setClusterMetrics(sm); + } } @Override @@ -358,8 +364,7 @@ } private Pair>, List> correctAssignments( - Map> existingAssignments) - throws HBaseIOException{ + Map> existingAssignments) throws HBaseIOException{ // To return Map> correctAssignments = new TreeMap<>(); List regionPlansForMisplacedRegions = new ArrayList<>(); @@ -423,7 +428,9 @@ StochasticLoadBalancer.class, LoadBalancer.class); internalBalancer = ReflectionUtils.newInstance(balancerKlass, config); internalBalancer.setMasterServices(masterServices); - internalBalancer.setClusterMetrics(clusterStatus); + if(clusterStatus != null) { + internalBalancer.setClusterMetrics(clusterStatus); + } internalBalancer.setConf(config); internalBalancer.initialize(); } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java new file mode 100644 index 0000000..9838ccf --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/RSGroupableBalancerTestBase.java @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; +import org.apache.hadoop.hbase.util.Bytes; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * Class used to be the base of unit tests on RSGroupableBalancer. + */ +public class RSGroupableBalancerTestBase { + + static SecureRandom rand = new SecureRandom(); + static String[] groups = new String[] {RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4"}; + static TableName table0 = TableName.valueOf("dt0"); + static TableName[] tables = + new TableName[] { TableName.valueOf("dt1"), + TableName.valueOf("dt2"), + TableName.valueOf("dt3"), + TableName.valueOf("dt4")}; + static List servers; + static Map groupMap; + static Map tableMap = new HashMap<>(); + static List tableDescs; + int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 }; + static int regionId = 0; + + /** + * Invariant is that all servers of a group have load between floor(avg) and + * ceiling(avg) number of regions. + */ + protected void assertClusterAsBalanced( + ArrayListMultimap groupLoadMap) { + for (String gName : groupLoadMap.keySet()) { + List groupLoad = groupLoadMap.get(gName); + int numServers = groupLoad.size(); + int numRegions = 0; + int maxRegions = 0; + int minRegions = Integer.MAX_VALUE; + for (ServerAndLoad server : groupLoad) { + int nr = server.getLoad(); + if (nr > maxRegions) { + maxRegions = nr; + } + if (nr < minRegions) { + minRegions = nr; + } + numRegions += nr; + } + if (maxRegions - minRegions < 2) { + // less than 2 between max and min, can't balance + return; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + for (ServerAndLoad server : groupLoad) { + assertTrue(server.getLoad() <= max); + assertTrue(server.getLoad() >= min); + } + } + } + + /** + * All regions have an assignment. + */ + protected void assertImmediateAssignment(List regions, + List servers, + Map assignments) + throws IOException { + for (RegionInfo region : regions) { + assertTrue(assignments.containsKey(region)); + ServerName server = assignments.get(region); + TableName tableName = region.getTable(); + + String groupName = getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); + assertTrue("Region is not correctly assigned to group servers.", + gInfo.containsServer(server.getAddress())); + } + } + + /** + * Asserts a valid retained assignment plan. + *

+ * Must meet the following conditions: + *

    + *
  • Every input region has an assignment, and to an online server + *
  • If a region had an existing assignment to a server with the same + * address a a currently online server, it will be assigned to it + *
+ */ + protected void assertRetainedAssignment( + Map existing, List servers, + Map> assignment) + throws FileNotFoundException, IOException { + // Verify condition 1, every region assigned, and to online server + Set onlineServerSet = new TreeSet<>(servers); + Set assignedRegions = new TreeSet<>(RegionInfo.COMPARATOR); + for (Map.Entry> a : assignment.entrySet()) { + assertTrue( + "Region assigned to server that was not listed as online", + onlineServerSet.contains(a.getKey())); + for (RegionInfo r : a.getValue()) { + assignedRegions.add(r); + } + } + assertEquals(existing.size(), assignedRegions.size()); + + // Verify condition 2, every region must be assigned to correct server. + Set onlineHostNames = new TreeSet<>(); + for (ServerName s : servers) { + onlineHostNames.add(s.getHostname()); + } + + for (Map.Entry> a : assignment.entrySet()) { + ServerName currentServer = a.getKey(); + for (RegionInfo r : a.getValue()) { + ServerName oldAssignedServer = existing.get(r); + TableName tableName = r.getTable(); + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(currentServer.getAddress())); + if (oldAssignedServer != null + && onlineHostNames.contains(oldAssignedServer + .getHostname())) { + // this region was previously assigned somewhere, and that + // host is still around, then the host must have been is a + // different group. + if (!oldAssignedServer.getAddress().equals(currentServer.getAddress())) { + assertFalse(gInfo.containsServer(oldAssignedServer.getAddress())); + } + } + } + } + } + + protected String printStats( + ArrayListMultimap groupBasedLoad) { + StringBuilder sb = new StringBuilder(); + sb.append("\n"); + for (String groupName : groupBasedLoad.keySet()) { + sb.append("Stats for group: " + groupName); + sb.append("\n"); + sb.append(groupMap.get(groupName).getServers()); + sb.append("\n"); + List groupLoad = groupBasedLoad.get(groupName); + int numServers = groupLoad.size(); + int totalRegions = 0; + sb.append("Per Server Load: \n"); + for (ServerAndLoad sLoad : groupLoad) { + sb.append("Server :" + sLoad.getServerName() + " Load : " + + sLoad.getLoad() + "\n"); + totalRegions += sLoad.getLoad(); + } + sb.append(" Group Statistics : \n"); + float average = (float) totalRegions / numServers; + int max = (int) Math.ceil(average); + int min = (int) Math.floor(average); + sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + + average + " max=" + max + " min=" + min + "]"); + sb.append("\n"); + sb.append("==============================="); + sb.append("\n"); + } + return sb.toString(); + } + + protected ArrayListMultimap convertToGroupBasedMap( + final Map> serversMap) throws IOException { + ArrayListMultimap loadMap = ArrayListMultimap + .create(); + for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) { + Set
groupServers = gInfo.getServers(); + for (Address hostPort : groupServers) { + ServerName actual = null; + for(ServerName entry: servers) { + if(entry.getAddress().equals(hostPort)) { + actual = entry; + break; + } + } + List regions = serversMap.get(actual); + assertTrue("No load for " + actual, regions != null); + loadMap.put(gInfo.getName(), + new ServerAndLoad(actual, regions.size())); + } + } + return loadMap; + } + + protected ArrayListMultimap reconcile( + ArrayListMultimap previousLoad, + List plans) { + ArrayListMultimap result = ArrayListMultimap + .create(); + result.putAll(previousLoad); + if (plans != null) { + for (RegionPlan plan : plans) { + ServerName source = plan.getSource(); + updateLoad(result, source, -1); + ServerName destination = plan.getDestination(); + updateLoad(result, destination, +1); + } + } + return result; + } + + protected void updateLoad( + ArrayListMultimap previousLoad, + final ServerName sn, final int diff) { + for (String groupName : previousLoad.keySet()) { + ServerAndLoad newSAL = null; + ServerAndLoad oldSAL = null; + for (ServerAndLoad sal : previousLoad.get(groupName)) { + if (ServerName.isSameAddress(sn, sal.getServerName())) { + oldSAL = sal; + newSAL = new ServerAndLoad(sn, sal.getLoad() + diff); + break; + } + } + if (newSAL != null) { + previousLoad.remove(groupName, oldSAL); + previousLoad.put(groupName, newSAL); + break; + } + } + } + + protected Map> mockClusterServers() throws IOException { + assertTrue(servers.size() == regionAssignment.length); + Map> assignment = new TreeMap<>(); + for (int i = 0; i < servers.size(); i++) { + int numRegions = regionAssignment[i]; + List regions = assignedRegions(numRegions, servers.get(i)); + assignment.put(servers.get(i), regions); + } + return assignment; + } + + /** + * Generate a list of regions evenly distributed between the tables. + * + * @param numRegions The number of regions to be generated. + * @return List of RegionInfo. + */ + protected List randomRegions(int numRegions) { + List regions = new ArrayList<>(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + int regionIdx = rand.nextInt(tables.length); + for (int i = 0; i < numRegions; i++) { + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + int tableIndex = (i + regionIdx) % tables.length; + regions.add(RegionInfoBuilder.newBuilder(tables[tableIndex]) + .setStartKey(start) + .setEndKey(end) + .setSplit(false) + .setRegionId(regionId++) + .build()); + } + return regions; + } + + /** + * Generate assigned regions to a given server using group information. + * + * @param numRegions the num regions to generate + * @param sn the servername + * @return the list of regions + * @throws java.io.IOException Signals that an I/O exception has occurred. + */ + protected List assignedRegions(int numRegions, ServerName sn) throws IOException { + List regions = new ArrayList<>(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + for (int i = 0; i < numRegions; i++) { + TableName tableName = getTableName(sn); + regions.add(RegionInfoBuilder.newBuilder(tableName) + .setStartKey(start) + .setEndKey(end) + .setSplit(false) + .setRegionId(regionId++) + .build()); + } + return regions; + } + + protected static List generateServers(int numServers) { + List servers = new ArrayList<>(numServers); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + /** + * Construct group info, with each group having at least one server. + * + * @param servers the servers + * @param groups the groups + * @return the map + */ + protected static Map constructGroupInfo( + List servers, String[] groups) { + assertTrue(servers != null); + assertTrue(servers.size() >= groups.length); + int index = 0; + Map groupMap = new HashMap<>(); + for (String grpName : groups) { + RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName); + RSGroupInfo.addServer(servers.get(index).getAddress()); + groupMap.put(grpName, RSGroupInfo); + index++; + } + while (index < servers.size()) { + int grpIndex = rand.nextInt(groups.length); + groupMap.get(groups[grpIndex]).addServer( + servers.get(index).getAddress()); + index++; + } + return groupMap; + } + + /** + * Construct table descriptors evenly distributed between the groups. + * + * @return the list + */ + protected static List constructTableDesc(boolean hasBogusTable) { + List tds = Lists.newArrayList(); + int index = rand.nextInt(groups.length); + for (int i = 0; i < tables.length; i++) { + HTableDescriptor htd = new HTableDescriptor(tables[i]); + int grpIndex = (i + index) % groups.length ; + String groupName = groups[grpIndex]; + tableMap.put(tables[i], groupName); + tds.add(htd); + } + if (hasBogusTable) { + tableMap.put(table0, ""); + tds.add(new HTableDescriptor(table0)); + } + return tds; + } + + protected static MasterServices getMockedMaster() throws IOException { + TableDescriptors tds = Mockito.mock(TableDescriptors.class); + Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); + Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); + Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); + Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); + MasterServices services = Mockito.mock(HMaster.class); + Mockito.when(services.getTableDescriptors()).thenReturn(tds); + AssignmentManager am = Mockito.mock(AssignmentManager.class); + Mockito.when(services.getAssignmentManager()).thenReturn(am); + return services; + } + + protected static RSGroupInfoManager getMockedGroupInfoManager() throws IOException { + RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class); + Mockito.when(gm.getRSGroup(Mockito.any())).thenAnswer(new Answer() { + @Override + public RSGroupInfo answer(InvocationOnMock invocation) throws Throwable { + return groupMap.get(invocation.getArgument(0)); + } + }); + Mockito.when(gm.listRSGroups()).thenReturn( + Lists.newLinkedList(groupMap.values())); + Mockito.when(gm.isOnline()).thenReturn(true); + Mockito.when(gm.getRSGroupOfTable(Mockito.any())) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return tableMap.get(invocation.getArgument(0)); + } + }); + return gm; + } + + protected TableName getTableName(ServerName sn) throws IOException { + TableName tableName = null; + RSGroupInfoManager gm = getMockedGroupInfoManager(); + RSGroupInfo groupOfServer = null; + for(RSGroupInfo gInfo : gm.listRSGroups()){ + if(gInfo.containsServer(sn.getAddress())){ + groupOfServer = gInfo; + break; + } + } + + for(HTableDescriptor desc : tableDescs){ + if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){ + tableName = desc.getTableName(); + } + } + return tableName; + } +} diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java index 0ef0a01..2b82f40 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -21,9 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -32,75 +29,44 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; -import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; + import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -//TODO use stochastic based load balancer instead @Category(SmallTests.class) -public class TestRSGroupBasedLoadBalancer { - +public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRSGroupBasedLoadBalancer.class); - private static final Logger LOG = LoggerFactory.getLogger(TestRSGroupBasedLoadBalancer.class); private static RSGroupBasedLoadBalancer loadBalancer; - private static SecureRandom rand; - - static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4" }; - static TableName table0 = TableName.valueOf("dt0"); - static TableName[] tables = - new TableName[] { TableName.valueOf("dt1"), - TableName.valueOf("dt2"), - TableName.valueOf("dt3"), - TableName.valueOf("dt4")}; - static List servers; - static Map groupMap; - static Map tableMap; - static List tableDescs; - int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 }; - static int regionId = 0; @BeforeClass public static void beforeAllTests() throws Exception { - rand = new SecureRandom(); servers = generateServers(7); groupMap = constructGroupInfo(servers, groups); - tableMap = new HashMap<>(); - tableDescs = constructTableDesc(); + tableDescs = constructTableDesc(true); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.regions.slop", "0"); conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName()); @@ -127,62 +93,6 @@ list, plans); LOG.info("Mock Balance : " + printStats(balancedCluster)); assertClusterAsBalanced(balancedCluster); - } - - /** - * Invariant is that all servers of a group have load between floor(avg) and - * ceiling(avg) number of regions. - */ - private void assertClusterAsBalanced( - ArrayListMultimap groupLoadMap) { - for (String gName : groupLoadMap.keySet()) { - List groupLoad = groupLoadMap.get(gName); - int numServers = groupLoad.size(); - int numRegions = 0; - int maxRegions = 0; - int minRegions = Integer.MAX_VALUE; - for (ServerAndLoad server : groupLoad) { - int nr = server.getLoad(); - if (nr > maxRegions) { - maxRegions = nr; - } - if (nr < minRegions) { - minRegions = nr; - } - numRegions += nr; - } - if (maxRegions - minRegions < 2) { - // less than 2 between max and min, can't balance - return; - } - int min = numRegions / numServers; - int max = numRegions % numServers == 0 ? min : min + 1; - - for (ServerAndLoad server : groupLoad) { - assertTrue(server.getLoad() <= max); - assertTrue(server.getLoad() >= min); - } - } - } - - /** - * All regions have an assignment. - */ - private void assertImmediateAssignment(List regions, - List servers, - Map assignments) - throws IOException { - for (RegionInfo region : regions) { - assertTrue(assignments.containsKey(region)); - ServerName server = assignments.get(region); - TableName tableName = region.getTable(); - - String groupName = getMockedGroupInfoManager().getRSGroupOfTable(tableName); - assertTrue(StringUtils.isNotEmpty(groupName)); - RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); - assertTrue("Region is not correctly assigned to group servers.", - gInfo.containsServer(server.getAddress())); - } } /** @@ -235,6 +145,7 @@ Set misplacedRegions = loadBalancer.getMisplacedRegions(inputForTest); assertFalse(misplacedRegions.contains(ri)); } + /** * Test the cluster startup bulk assignment which attempts to retain assignment info. */ @@ -283,334 +194,5 @@ Map> assignments = loadBalancer .roundRobinAssignment(regions, onlineServers); assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size()); - } - - /** - * Asserts a valid retained assignment plan. - *

- * Must meet the following conditions: - *

    - *
  • Every input region has an assignment, and to an online server - *
  • If a region had an existing assignment to a server with the same - * address a a currently online server, it will be assigned to it - *
- */ - private void assertRetainedAssignment( - Map existing, List servers, - Map> assignment) - throws FileNotFoundException, IOException { - // Verify condition 1, every region assigned, and to online server - Set onlineServerSet = new TreeSet<>(servers); - Set assignedRegions = new TreeSet<>(RegionInfo.COMPARATOR); - for (Map.Entry> a : assignment.entrySet()) { - assertTrue( - "Region assigned to server that was not listed as online", - onlineServerSet.contains(a.getKey())); - for (RegionInfo r : a.getValue()) { - assignedRegions.add(r); - } - } - assertEquals(existing.size(), assignedRegions.size()); - - // Verify condition 2, every region must be assigned to correct server. - Set onlineHostNames = new TreeSet<>(); - for (ServerName s : servers) { - onlineHostNames.add(s.getHostname()); - } - - for (Map.Entry> a : assignment.entrySet()) { - ServerName currentServer = a.getKey(); - for (RegionInfo r : a.getValue()) { - ServerName oldAssignedServer = existing.get(r); - TableName tableName = r.getTable(); - String groupName = - getMockedGroupInfoManager().getRSGroupOfTable(tableName); - assertTrue(StringUtils.isNotEmpty(groupName)); - RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( - groupName); - assertTrue( - "Region is not correctly assigned to group servers.", - gInfo.containsServer(currentServer.getAddress())); - if (oldAssignedServer != null - && onlineHostNames.contains(oldAssignedServer - .getHostname())) { - // this region was previously assigned somewhere, and that - // host is still around, then the host must have been is a - // different group. - if (!oldAssignedServer.getAddress().equals(currentServer.getAddress())) { - assertFalse(gInfo.containsServer(oldAssignedServer.getAddress())); - } - } - } - } - } - - private String printStats( - ArrayListMultimap groupBasedLoad) { - StringBuffer sb = new StringBuffer(); - sb.append("\n"); - for (String groupName : groupBasedLoad.keySet()) { - sb.append("Stats for group: " + groupName); - sb.append("\n"); - sb.append(groupMap.get(groupName).getServers()); - sb.append("\n"); - List groupLoad = groupBasedLoad.get(groupName); - int numServers = groupLoad.size(); - int totalRegions = 0; - sb.append("Per Server Load: \n"); - for (ServerAndLoad sLoad : groupLoad) { - sb.append("Server :" + sLoad.getServerName() + " Load : " - + sLoad.getLoad() + "\n"); - totalRegions += sLoad.getLoad(); - } - sb.append(" Group Statistics : \n"); - float average = (float) totalRegions / numServers; - int max = (int) Math.ceil(average); - int min = (int) Math.floor(average); - sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg=" - + average + " max=" + max + " min=" + min + "]"); - sb.append("\n"); - sb.append("==============================="); - sb.append("\n"); - } - return sb.toString(); - } - - private ArrayListMultimap convertToGroupBasedMap( - final Map> serversMap) throws IOException { - ArrayListMultimap loadMap = ArrayListMultimap - .create(); - for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) { - Set
groupServers = gInfo.getServers(); - for (Address hostPort : groupServers) { - ServerName actual = null; - for(ServerName entry: servers) { - if(entry.getAddress().equals(hostPort)) { - actual = entry; - break; - } - } - List regions = serversMap.get(actual); - assertTrue("No load for " + actual, regions != null); - loadMap.put(gInfo.getName(), - new ServerAndLoad(actual, regions.size())); - } - } - return loadMap; - } - - private ArrayListMultimap reconcile( - ArrayListMultimap previousLoad, - List plans) { - ArrayListMultimap result = ArrayListMultimap - .create(); - result.putAll(previousLoad); - if (plans != null) { - for (RegionPlan plan : plans) { - ServerName source = plan.getSource(); - updateLoad(result, source, -1); - ServerName destination = plan.getDestination(); - updateLoad(result, destination, +1); - } - } - return result; - } - - private void updateLoad( - ArrayListMultimap previousLoad, - final ServerName sn, final int diff) { - for (String groupName : previousLoad.keySet()) { - ServerAndLoad newSAL = null; - ServerAndLoad oldSAL = null; - for (ServerAndLoad sal : previousLoad.get(groupName)) { - if (ServerName.isSameAddress(sn, sal.getServerName())) { - oldSAL = sal; - newSAL = new ServerAndLoad(sn, sal.getLoad() + diff); - break; - } - } - if (newSAL != null) { - previousLoad.remove(groupName, oldSAL); - previousLoad.put(groupName, newSAL); - break; - } - } - } - - private Map> mockClusterServers() throws IOException { - assertTrue(servers.size() == regionAssignment.length); - Map> assignment = new TreeMap<>(); - for (int i = 0; i < servers.size(); i++) { - int numRegions = regionAssignment[i]; - List regions = assignedRegions(numRegions, servers.get(i)); - assignment.put(servers.get(i), regions); - } - return assignment; - } - - /** - * Generate a list of regions evenly distributed between the tables. - * - * @param numRegions The number of regions to be generated. - * @return List of RegionInfo. - */ - private List randomRegions(int numRegions) { - List regions = new ArrayList<>(numRegions); - byte[] start = new byte[16]; - byte[] end = new byte[16]; - rand.nextBytes(start); - rand.nextBytes(end); - int regionIdx = rand.nextInt(tables.length); - for (int i = 0; i < numRegions; i++) { - Bytes.putInt(start, 0, numRegions << 1); - Bytes.putInt(end, 0, (numRegions << 1) + 1); - int tableIndex = (i + regionIdx) % tables.length; - regions.add(RegionInfoBuilder.newBuilder(tables[tableIndex]) - .setStartKey(start) - .setEndKey(end) - .setSplit(false) - .setRegionId(regionId++) - .build()); - } - return regions; - } - - /** - * Generate assigned regions to a given server using group information. - * - * @param numRegions the num regions to generate - * @param sn the servername - * @return the list of regions - * @throws java.io.IOException Signals that an I/O exception has occurred. - */ - private List assignedRegions(int numRegions, ServerName sn) throws IOException { - List regions = new ArrayList<>(numRegions); - byte[] start = new byte[16]; - byte[] end = new byte[16]; - Bytes.putInt(start, 0, numRegions << 1); - Bytes.putInt(end, 0, (numRegions << 1) + 1); - for (int i = 0; i < numRegions; i++) { - TableName tableName = getTableName(sn); - regions.add(RegionInfoBuilder.newBuilder(tableName) - .setStartKey(start) - .setEndKey(end) - .setSplit(false) - .setRegionId(regionId++) - .build()); - } - return regions; - } - - private static List generateServers(int numServers) { - List servers = new ArrayList<>(numServers); - for (int i = 0; i < numServers; i++) { - String host = "server" + rand.nextInt(100000); - int port = rand.nextInt(60000); - servers.add(ServerName.valueOf(host, port, -1)); - } - return servers; - } - - /** - * Construct group info, with each group having at least one server. - * - * @param servers the servers - * @param groups the groups - * @return the map - */ - private static Map constructGroupInfo( - List servers, String[] groups) { - assertTrue(servers != null); - assertTrue(servers.size() >= groups.length); - int index = 0; - Map groupMap = new HashMap<>(); - for (String grpName : groups) { - RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName); - RSGroupInfo.addServer(servers.get(index).getAddress()); - groupMap.put(grpName, RSGroupInfo); - index++; - } - while (index < servers.size()) { - int grpIndex = rand.nextInt(groups.length); - groupMap.get(groups[grpIndex]).addServer( - servers.get(index).getAddress()); - index++; - } - return groupMap; - } - - /** - * Construct table descriptors evenly distributed between the groups. - * - * @return the list - */ - private static List constructTableDesc() { - List tds = Lists.newArrayList(); - int index = rand.nextInt(groups.length); - for (int i = 0; i < tables.length; i++) { - HTableDescriptor htd = new HTableDescriptor(tables[i]); - int grpIndex = (i + index) % groups.length ; - String groupName = groups[grpIndex]; - tableMap.put(tables[i], groupName); - tds.add(htd); - } - tableMap.put(table0, ""); - tds.add(new HTableDescriptor(table0)); - return tds; - } - - private static MasterServices getMockedMaster() throws IOException { - TableDescriptors tds = Mockito.mock(TableDescriptors.class); - Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); - Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); - Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); - Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); - MasterServices services = Mockito.mock(HMaster.class); - Mockito.when(services.getTableDescriptors()).thenReturn(tds); - AssignmentManager am = Mockito.mock(AssignmentManager.class); - Mockito.when(services.getAssignmentManager()).thenReturn(am); - return services; - } - - private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException { - RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class); - Mockito.when(gm.getRSGroup(groups[0])).thenReturn( - groupMap.get(groups[0])); - Mockito.when(gm.getRSGroup(groups[1])).thenReturn( - groupMap.get(groups[1])); - Mockito.when(gm.getRSGroup(groups[2])).thenReturn( - groupMap.get(groups[2])); - Mockito.when(gm.getRSGroup(groups[3])).thenReturn( - groupMap.get(groups[3])); - Mockito.when(gm.listRSGroups()).thenReturn( - Lists.newLinkedList(groupMap.values())); - Mockito.when(gm.isOnline()).thenReturn(true); - Mockito.when(gm.getRSGroupOfTable(Mockito.any())) - .thenAnswer(new Answer() { - @Override - public String answer(InvocationOnMock invocation) throws Throwable { - return tableMap.get(invocation.getArgument(0)); - } - }); - return gm; - } - - private TableName getTableName(ServerName sn) throws IOException { - TableName tableName = null; - RSGroupInfoManager gm = getMockedGroupInfoManager(); - RSGroupInfo groupOfServer = null; - for(RSGroupInfo gInfo : gm.listRSGroups()){ - if(gInfo.containsServer(sn.getAddress())){ - groupOfServer = gInfo; - break; - } - } - - for(HTableDescriptor desc : tableDescs){ - if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){ - tableName = desc.getTableName(); - } - } - return tableName; } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer2.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer2.java new file mode 100644 index 0000000..620b602 --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer2.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRSGroupBasedLoadBalancer2 extends RSGroupableBalancerTestBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRSGroupBasedLoadBalancer2.class); + private static RSGroupBasedLoadBalancer loadBalancer; + + @BeforeClass + public static void beforeAllTests() throws Exception { + groups = new String[] { RSGroupInfo.DEFAULT_GROUP }; + servers = generateServers(3); + groupMap = constructGroupInfo(servers, groups); + tableDescs = constructTableDesc(false); + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.regions.slop", "0"); + conf.setFloat("hbase.master.balancer.stochastic.readRequestCost", 10000f); + conf.set("hbase.rsgroup.grouploadbalancer.class", + StochasticLoadBalancer.class.getCanonicalName()); + loadBalancer = new RSGroupBasedLoadBalancer(); + loadBalancer.setRsGroupInfoManager(getMockedGroupInfoManager()); + loadBalancer.setMasterServices(getMockedMaster()); + loadBalancer.setConf(conf); + loadBalancer.initialize(); + } + + private ServerMetrics mockServerMetricsWithReadRequests(ServerName server, + List regionsOnServer, long readRequestCount) { + ServerMetrics serverMetrics = mock(ServerMetrics.class); + Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for(RegionInfo info : regionsOnServer){ + RegionMetrics rl = mock(RegionMetrics.class); + when(rl.getReadRequestCount()).thenReturn(readRequestCount); + when(rl.getCpRequestCount()).thenReturn(0L); + when(rl.getWriteRequestCount()).thenReturn(0L); + when(rl.getMemStoreSize()).thenReturn(Size.ZERO); + when(rl.getStoreFileSize()).thenReturn(Size.ZERO); + regionLoadMap.put(info.getEncodedNameAsBytes(), rl); + } + when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap); + return serverMetrics; + } + + @Test + public void testBalanceCluster() throws HBaseIOException { + // mock cluster State + Map> clusterState = new HashMap>(); + ServerName serverA = servers.get(0); + ServerName serverB = servers.get(1); + ServerName serverC = servers.get(2); + List regionsOnServerA = randomRegions(3); + List regionsOnServerB = randomRegions(3); + List regionsOnServerC = randomRegions(3); + clusterState.put(serverA, regionsOnServerA); + clusterState.put(serverB, regionsOnServerB); + clusterState.put(serverC, regionsOnServerC); + // mock ClusterMetrics + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(serverA, mockServerMetricsWithReadRequests(serverA, regionsOnServerA, 0)); + serverMetricsMap.put(serverB, mockServerMetricsWithReadRequests(serverB, regionsOnServerB, 0)); + serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0)); + ClusterMetrics clusterStatus = mock(ClusterMetrics.class); + when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.setClusterMetrics(clusterStatus); + + // ReadRequestCostFunction are Rate based, So doing setClusterMetrics again + // this time, regions on serverA with more readRequestCount load + // serverA : 1000,1000,1000 + // serverB : 0,0,0 + // serverC : 0,0,0 + // so should move two regions from serverA to serverB & serverC + serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(serverA, mockServerMetricsWithReadRequests(serverA, + regionsOnServerA, 1000)); + serverMetricsMap.put(serverB, mockServerMetricsWithReadRequests(serverB, regionsOnServerB, 0)); + serverMetricsMap.put(serverC, mockServerMetricsWithReadRequests(serverC, regionsOnServerC, 0)); + clusterStatus = mock(ClusterMetrics.class); + when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap); + loadBalancer.setClusterMetrics(clusterStatus); + + List plans = loadBalancer.balanceCluster(clusterState); + Set regionsMoveFromServerA = new HashSet<>(); + Set targetServers = new HashSet<>(); + for(RegionPlan plan : plans) { + if(plan.getSource().equals(serverA)) { + regionsMoveFromServerA.add(plan.getRegionInfo()); + targetServers.add(plan.getDestination()); + } + } + // should move 2 regions from serverA, one moves to serverB, the other moves to serverC + assertEquals(2, regionsMoveFromServerA.size()); + assertEquals(2, targetServers.size()); + assertTrue(regionsOnServerA.containsAll(regionsMoveFromServerA)); + } +}