diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 168b2a6..91cfb12 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -3561,7 +3562,8 @@ public class AssignmentManager extends ZooKeeperListener { return this.balancer; } - public Map> getSnapShotOfAssignment(List infos) { + public Map> + getSnapShotOfAssignment(Collection infos) { return getRegionStates().getRegionAssignments(infos); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java index 0f6737b..0b2e2f0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java @@ -41,6 +41,9 @@ public class RackManager { private DNSToSwitchMapping switchMapping; + public RackManager() { + } + public RackManager(Configuration conf) { switchMapping = ReflectionUtils.instantiateWithCustomCtor( conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 56eca74..e35c7cd 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -148,7 +149,8 @@ public class RegionStates { * @param regions * @return a pair containing the groupings as a map */ - synchronized Map> getRegionAssignments(List regions) { + synchronized Map> getRegionAssignments( + Collection regions) { Map> map = new HashMap>(); for (HRegionInfo region : regions) { HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region); @@ -794,6 +796,19 @@ public class RegionStates { return result; } + /** + * Returns a clone of region assignments per server + * @return a Map of ServerName to a List of HRegionInfo's + */ + protected synchronized Map> getRegionAssignmentsByServer() { + Map> regionsByServer = + new HashMap>(serverHoldings.size()); + for (Map.Entry> e: serverHoldings.entrySet()) { + regionsByServer.put(e.getKey(), new ArrayList(e.getValue())); + } + return regionsByServer; + } + protected synchronized RegionState getRegionState(final HRegionInfo hri) { return regionStates.get(hri.getEncodedName()); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 710796b..f379c6f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -26,10 +26,10 @@ import java.util.HashMap; 
import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; -import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,12 +39,16 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** @@ -57,6 +61,17 @@ public abstract class BaseLoadBalancer implements LoadBalancer { private static final int MIN_SERVER_BALANCE = 2; private volatile boolean stopped = false; + private static final List EMPTY_REGION_LIST = new ArrayList(0); + + protected final RegionLocationFinder regionFinder = new RegionLocationFinder(); + + private static class DefaultRackManager extends RackManager { + @Override + public String getRack(ServerName server) { + return UNKNOWN_RACK; + } + } + /** * An efficient array based implementation similar to ClusterState for keeping * the status of the cluster in terms of region assignment and distribution. @@ -64,74 +79,139 @@ public abstract class BaseLoadBalancer implements LoadBalancer { */ protected static class Cluster { ServerName[] servers; + String[] hosts; // ServerName uniquely identifies a region server. 
multiple RS can run on the same host + String[] racks; + ArrayList tables; HRegionInfo[] regions; Deque[] regionLoads; + int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality + int[] serverIndexToHostIndex; //serverIndex -> host index + int[] serverIndexToRackIndex; //serverIndex -> rack index + int[][] regionsPerServer; //serverIndex -> region list + int[][] regionsPerHost; //hostIndex -> list of regions + int[][] regionsPerRack; //rackIndex -> region list + int[][] regionsByPrimaryPerServer; //serverIndex -> sorted list of regions by primary region index + int[][] regionsByPrimaryPerHost; //hostIndex -> sorted list of regions by primary region index + int[][] regionsByPrimaryPerRack; //rackIndex -> sorted list of regions by primary region index + + int[][] serversPerHost; //hostIndex -> list of server indexes + int[][] serversPerRack; //rackIndex -> list of server indexes int[] regionIndexToServerIndex; //regionIndex -> serverIndex int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) int[] regionIndexToTableIndex; //regionIndex -> tableIndex int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS + int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary Integer[] serverIndicesSortedByRegionCount; Map serversToIndex; + Map hostsToIndex; + Map racksToIndex; Map tablesToIndex; + Map regionsToIndex; - int numRegions; int numServers; + int numHosts; + int numRacks; int numTables; + int numRegions; int numMovedRegions = 0; //num moved regions from the initial configuration int numMovedMetaRegions = 0; //num of moved regions that are META - protected Cluster(Map> clusterState, Map> loads, - RegionLocationFinder regionFinder) { + protected final RackManager rackManager; + + protected Cluster( + Map> clusterState, + Map> loads, + RegionLocationFinder regionFinder, + RackManager rackManager) { + this(null, clusterState, loads, regionFinder, rackManager); + } + + protected Cluster( + Collection unassignedRegions, + Map> clusterState, + Map> loads, + RegionLocationFinder regionFinder, + RackManager rackManager) { + if (unassignedRegions == null) { + unassignedRegions = EMPTY_REGION_LIST; + } serversToIndex = new HashMap(); + hostsToIndex = new HashMap(); + racksToIndex = new HashMap(); tablesToIndex = new HashMap(); - //regionsToIndex = new HashMap(); //TODO: We should get the list of tables from master tables = new ArrayList(); + this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); - - numRegions = 0; - - int serverIndex = 0; + List> serversPerHostList = new ArrayList>(); + List> serversPerRackList = new ArrayList>(); // Use servername and port as there can be dead servers in this list. We want everything with // a matching hostname and port to have the same index. 
- for (ServerName sn:clusterState.keySet()) { + for (ServerName sn : clusterState.keySet()) { if (serversToIndex.get(sn.getHostAndPort()) == null) { - serversToIndex.put(sn.getHostAndPort(), serverIndex++); + serversToIndex.put(sn.getHostAndPort(), numServers++); + } + if (!hostsToIndex.containsKey(sn.getHostname())) { + hostsToIndex.put(sn.getHostname(), numHosts++); + serversPerHostList.add(new ArrayList(1)); + } + + int serverIndex = serversToIndex.get(sn.getHostAndPort()); + int hostIndex = hostsToIndex.get(sn.getHostname()); + serversPerHostList.get(hostIndex).add(serverIndex); + + String rack = this.rackManager.getRack(sn); + if (!racksToIndex.containsKey(rack)) { + racksToIndex.put(rack, numRacks++); + serversPerRackList.add(new ArrayList()); } + int rackIndex = racksToIndex.get(rack); + serversPerRackList.get(rackIndex).add(serverIndex); } // Count how many regions there are. for (Entry> entry : clusterState.entrySet()) { numRegions += entry.getValue().size(); } + numRegions += unassignedRegions.size(); - numServers = serversToIndex.size(); - regionsPerServer = new int[serversToIndex.size()][]; - + regionsToIndex = new HashMap(numRegions); servers = new ServerName[numServers]; + serversPerHost = new int[numHosts][]; + serversPerRack = new int[numRacks][]; regions = new HRegionInfo[numRegions]; regionIndexToServerIndex = new int[numRegions]; initialRegionIndexToServerIndex = new int[numRegions]; regionIndexToTableIndex = new int[numRegions]; + regionIndexToPrimaryIndex = new int[numRegions]; regionLoads = new Deque[numRegions]; regionLocations = new int[numRegions][]; serverIndicesSortedByRegionCount = new Integer[numServers]; + serverIndexToHostIndex = new int[numServers]; + serverIndexToRackIndex = new int[numServers]; + regionsPerServer = new int[numServers][]; + regionsPerHost = new int[numHosts][]; + regionsPerRack = new int[numRacks][]; + regionsByPrimaryPerServer = new int[numServers][]; + regionsByPrimaryPerHost = new int[numHosts][]; + regionsByPrimaryPerRack = new int[numRacks][]; + int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; for (Entry> entry : clusterState.entrySet()) { - serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); + int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); // keep the servername if this is the first server name for this hostname // or this servername has the newest startcode. 
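
[Editor's note] The Cluster constructor above hinges on one core trick: every server, host, and rack string is interned into a dense int index once, and all later balancer state lives in primitive arrays keyed by those indexes (serversPerHost, regionsPerServer, regionsByPrimaryPerHost, and so on). The standalone sketch below illustrates that interning-plus-flattening pattern under simplified assumptions; the class and method names (ToyCluster, addServer) are invented for the example and are not part of the patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: names and structure are invented for this sketch, not taken from the patch.
public class ToyCluster {
  private final Map<String, Integer> hostsToIndex = new HashMap<String, Integer>();
  private final List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
  private int numServers = 0;

  /** Register a server, return its dense index, and group it under its host's index. */
  int addServer(String hostname) {
    int serverIndex = numServers++;
    Integer hostIndex = hostsToIndex.get(hostname);
    if (hostIndex == null) {
      hostIndex = hostsToIndex.size();
      hostsToIndex.put(hostname, hostIndex);
      serversPerHostList.add(new ArrayList<Integer>(1));
    }
    serversPerHostList.get(hostIndex).add(serverIndex);
    return serverIndex;
  }

  /** Flatten the per-host lists into the int[][] form the balancer iterates over. */
  int[][] serversPerHost() {
    int[][] serversPerHost = new int[serversPerHostList.size()][];
    for (int i = 0; i < serversPerHost.length; i++) {
      List<Integer> servers = serversPerHostList.get(i);
      serversPerHost[i] = new int[servers.size()];
      for (int j = 0; j < serversPerHost[i].length; j++) {
        serversPerHost[i][j] = servers.get(j);
      }
    }
    return serversPerHost;
  }

  public static void main(String[] args) {
    ToyCluster c = new ToyCluster();
    c.addServer("hostA");
    c.addServer("hostA"); // two servers on the same host share one host index
    c.addServer("hostB");
    System.out.println(java.util.Arrays.deepToString(c.serversPerHost())); // [[0, 1], [2]]
  }
}

The flattening from a List of lists into int[][] mirrors the serversPerHostList to serversPerHost step in the constructor; keeping everything in primitive arrays is what lets the stochastic walk apply and undo many candidate actions cheaply.
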
@@ -141,51 +221,52 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } regionsPerServer[serverIndex] = new int[entry.getValue().size()]; + regionsByPrimaryPerServer[serverIndex] = new int[entry.getValue().size()]; serverIndicesSortedByRegionCount[serverIndex] = serverIndex; } + hosts = new String[numHosts]; + for (Entry entry : hostsToIndex.entrySet()) { + hosts[entry.getValue()] = entry.getKey(); + } + racks = new String[numRacks]; + for (Entry entry : racksToIndex.entrySet()) { + racks[entry.getValue()] = entry.getKey(); + } + for (Entry> entry : clusterState.entrySet()) { - serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); + int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); regionPerServerIndex = 0; + int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); + serverIndexToHostIndex[serverIndex] = hostIndex; + + int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); + serverIndexToRackIndex[serverIndex] = rackIndex; + for (HRegionInfo region : entry.getValue()) { - String tableName = region.getTable().getNameAsString(); - Integer idx = tablesToIndex.get(tableName); - if (idx == null) { - tables.add(tableName); - idx = tableIndex; - tablesToIndex.put(tableName, tableIndex++); - } + registerRegion(region, regionIndex, serverIndex, loads, regionFinder); - regions[regionIndex] = region; - regionIndexToServerIndex[regionIndex] = serverIndex; - initialRegionIndexToServerIndex[regionIndex] = serverIndex; - regionIndexToTableIndex[regionIndex] = idx; regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; + regionIndex++; + } + } + for (HRegionInfo region : unassignedRegions) { + registerRegion(region, regionIndex, -1, loads, regionFinder); + regionIndex++; + } - // region load - if (loads != null) { - Deque rl = loads.get(region.getRegionNameAsString()); - // That could have failed if the RegionLoad is using the other regionName - if (rl == null) { - // Try getting the region load using encoded name. - rl = loads.get(region.getEncodedName()); - } - regionLoads[regionIndex] = rl; - } - - if (regionFinder != null) { - //region location - List loc = regionFinder.getTopBlockLocations(region); - regionLocations[regionIndex] = new int[loc.size()]; - for (int i=0; i < loc.size(); i++) { - regionLocations[regionIndex][i] = - loc.get(i) == null ? -1 : - (serversToIndex.get(loc.get(i)) == null ? 
-1 : serversToIndex.get(loc.get(i))); - } - } + for (int i = 0; i < serversPerHostList.size(); i++) { + serversPerHost[i] = new int[serversPerHostList.get(i).size()]; + for (int j = 0; j < serversPerHost[i].length; j++) { + serversPerHost[i][j] = serversPerHostList.get(i).get(j); + } + } - regionIndex++; + for (int i = 0; i < serversPerRackList.size(); i++) { + serversPerRack[i] = new int[serversPerRackList.get(i).size()]; + for (int j = 0; j < serversPerRack[i].length; j++) { + serversPerRack[i][j] = serversPerRackList.get(i).get(j); } } @@ -199,59 +280,328 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } for (int i=0; i < regionIndexToServerIndex.length; i++) { - numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + if (regionIndexToServerIndex[i] >= 0) { + numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + } } numMaxRegionsPerTable = new int[numTables]; - for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { + for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) { if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex]; } } } + + for (int i = 0; i < regions.length; i ++) { + HRegionInfo info = regions[i]; + if (RegionReplicaUtil.isDefaultReplica(info)) { + regionIndexToPrimaryIndex[i] = i; + } else { + HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + regionIndexToPrimaryIndex[i] = + regionsToIndex.containsKey(primaryInfo) ? + regionsToIndex.get(primaryInfo): + -1; + } + } + + for (int i = 0; i < regionsPerServer.length; i++) { + regionsByPrimaryPerServer[i] = new int[regionsPerServer[i].length]; + for (int j = 0; j < regionsPerServer[i].length; j++) { + int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; + regionsByPrimaryPerServer[i][j] = primaryIndex; + } + // sort the regions by primaries. + Arrays.sort(regionsByPrimaryPerServer[i]); + } + + // compute regionsPerHost + for (int i = 0 ; i < serversPerHost.length; i++) { + int numRegionsPerHost = 0; + for (int j = 0; j < serversPerHost[i].length; j++) { + numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length; + } + regionsPerHost[i] = new int[numRegionsPerHost]; + regionsByPrimaryPerHost[i] = new int[numRegionsPerHost]; + } + + for (int i = 0 ; i < serversPerHost.length; i++) { + int numRegionPerHostIndex = 0; + for (int j = 0; j < serversPerHost[i].length; j++) { + for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) { + int region = regionsPerServer[serversPerHost[i][j]][k]; + regionsPerHost[i][numRegionPerHostIndex] = region; + int primaryIndex = regionIndexToPrimaryIndex[region]; + regionsByPrimaryPerHost[i][numRegionPerHostIndex] = primaryIndex; + numRegionPerHostIndex++; + } + } + // sort the regions by primaries. 
+ Arrays.sort(regionsByPrimaryPerHost[i]); + } + + // compute regionsPerRack + for (int i = 0 ; i < serversPerRack.length; i++) { + int numRegionsPerRack = 0; + for (int j = 0; j < serversPerRack[i].length; j++) { + numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length; + } + regionsPerRack[i] = new int[numRegionsPerRack]; + regionsByPrimaryPerRack[i] = new int[numRegionsPerRack]; + } + + for (int i = 0 ; i < serversPerRack.length; i++) { + int numRegionPerRackIndex = 0; + for (int j = 0; j < serversPerRack[i].length; j++) { + for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) { + int region = regionsPerServer[serversPerRack[i][j]][k]; + regionsPerRack[i][numRegionPerRackIndex] = region; + int primaryIndex = regionIndexToPrimaryIndex[region]; + regionsByPrimaryPerRack[i][numRegionPerRackIndex] = primaryIndex; + numRegionPerRackIndex++; + } + } + // sort the regions by primaries. + Arrays.sort(regionsByPrimaryPerRack[i]); + } + } + + /** Helper for Cluster constructor to handle a region */ + private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex, + Map> loads, RegionLocationFinder regionFinder) { + String tableName = region.getTable().getNameAsString(); + if (!tablesToIndex.containsKey(tableName)) { + tables.add(tableName); + tablesToIndex.put(tableName, tablesToIndex.size()); + } + int tableIndex = tablesToIndex.get(tableName); + + regionsToIndex.put(region, regionIndex); + regions[regionIndex] = region; + regionIndexToServerIndex[regionIndex] = serverIndex; + initialRegionIndexToServerIndex[regionIndex] = serverIndex; + regionIndexToTableIndex[regionIndex] = tableIndex; + + // region load + if (loads != null) { + Deque rl = loads.get(region.getRegionNameAsString()); + // That could have failed if the RegionLoad is using the other regionName + if (rl == null) { + // Try getting the region load using encoded name. + rl = loads.get(region.getEncodedName()); + } + regionLoads[regionIndex] = rl; + } + + if (regionFinder != null) { + //region location + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i=0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = + loc.get(i) == null ? -1 : + (serversToIndex.get(loc.get(i)) == null ? 
-1 : serversToIndex.get(loc.get(i))); + } + } + } + + /** An action to move or swap a region */ + public static class Action { + public static enum Type { + ASSIGN_REGION, + MOVE_REGION, + SWAP_REGIONS, + NULL, + } + + public Type type; + public Action (Type type) {this.type = type;} + /** Returns an Action which would undo this action */ + public Action undoAction() { return this; } + @Override + public String toString() { return type + ":";} + } + + public static class AssignRegionAction extends Action { + public int region; + public int server; + public AssignRegionAction(int region, int server) { + super(Type.ASSIGN_REGION); + this.region = region; + this.server = server; + } + @Override + public Action undoAction() { + // TODO fix this + return this; + } + @Override + public String toString() { + return type + ": " + region + ":" + server; + } + } + + public static class MoveRegionAction extends Action { + public int region; + public int fromServer; + public int toServer; + + public MoveRegionAction(int region, int fromServer, int toServer) { + super(Type.MOVE_REGION); + this.fromServer = fromServer; + this.region = region; + this.toServer = toServer; + } + @Override + public Action undoAction() { + return new MoveRegionAction (region, toServer, fromServer); + } + @Override + public String toString() { + return type + ": " + region + ":" + fromServer + " -> " + toServer; + } + } + + public static class SwapRegionsAction extends Action { + public int fromServer; + public int fromRegion; + public int toServer; + public int toRegion; + public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { + super(Type.SWAP_REGIONS); + this.fromServer = fromServer; + this.fromRegion = fromRegion; + this.toServer = toServer; + this.toRegion = toRegion; + } + @Override + public Action undoAction() { + return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); + } + @Override + public String toString() { + return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; + } + } + + public static Action NullAction = new Action(Type.NULL); + + public void doAction(Action action) { + switch (action.type) { + case NULL: break; + case ASSIGN_REGION: + AssignRegionAction ar = (AssignRegionAction) action; + regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); + regionMoved(ar.region, -1, ar.server); + break; + case MOVE_REGION: + MoveRegionAction mra = (MoveRegionAction) action; + regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); + regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); + regionMoved(mra.region, mra.fromServer, mra.toServer); + break; + case SWAP_REGIONS: + SwapRegionsAction a = (SwapRegionsAction) action; + regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); + regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); + regionMoved(a.fromRegion, a.fromServer, a.toServer); + regionMoved(a.toRegion, a.toServer, a.fromServer); + break; + default: + throw new RuntimeException("Uknown action:" + action.type); + } + } + + /** + * Return true if the placement of region on server would lower the availability + * of the region in question + * @param server + * @param region + * @return true or false + */ + boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) { + if 
(!serversToIndex.containsKey(serverName.getHostAndPort())) { + return false; // safeguard against race between cluster.servers and servers from LB method args + } + int server = serversToIndex.get(serverName.getHostAndPort()); + int region = regionsToIndex.get(regionInfo); + + int primary = regionIndexToPrimaryIndex[region]; + + // there is a subset relation for server < host < rack + // check server first + + if (contains(regionsByPrimaryPerServer[server], primary)) { + // check for whether there are other servers that we can place this region + for (int i = 0; i < regionsByPrimaryPerServer.length; i++) { + if (i != server && !contains(regionsByPrimaryPerServer[i], primary)) { + return true; // meaning there is a better server + } + } + return false; // there is not a better server to place this + } + + // check host + int host = serverIndexToHostIndex[server]; + if (contains(regionsByPrimaryPerHost[host], primary)) { + // check for whether there are other hosts that we can place this region + for (int i = 0; i < regionsByPrimaryPerHost.length; i++) { + if (i != host && !contains(regionsByPrimaryPerHost[i], primary)) { + return true; // meaning there is a better host + } + } + return false; // there is not a better host to place this + } + + // check rack + int rack = serverIndexToRackIndex[server]; + if (contains(regionsByPrimaryPerRack[rack], primary)) { + // check for whether there are other racks that we can place this region + for (int i = 0; i < regionsByPrimaryPerRack.length; i++) { + if (i != rack && !contains(regionsByPrimaryPerRack[i], primary)) { + return true; // meaning there is a better rack + } + } + return false; // there is not a better rack to place this + } + return false; } - public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) { - //swap - if (rRegion >= 0 && lRegion >= 0) { - regionMoved(rRegion, rServer, lServer); - regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion); - regionMoved(lRegion, lServer, rServer); - regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion); - } else if (rRegion >= 0) { //move rRegion - regionMoved(rRegion, rServer, lServer); - regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion); - regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion); - } else if (lRegion >= 0) { //move lRegion - regionMoved(lRegion, lServer, rServer); - regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion); - regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion); + + void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) { + if (!serversToIndex.containsKey(serverName.getHostAndPort())) { + return; } + int server = serversToIndex.get(serverName.getHostAndPort()); + int region = regionsToIndex.get(regionInfo); + doAction(new AssignRegionAction(region, server)); } - /** Region moved out of the server */ - void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) { - regionIndexToServerIndex[regionIndex] = newServerIndex; - if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) { + void regionMoved(int region, int oldServer, int newServer) { + regionIndexToServerIndex[region] = newServer; + if (initialRegionIndexToServerIndex[region] == newServer) { numMovedRegions--; //region moved back to original location - if (regions[regionIndex].isMetaRegion()) { + if (regions[region].isMetaRegion()) { numMovedMetaRegions--; } - } else if 
(initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) { + } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { numMovedRegions++; //region moved from original location - if (regions[regionIndex].isMetaRegion()) { + if (regions[region].isMetaRegion()) { numMovedMetaRegions++; } } - int tableIndex = regionIndexToTableIndex[regionIndex]; - numRegionsPerServerPerTable[oldServerIndex][tableIndex]--; - numRegionsPerServerPerTable[newServerIndex][tableIndex]++; + int tableIndex = regionIndexToTableIndex[region]; + if (oldServer >= 0) { + numRegionsPerServerPerTable[oldServer][tableIndex]--; + } + numRegionsPerServerPerTable[newServer][tableIndex]++; //check whether this caused maxRegionsPerTable in the new Server to be updated - if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex]; - } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1) + if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numRegionsPerServerPerTable[newServer][tableIndex] = numMaxRegionsPerTable[tableIndex]; + } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) == numMaxRegionsPerTable[tableIndex]) { //recompute maxRegionsPerTable since the previous value was coming from the old server for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { @@ -260,6 +610,41 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } } + + // update for servers + int primary = regionIndexToPrimaryIndex[region]; + if (oldServer >= 0) { + regionsByPrimaryPerServer[oldServer] = removeRegion( + regionsByPrimaryPerServer[oldServer], primary); + } + regionsByPrimaryPerServer[newServer] = addRegionSorted( + regionsByPrimaryPerServer[newServer], primary); + + // update for hosts + int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1; + int newHost = serverIndexToHostIndex[newServer]; + if (newHost != oldHost) { + regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region); + regionsByPrimaryPerHost[newHost] = addRegionSorted(regionsByPrimaryPerHost[newHost], primary); + if (oldHost >= 0) { + regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region); + regionsByPrimaryPerHost[oldHost] = removeRegion( + regionsByPrimaryPerHost[oldHost], primary); // will still be sorted + } + } + + // update for racks + int oldRack = oldServer >= 0 ? 
serverIndexToRackIndex[oldServer] : -1; + int newRack = serverIndexToRackIndex[newServer]; + if (newRack != oldRack) { + regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region); + regionsByPrimaryPerRack[newRack] = addRegionSorted(regionsByPrimaryPerRack[newRack], primary); + if (oldRack >= 0) { + regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region); + regionsByPrimaryPerRack[oldRack] = removeRegion( + regionsByPrimaryPerRack[oldRack], primary); // will still be sorted + } + } } int[] removeRegion(int[] regions, int regionIndex) { @@ -283,6 +668,24 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return newRegions; } + int[] addRegionSorted(int[] regions, int regionIndex) { + int[] newRegions = new int[regions.length + 1]; + int i = 0; + for (i = 0; i < regions.length; i++) { + if (regions[i] > regionIndex) { + newRegions[i] = regionIndex; + break; + } + newRegions[i] = regions[i]; + } + if (i == regions.length) { + newRegions[i] = regionIndex; + } else { + System.arraycopy(regions, i, newRegions, i+1, regions.length - i); + } + return newRegions; + } + int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { int i = 0; for (i = 0; i < regions.length; i++) { @@ -302,6 +705,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return regionsPerServer[server].length; } + boolean contains(int[] arr, int val) { + return Arrays.binarySearch(arr, val) >= 0; + } + private Comparator numRegionsComparator = new Comparator() { @Override public int compare(Integer integer, Integer integer2) { @@ -345,6 +752,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // slop for regions protected float slop; private Configuration config; + protected RackManager rackManager; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class); @@ -358,6 +766,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { else if (slop > 1) slop = 1; this.config = conf; + this.rackManager = new RackManager(getConf()); + regionFinder.setConf(conf); } protected void setSlop(Configuration conf) { @@ -369,12 +779,19 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return this.config; } + @Override public void setClusterStatus(ClusterStatus st) { - // Not used except for the StocasticBalancer + regionFinder.setClusterStatus(st); } + @Override public void setMasterServices(MasterServices masterServices) { this.services = masterServices; + this.regionFinder.setServices(masterServices); + } + + public void setRackManager(RackManager rackManager) { + this.rackManager = rackManager; } protected boolean needsBalance(ClusterLoadState cs) { @@ -385,6 +802,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } return false; } + // TODO: check for same host replicas + // Check if we even need to do any load balancing // HBASE-3681 check sloppiness first float average = cs.getLoadAverage(); // for logging @@ -422,6 +841,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { * @return map of server to the regions it should take, or null if no * assignment is possible (ie. 
no regions or no servers) */ + @Override public Map> roundRobinAssignment(List regions, List servers) { metricsBalancer.incrMiscInvocations(); @@ -429,7 +849,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer { if (regions.isEmpty() || servers.isEmpty()) { return null; } + + // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the + // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate + // generator for AssignRegionAction. The LB will ensure the regions are mostly local + // and balanced. This should also run fast with fewer number of iterations. + Map> assignments = new TreeMap>(); + + Cluster cluster = createCluster(servers, regions); + int numRegions = regions.size(); int numServers = servers.size(); int max = (int) Math.ceil((float) numRegions / numServers); @@ -438,18 +867,60 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serverIdx = RANDOM.nextInt(numServers); } int regionIdx = 0; + List unassignedRegions = new ArrayList(); for (int j = 0; j < numServers; j++) { ServerName server = servers.get((j + serverIdx) % numServers); List serverRegions = new ArrayList(max); for (int i = regionIdx; i < numRegions; i += numServers) { - serverRegions.add(regions.get(i % numRegions)); + HRegionInfo region = regions.get(i % numRegions); + if (cluster.wouldLowerAvailability(region, server)) { + unassignedRegions.add(region); + } else { + serverRegions.add(region); + cluster.doAssignRegion(region, server); + } } assignments.put(server, serverRegions); regionIdx++; } + List lastFewRegions = new ArrayList(); + // assign the remaining by going through the list and try to assign to servers one-by-one + serverIdx = RANDOM.nextInt(numServers); + for (HRegionInfo region : unassignedRegions) { + for (int j = 0; j < numServers; j++) { // try all servers one by one + ServerName serverName = servers.get((j + serverIdx) % numServers); + if (!cluster.wouldLowerAvailability(region, serverName)) { + assignments.get(serverName).add(region); + cluster.doAssignRegion(region, serverName); + serverIdx = (j + serverIdx + 1) % numServers; //remain from next server + break; + } + } + } + // just sprinkle the rest of the regions on random regionservers. The balanceCluster will + // make it optimal later. We should not need this if wouldLowerAvailability() is correct + for (HRegionInfo region : lastFewRegions) { + int i = RANDOM.nextInt(numServers); + assignments.get(servers.get(i)).add(region); + } return assignments; } + protected Cluster createCluster(List servers, Collection regions) { + // Get the snapshot of the current assignments for the regions in question, and then create + // a cluster out of it. Note that we might have replicas already assigned to some servers + // earlier. So we want to get the snapshot to see those assignments, but this will only contain + // replicas of the regions that are passed (for performance). + Map> clusterState = getRegionAssignmentsByServer(regions); + + for (ServerName server : servers) { + if (!clusterState.containsKey(server)) { + clusterState.put(server, EMPTY_REGION_LIST); + } + } + return new Cluster(regions, clusterState, null, this.regionFinder, rackManager); + } + /** * Generates an immediate assignment plan to be used by a new master for * regions in transition that do not have an already known destination. 
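
[Editor's note] The roundRobinAssignment() changes above interleave the usual round-robin dealing with two replica-aware steps: skip a server when cluster.wouldLowerAvailability() says the placement would co-host replicas of the same region, then retry the skipped regions against every server before giving up. The sketch below captures that two-pass shape in isolation; the Placement interface and all names are invented stand-ins, not the patch's API, and the dealing order is simplified.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative only: Placement and all names below are invented for this sketch.
public class ReplicaAwareRoundRobin {

  interface Placement {
    boolean wouldLowerAvailability(int region, int server);
    void assign(int region, int server);
  }

  /** Returns regions that could not be placed anywhere without co-hosting replicas. */
  static List<Integer> assign(int numRegions, int numServers, Placement p) {
    Random random = new Random();
    int serverIdx = random.nextInt(numServers);
    List<Integer> unassigned = new ArrayList<Integer>();
    // pass 1: deal regions out round-robin, skipping placements that lower availability
    for (int region = 0; region < numRegions; region++) {
      int server = (region + serverIdx) % numServers;
      if (p.wouldLowerAvailability(region, server)) {
        unassigned.add(region);
      } else {
        p.assign(region, server);
      }
    }
    // pass 2: probe every server one-by-one for each leftover region
    List<Integer> lastFew = new ArrayList<Integer>();
    for (int region : unassigned) {
      boolean placed = false;
      for (int j = 0; j < numServers; j++) {
        int server = (j + serverIdx) % numServers;
        if (!p.wouldLowerAvailability(region, server)) {
          p.assign(region, server);
          serverIdx = (j + serverIdx + 1) % numServers; // resume from the next server
          placed = true;
          break;
        }
      }
      if (!placed) {
        lastFew.add(region); // sprinkle these randomly; a later balanceCluster() can fix them
      }
    }
    return lastFew;
  }
}

As the patch's own TODO notes, this two-pass dealing is a stopgap: running the normal balanceCluster() with an ASSIGN_REGION candidate generator could eventually subsume both retainAssignment() and roundRobinAssignment().
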
@@ -467,6 +938,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { * @param servers * @return map of regions to the server it should be assigned to */ + @Override public Map immediateAssignment(List regions, List servers) { metricsBalancer.incrMiscInvocations(); @@ -481,6 +953,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { /** * Used to assign a single region to a random server. */ + @Override public ServerName randomAssignment(HRegionInfo regionInfo, List servers) { metricsBalancer.incrMiscInvocations(); @@ -488,7 +961,26 @@ public abstract class BaseLoadBalancer implements LoadBalancer { LOG.warn("Wanted to do random assignment but no servers to assign to"); return null; } - return servers.get(RANDOM.nextInt(servers.size())); + List regions = Lists.newArrayList(regionInfo); + Cluster cluster = createCluster(servers, regions); + + ServerName server = randomAssignment(cluster, regionInfo, servers); + return server; + } + + protected ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo, + List servers) { + ServerName server = null; + final int maxIterations = servers.size(); + int iterations = 0; + int serverIdx = RANDOM.nextInt(servers.size()); + do { + server = servers.get(serverIdx++ % servers.size()); + } while (servers.size() != 1 + && cluster.wouldLowerAvailability(regionInfo, server) + && iterations++ < maxIterations); + cluster.doAssignRegion(regionInfo, server); + return server; } /** @@ -508,6 +1000,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { * @param servers available servers * @return map of servers and regions to be assigned to them */ + @Override public Map> retainAssignment(Map regions, List servers) { // Update metrics @@ -538,6 +1031,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int numRandomAssignments = 0; int numRetainedAssigments = 0; + + Cluster cluster = createCluster(servers, regions.keySet()); + for (Map.Entry entry : regions.entrySet()) { HRegionInfo region = entry.getKey(); ServerName oldServerName = entry.getValue(); @@ -548,21 +1044,25 @@ public abstract class BaseLoadBalancer implements LoadBalancer { if (localServers.isEmpty()) { // No servers on the new cluster match up with this hostname, // assign randomly. - ServerName randomServer = servers.get(RANDOM.nextInt(servers.size())); + ServerName randomServer = randomAssignment(cluster, region, servers); assignments.get(randomServer).add(region); numRandomAssignments++; if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname()); } else if (localServers.size() == 1) { // the usual case - one new server on same host - assignments.get(localServers.get(0)).add(region); + ServerName target = localServers.get(0); + assignments.get(target).add(region); + cluster.doAssignRegion(region, target); numRetainedAssigments++; } else { // multiple new servers in the cluster on this same host - int size = localServers.size(); - ServerName target = - localServers.contains(oldServerName) ? 
oldServerName : localServers.get(RANDOM - .nextInt(size)); - assignments.get(target).add(region); + if (localServers.contains(oldServerName)) { + assignments.get(oldServerName).add(region); + cluster.doAssignRegion(region, oldServerName); + } else { + ServerName target = randomAssignment(cluster, region, localServers); + assignments.get(target).add(region); + } numRetainedAssigments++; } } @@ -595,4 +1095,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer { LOG.info("Load Balancer stop requested: "+why); stopped = true; } + + protected Map> getRegionAssignmentsByServer( + Collection regions) { + if (this.services != null) { + return this.services.getAssignmentManager().getSnapShotOfAssignment(regions); + } else { + return new HashMap>(); + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java index b564462..596db62 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java @@ -63,6 +63,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { @Override public void setConf(Configuration conf) { + super.setConf(conf); globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); this.rackManager = new RackManager(conf); this.conf = conf; @@ -81,7 +82,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { LOG.warn("Not running balancer since exception was thrown " + ie); return plans; } - globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); + globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); Map serverNameToServerNameWithoutCode = new HashMap(); Map serverNameWithoutCodeToServerName = @@ -134,7 +135,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2)); } } - + if (destination != null) { RegionPlan plan = new RegionPlan(region, currentServer, destination); plans.add(plan); @@ -160,7 +161,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { // one of the favored node is still alive. In this case, try to adhere // to the current favored nodes assignment as much as possible - i.e., // if the current primary is gone, then make the secondary or tertiary - // as the new host for the region (based on their current load). + // as the new host for the region (based on their current load). // Note that we don't change the favored // node assignments here (even though one or more favored node is currently // down). It is up to the balanceCluster to do this hard work. 
The HDFS @@ -223,7 +224,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer { } } - private Pair>, List> + private Pair>, List> segregateRegionsAndAssignRegionsWithFavoredNodes(List regions, List availableServers) { Map> assignmentMapForFavoredNodes = diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 964d4f9..5f8f08c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Collection; import java.util.Deque; import java.util.HashMap; @@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; /** *
<p>
This is a best effort load balancer. Given a Cost function F(C) => x It will @@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private public class StochasticLoadBalancer extends BaseLoadBalancer { - private static final String STEPS_PER_REGION_KEY = + protected static final String STEPS_PER_REGION_KEY = "hbase.master.balancer.stochastic.stepsPerRegion"; - private static final String MAX_STEPS_KEY = + protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; - private static final String MAX_RUNNING_TIME_KEY = + protected static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime"; - private static final String KEEP_REGION_LOADS = + protected static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); - private final RegionLocationFinder regionFinder = new RegionLocationFinder(); private ClusterStatus clusterStatus = null; Map> loads = new HashMap>(); @@ -111,20 +116,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. private int numRegionLoadsToRemember = 15; - private RegionPicker[] pickers; + private CandidateGenerator[] candidateGenerators; private CostFromRegionLoadFunction[] regionLoadFunctions; private CostFunction[] costFunctions; // Keep locality based picker and cost function to alert them // when new services are offered - private LocalityBasedPicker localityPicker; + private LocalityBasedCandidateGenerator localityCandidateGenerator; private LocalityCostFunction localityCost; @Override public void setConf(Configuration conf) { super.setConf(conf); - regionFinder.setConf(conf); - maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); @@ -132,13 +135,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); - localityPicker = new LocalityBasedPicker(services); + localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); localityCost = new LocalityCostFunction(conf, services); - pickers = new RegionPicker[] { - new RandomRegionPicker(), - new LoadPicker(), - localityPicker + candidateGenerators = new CandidateGenerator[] { + new RandomCandidateGenerator(), + new LoadCandidateGenerator(), + localityCandidateGenerator, + new RegionReplicaHostCandidateGenerator(), }; regionLoadFunctions = new CostFromRegionLoadFunction[] { @@ -153,6 +157,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { new MoveCostFunction(conf), localityCost, new TableSkewCostFunction(conf), + new RegionReplicaHostCostFunction(conf), + new RegionReplicaRackCostFunction(conf), regionLoadFunctions[0], regionLoadFunctions[1], regionLoadFunctions[2], @@ -168,7 +174,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override public void setClusterStatus(ClusterStatus st) { super.setClusterStatus(st); - regionFinder.setClusterStatus(st); this.clusterStatus = st; updateRegionLoad(); for(CostFromRegionLoadFunction cost : regionLoadFunctions) { @@ -179,9 +184,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override public void setMasterServices(MasterServices masterServices) { super.setMasterServices(masterServices); - 
this.regionFinder.setServices(masterServices); this.localityCost.setServices(masterServices); - this.localityPicker.setServices(masterServices); + this.localityCandidateGenerator.setServices(masterServices); } @@ -191,6 +195,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ @Override public List balanceCluster(Map> clusterState) { + //The clusterState that is given to this method contains the state + //of all the regions in the table(s) (that's true today) if (!needsBalance(new ClusterLoadState(clusterState))) { return null; } @@ -198,7 +204,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { long startTime = EnvironmentEdgeManager.currentTimeMillis(); // Keep track of servers to iterate through them. - Cluster cluster = new Cluster(clusterState, loads, regionFinder); + Cluster cluster = new Cluster(clusterState, loads, regionFinder, rackManager); + initCosts(cluster); double currentCost = computeCost(cluster, Double.MAX_VALUE); double initCost = currentCost; @@ -208,42 +215,30 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); // Perform a stochastic walk to see if we can get a good fit. long step; - for (step = 0; step < computedMaxSteps; step++) { - int pickerIdx = RANDOM.nextInt(pickers.length); - RegionPicker p = pickers[pickerIdx]; - Pair, Pair> picks = p.pick(cluster); - - int leftServer = picks.getFirst().getFirst(); - int leftRegion = picks.getFirst().getSecond(); - int rightServer = picks.getSecond().getFirst(); - int rightRegion = picks.getSecond().getSecond(); - // We couldn't find a server - if (rightServer < 0 || leftServer < 0) { - continue; - } + for (step = 0; step < computedMaxSteps; step++) { + int generatorIdx = RANDOM.nextInt(candidateGenerators.length); + CandidateGenerator p = candidateGenerators[generatorIdx]; + Cluster.Action action = p.generate(cluster); - // We randomly picked to do nothing. - if (leftRegion < 0 && rightRegion < 0) { + if (action.type == Type.NULL) { continue; } - cluster.moveOrSwapRegion(leftServer, - rightServer, - leftRegion, - rightRegion); + cluster.doAction(action); + updateCostsWithAction(cluster, action); newCost = computeCost(cluster, currentCost); + // Should this be kept? if (newCost < currentCost) { currentCost = newCost; } else { // Put things back the way they were before. - // TODO: undo by remembering old values, using an UndoAction class - cluster.moveOrSwapRegion(leftServer, - rightServer, - rightRegion, - leftRegion); + // TODO: undo by remembering old values + Action undoAction = action.undoAction(); + cluster.doAction(undoAction); + updateCostsWithAction(cluster, undoAction); } if (EnvironmentEdgeManager.currentTimeMillis() - startTime > @@ -338,6 +333,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + protected void initCosts(Cluster cluster) { + for (CostFunction c:costFunctions) { + c.init(cluster); + } + } + + protected void updateCostsWithAction(Cluster cluster, Action action) { + for (CostFunction c : costFunctions) { + c.postAction(cluster, action); + } + } /** * This is the main cost function. 
It will compute a cost associated with a proposed cluster @@ -365,8 +371,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return total; } - abstract static class RegionPicker { - abstract Pair, Pair> pick(Cluster cluster); + /** Generates a candidate action to be applied to the cluster for cost function search */ + abstract static class CandidateGenerator { + abstract Cluster.Action generate(Cluster cluster); /** * From a list of regions pick a random one. Null can be returned which @@ -397,6 +404,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return RANDOM.nextInt(cluster.numServers); } + + protected int pickRandomHost(Cluster cluster) { + if (cluster.numHosts < 1) { + return -1; + } + + return RANDOM.nextInt(cluster.numHosts); + } + protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { if (cluster.numServers < 2) { return -1; @@ -409,11 +425,23 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - protected Pair pickRandomRegions(Cluster cluster, + protected int pickOtherRandomHost(Cluster cluster, int hostIndex) { + if (cluster.numHosts < 2) { + return -1; + } + while (true) { + int otherHostIndex = pickRandomHost(cluster); + if (otherHostIndex != hostIndex) { + return otherHostIndex; + } + } + } + + protected Cluster.Action pickRandomRegions(Cluster cluster, int thisServer, int otherServer) { if (thisServer < 0 || otherServer < 0) { - return new Pair(-1, -1); + return Cluster.NullAction; } // Decide who is most likely to need another region @@ -427,45 +455,50 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); - return new Pair(thisRegion, otherRegion); + return getAction(thisServer, thisRegion, otherServer, otherRegion); + } + + protected Cluster.Action getAction (int fromServer, int fromRegion, + int toServer, int toRegion) { + if (fromServer < 0 || toServer < 0) { + return Cluster.NullAction; + } + if (fromRegion > 0 && toRegion > 0) { + return new Cluster.SwapRegionsAction(fromServer, fromRegion, + toServer, toRegion); + } else if (fromRegion > 0) { + return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer); + } else if (toRegion > 0) { + return new Cluster.MoveRegionAction(toRegion, toServer, fromServer); + } else { + return Cluster.NullAction; + } } } - static class RandomRegionPicker extends RegionPicker { + static class RandomCandidateGenerator extends CandidateGenerator { @Override - Pair, Pair> pick(Cluster cluster) { + Cluster.Action generate(Cluster cluster) { int thisServer = pickRandomServer(cluster); // Pick the other server int otherServer = pickOtherRandomServer(cluster, thisServer); - Pair regions = pickRandomRegions(cluster, thisServer, otherServer); - - return new Pair, Pair>( - new Pair(thisServer, regions.getFirst()), - new Pair(otherServer, regions.getSecond()) - - ); + return pickRandomRegions(cluster, thisServer, otherServer); } - } - public static class LoadPicker extends RegionPicker { + public static class LoadCandidateGenerator extends CandidateGenerator { @Override - Pair, Pair> pick(Cluster cluster) { + Cluster.Action generate(Cluster cluster) { cluster.sortServersByRegionCount(); int thisServer = pickMostLoadedServer(cluster, -1); int otherServer = pickLeastLoadedServer(cluster, thisServer); - Pair regions = pickRandomRegions(cluster, thisServer, otherServer); - return new Pair, Pair>( - new Pair(thisServer, 
regions.getFirst()), - new Pair(otherServer, regions.getSecond()) - - ); + return pickRandomRegions(cluster, thisServer, otherServer); } private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { @@ -495,21 +528,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - static class LocalityBasedPicker extends RegionPicker { + static class LocalityBasedCandidateGenerator extends CandidateGenerator { private MasterServices masterServices; - LocalityBasedPicker(MasterServices masterServices) { + LocalityBasedCandidateGenerator(MasterServices masterServices) { this.masterServices = masterServices; } @Override - Pair, Pair> pick(Cluster cluster) { + Cluster.Action generate(Cluster cluster) { if (this.masterServices == null) { - return new Pair, Pair>( - new Pair(-1,-1), - new Pair(-1,-1) - ); + return Cluster.NullAction; } // Pick a random region server int thisServer = pickRandomServer(cluster); @@ -518,10 +548,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f); if (thisRegion == -1) { - return new Pair, Pair>( - new Pair(-1,-1), - new Pair(-1,-1) - ); + return Cluster.NullAction; } // Pick the server with the highest locality @@ -530,10 +557,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { // pick an region on the other server to potentially swap int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f); - return new Pair, Pair>( - new Pair(thisServer,thisRegion), - new Pair(otherServer,otherRegion) - ); + return getAction(thisServer, thisRegion, otherServer, otherRegion); } private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) { @@ -558,6 +582,96 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } /** + * Generates candidates which moves the replicas out of the region server for + * co-hosted region replicas + */ + public static class RegionReplicaHostCandidateGenerator extends CandidateGenerator { + + RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator(); + + @Override + Cluster.Action generate(Cluster cluster) { + + int hostIndex = pickRandomHost(cluster); + + if (cluster.numHosts <= 1 || hostIndex == -1) { + return Cluster.NullAction; + } + + // find the regions in the host with its replicas in the same host + int currentPrimary = -1; + int currentPrimaryIndex = -1; + int numRegionsWithReplica = 0; + for (int j = 0; j <= cluster.regionsByPrimaryPerHost[hostIndex].length; j++) { + int primary = j < cluster.regionsByPrimaryPerHost[hostIndex].length ? cluster.regionsByPrimaryPerHost[hostIndex][j] : -1; + if (primary != currentPrimary) { + int numReplicas = j - currentPrimaryIndex; + if (numReplicas > 1) { + numRegionsWithReplica++; + } + + currentPrimary = primary; + currentPrimaryIndex = j; + } + } + + // randomly select one primaryIndex out of all region replicas in the same host + currentPrimary = -1; + currentPrimaryIndex = -1; + int primaryIndex = -1; + if (numRegionsWithReplica > 0) { + int randomIndex = RANDOM.nextInt(numRegionsWithReplica); + int index = 0; + for (int j = 0; j <= cluster.regionsByPrimaryPerHost[hostIndex].length; j++) { + int primary = j < cluster.regionsByPrimaryPerHost[hostIndex].length ? 
cluster.regionsByPrimaryPerHost[hostIndex][j] : -1; + if (primary != currentPrimary) { + int numReplicas = j - currentPrimaryIndex; + if (numReplicas > 1) { + if (index++ == randomIndex) { + primaryIndex = currentPrimary; + break; + } + } + + currentPrimary = primary; + currentPrimaryIndex = j; + } + } + } + + if (primaryIndex == -1) { + // default to randompicker + return randomGenerator.generate(cluster); + } + + int regionIndex = -1; + int serverIndex = -1; + // move the region replica out of the host + for (int j = 0; j < cluster.serversPerHost[hostIndex].length; j++) { + for (int k = 0; k < cluster.regionsPerServer[cluster.serversPerHost[hostIndex][j]].length; k++) { + int region = cluster.regionsPerServer[cluster.serversPerHost[hostIndex][j]][k]; + if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) { + // always move the secondary, not the primary + if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) { + regionIndex = region; + serverIndex = cluster.serversPerHost[hostIndex][j]; + break; + } + } + } + } + + int toHostIndex = pickOtherRandomHost(cluster, hostIndex); + int toServerIndex = cluster.serversPerHost[toHostIndex] + [RANDOM.nextInt(cluster.serversPerHost[toHostIndex].length)]; + + int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); + + return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex); + } + } + + /** * Base class of StochasticLoadBalancer's Cost Functions. */ public abstract static class CostFunction { @@ -577,6 +691,39 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { this.multiplier = m; } + /** Called once per LB invocation to give the cost function + * an opportunity to initialize its state and perform any costly calculation. + */ + void init(Cluster cluster) { } + + /** Called once per cluster Action to give the cost function + * an opportunity to update its state. postAction() is always + * called at least once before cost() is called with the cluster + * that this action is performed on.
*/ + void postAction(Cluster cluster, Action action) { + switch (action.type) { + case NULL: break; + case ASSIGN_REGION: + AssignRegionAction ar = (AssignRegionAction) action; + regionMoved(cluster, ar.region, -1, ar.server); + break; + case MOVE_REGION: + MoveRegionAction mra = (MoveRegionAction) action; + regionMoved(cluster, mra.region, mra.fromServer, mra.toServer); + break; + case SWAP_REGIONS: + SwapRegionsAction a = (SwapRegionsAction) action; + regionMoved(cluster, a.fromRegion, a.fromServer, a.toServer); + regionMoved(cluster, a.toRegion, a.toServer, a.fromServer); + break; + default: + throw new RuntimeException("Unknown action:" + action.type); + } + } + + protected void regionMoved(Cluster cluster, int region, int oldServer, int newServer) { + } + abstract double cost(Cluster cluster); /** @@ -606,8 +753,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return scaled; } - - private double getSum(double[] stats) { double total = 0; for(double s:stats) { @@ -824,6 +969,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } + @Override double cost(Cluster cluster) { if (clusterStatus == null || loads == null) { return 0; @@ -891,6 +1037,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } + @Override protected double getCostFromRl(RegionLoad rl) { return rl.getReadRequestsCount(); } @@ -911,12 +1058,135 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST)); } + @Override protected double getCostFromRl(RegionLoad rl) { return rl.getWriteRequestsCount(); } } /** + * A cost function for region replicas. We give a very high cost to hosting + * replicas of the same region in the same host. We do not prevent the case + * though, since if numReplicas > numRegionServers, we still want to keep the + * replica open. + */ + public static class RegionReplicaHostCostFunction extends CostFunction { + private static final String REGION_REPLICA_HOST_COST_KEY = + "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; + private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; + + long maxCost = 0; + long[] costsPerGroup; // group is either host or rack + + public RegionReplicaHostCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY, DEFAULT_REGION_REPLICA_HOST_COST_KEY)); + } + + @Override + void init(Cluster cluster) { + // max cost is the case where every region replica is hosted together regardless of host + maxCost = cluster.numHosts > 1 ?
getMaxCost(cluster) : 0; + costsPerGroup = new long[cluster.numHosts]; + for (int i = 0 ; i < cluster.regionsByPrimaryPerHost.length; i++) { + costsPerGroup[i] = costPerGroup(cluster.regionsByPrimaryPerHost[i]); + } + } + + long getMaxCost(Cluster cluster) { + int[] regionsByPrimary = new int[cluster.numRegions]; + for (int i = 0; i < cluster.regions.length; i++) { + int primaryIndex = cluster.regionIndexToPrimaryIndex[i]; + regionsByPrimary[i] = primaryIndex; + } + + Arrays.sort(regionsByPrimary); + + // compute numReplicas from the sorted array + return costPerGroup(regionsByPrimary); + } + + @Override + double cost(Cluster cluster) { + if (maxCost <= 0) { + return 0; + } + + long totalCost = 0; + for (int i = 0 ; i < costsPerGroup.length; i++) { + totalCost += costsPerGroup[i]; + } + return scale(0, maxCost, totalCost); + } + + protected long costPerGroup(int[] regionsByPrimary) { + long cost = 0; + int currentPrimary = -1; + int currentPrimaryIndex = -1; + for (int j = 0 ; j <= regionsByPrimary.length; j++) { + int primary = j < regionsByPrimary.length ? regionsByPrimary[j] : -1; + if (primary != currentPrimary) { + int numReplicas = j - currentPrimaryIndex; + // square the cost + if (numReplicas > 1) { + cost += (numReplicas - 1) * (numReplicas - 1); + } + currentPrimary = primary; + currentPrimaryIndex = j; + } + } + + return cost; + } + + @Override + protected void regionMoved(Cluster cluster, int region, int oldServer, int newServer) { + int oldHost = cluster.serverIndexToHostIndex[oldServer]; + int newHost = cluster.serverIndexToHostIndex[newServer]; + if (newHost != oldHost) { + costsPerGroup[oldHost] = costPerGroup(cluster.regionsByPrimaryPerHost[oldHost]); + costsPerGroup[newHost] = costPerGroup(cluster.regionsByPrimaryPerHost[newHost]); + } + } + } + + /** + * A cost function for region replicas for the rack distribution. We give a relatively high + * cost to hosting replicas of the same region in the same rack. We do not prevent the case + * though. + */ + public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { + private static final String REGION_REPLICA_RACK_COST_KEY = + "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; + private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; + + public RegionReplicaRackCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY)); + } + + @Override + void init(Cluster cluster) { + // max cost is the case where every region replica is hosted together regardless of rack + maxCost = cluster.numRacks > 1 ? getMaxCost(cluster) : 0; + costsPerGroup = new long[cluster.numRacks]; + for (int i = 0 ; i < cluster.regionsByPrimaryPerRack.length; i++) { + costsPerGroup[i] = costPerGroup(cluster.regionsByPrimaryPerRack[i]); + } + } + + @Override + protected void regionMoved(Cluster cluster, int region, int oldServer, int newServer) { + int oldRack = cluster.serverIndexToRackIndex[oldServer]; + int newRack = cluster.serverIndexToRackIndex[newServer]; + if (newRack != oldRack) { + costsPerGroup[oldRack] = costPerGroup(cluster.regionsByPrimaryPerRack[oldRack]); + costsPerGroup[newRack] = costPerGroup(cluster.regionsByPrimaryPerRack[newRack]); + } + } + } + + /** * Compute the cost of total memstore size. The more unbalanced the higher the * computed cost will be. This uses a rolling average of regionload. 
*/ diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java index 20aab42..5b4d732 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -126,62 +125,60 @@ public class TestMasterOperationsForRegionReplicas { assert (state != null); } } - // TODO: HBASE-10351 should uncomment the following tests (since the tests assume region placements are handled) -// List metaRows = MetaReader.fullScan(ct); -// int numRows = 0; -// for (Result result : metaRows) { -// RegionLocations locations = MetaReader.getRegionLocations(result); -// HRegionInfo hri = locations.getRegionLocation().getRegionInfo(); -// if (!hri.getTable().equals(table)) continue; -// numRows += 1; -// HRegionLocation[] servers = locations.getRegionLocations(); -// // have two locations for the replicas of a region, and the locations should be different -// assert(servers.length == 2); -// assert(!servers[0].equals(servers[1])); -// } -// assert(numRows == numRegions); -// -// // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta -// // class -// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); -// -// // Now kill the master, restart it and see if the assignments are kept -// ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster(); -// TEST_UTIL.getHBaseClusterInterface().stopMaster(master); -// TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000); -// TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname()); -// TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(); -// for (int i = 0; i < numRegions; i++) { -// for (int j = 0; j < numReplica; j++) { -// HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j); -// RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() -// .getRegionStates().getRegionState(replica); -// assert (state != null); -// } -// } -// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); -// -// // Now shut the whole cluster down, and verify the assignments are kept so that the -// // availability constraints are met. 
-// TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true); -// TEST_UTIL.shutdownMiniHBaseCluster(); -// TEST_UTIL.startMiniHBaseCluster(1, numSlaves); -// TEST_UTIL.waitTableEnabled(table.getName()); -// ct = new CatalogTracker(TEST_UTIL.getConfiguration()); -// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); -// -// // Now shut the whole cluster down, and verify regions are assigned even if there is only -// // one server running -// TEST_UTIL.shutdownMiniHBaseCluster(); -// TEST_UTIL.startMiniHBaseCluster(1, 1); -// TEST_UTIL.waitTableEnabled(table.getName()); -// ct = new CatalogTracker(TEST_UTIL.getConfiguration()); -// validateSingleRegionServerAssignment(ct, numRegions, numReplica); -// for (int i = 1; i < numSlaves; i++) { //restore the cluster -// TEST_UTIL.getMiniHBaseCluster().startRegionServer(); -// } + List metaRows = MetaReader.fullScan(ct); + int numRows = 0; + for (Result result : metaRows) { + RegionLocations locations = MetaReader.getRegionLocations(result); + HRegionInfo hri = locations.getRegionLocation().getRegionInfo(); + if (!hri.getTable().equals(table)) continue; + numRows += 1; + HRegionLocation[] servers = locations.getRegionLocations(); + // have two locations for the replicas of a region, and the locations should be different + assert(servers.length == 2); + assert(!servers[0].equals(servers[1])); + } + assert(numRows == numRegions); + + // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta + // class + validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); + + // Now kill the master, restart it and see if the assignments are kept + ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster(); + TEST_UTIL.getHBaseClusterInterface().stopMaster(master); + TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000); + TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname()); + TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(); + for (int i = 0; i < numRegions; i++) { + for (int j = 0; j < numReplica; j++) { + HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j); + RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates().getRegionState(replica); + assert (state != null); + } + } + validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); + + // Now shut the whole cluster down, and verify the assignments are kept so that the + // availability constraints are met. 
+ TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true); + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.startMiniHBaseCluster(1, numSlaves); + TEST_UTIL.waitTableEnabled(table.getName()); + ct = new CatalogTracker(TEST_UTIL.getConfiguration()); + validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); + + // Now shut the whole cluster down, and verify regions are assigned even if there is only + // one server running + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.startMiniHBaseCluster(1, 1); + TEST_UTIL.waitTableEnabled(table.getName()); + ct = new CatalogTracker(TEST_UTIL.getConfiguration()); + validateSingleRegionServerAssignment(ct, numRegions, numReplica); + for (int i = 1; i < numSlaves; i++) { //restore the cluster + TEST_UTIL.getMiniHBaseCluster().startRegionServer(); + } - //TODO: HBASE-10361 patch should uncomment the test below //check on alter table admin.disableTable(table); assert(admin.isTableDisabled(table)); @@ -213,7 +210,7 @@ public class TestMasterOperationsForRegionReplicas { for (HRegionInfo hri : hris) { Integer i; HRegionInfo regionReplica0 = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri); - defaultReplicas.put(regionReplica0, + defaultReplicas.put(regionReplica0, (i = defaultReplicas.get(regionReplica0)) == null ? 1 : i + 1); } assert(defaultReplicas.size() == numRegions); @@ -311,7 +308,7 @@ public class TestMasterOperationsForRegionReplicas { } // the number of startkeys will be equal to the number of regions hosted in each server // (each server will be hosting one replica of a region) - assertEquals(setOfStartKeys.size() , numRegions); + assertEquals(numRegions, setOfStartKeys.size()); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 310ae90..facd526 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -21,20 +21,26 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.Random; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; /** * Class used to be the base of unit tests on load balancers. It gives helper @@ -80,6 +86,50 @@ public class BalancerTestBase { } } + /** + * Checks whether region replicas are not hosted on the same host. 
+ */ + public void assertRegionReplicaPlacement(Map> serverMap, RackManager rackManager) { + TreeMap> regionsPerHost = new TreeMap>(); + TreeMap> regionsPerRack = new TreeMap>(); + + for (Entry> entry : serverMap.entrySet()) { + String hostname = entry.getKey().getHostname(); + Set infos = regionsPerHost.get(hostname); + if (infos == null) { + infos = new HashSet(); + regionsPerHost.put(hostname, infos); + } + + for (HRegionInfo info : entry.getValue()) { + HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + if (!infos.add(primaryInfo)) { + Assert.fail("Two or more region replicas are hosted on the same host after balance"); + } + } + } + + if (rackManager == null) { + return; + } + + for (Entry> entry : serverMap.entrySet()) { + String rack = rackManager.getRack(entry.getKey()); + Set infos = regionsPerRack.get(rack); + if (infos == null) { + infos = new HashSet(); + regionsPerRack.put(rack, infos); + } + + for (HRegionInfo info : entry.getValue()) { + HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + if (!infos.add(primaryInfo)) { + Assert.fail("Two or more region replicas are hosted on the same rack after balance"); + } + } + } + } + protected String printStats(List servers) { int numServers = servers.size(); int totalRegions = 0; @@ -159,17 +209,17 @@ public class BalancerTestBase { map.put(sn, sal); } - protected Map> mockClusterServers(int[] mockCluster) { + protected TreeMap> mockClusterServers(int[] mockCluster) { return mockClusterServers(mockCluster, -1); } protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) { - return new BaseLoadBalancer.Cluster(mockClusterServers(mockCluster, -1), null, null); + return new BaseLoadBalancer.Cluster(mockClusterServers(mockCluster, -1), null, null, null); } - protected Map> mockClusterServers(int[] mockCluster, int numTables) { + protected TreeMap> mockClusterServers(int[] mockCluster, int numTables) { int numServers = mockCluster.length; - Map> servers = new TreeMap>(); + TreeMap> servers = new TreeMap>(); for (int i = 0; i < numServers; i++) { int numRegions = mockCluster[i]; ServerAndLoad sal = randomServer(0); @@ -217,7 +267,7 @@ public class BalancerTestBase { ServerName sn = this.serverQueue.poll(); return new ServerAndLoad(sn, numRegionsPerServer); } - String host = "srv" + rand.nextInt(100000); + String host = "srv" + rand.nextInt(Integer.MAX_VALUE); int port = rand.nextInt(60000); long startCode = rand.nextLong(); ServerName sn = ServerName.valueOf(host, port, startCode); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index d0cf4fa..21416c2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,17 +35,25 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; @Category(MediumTests.class) public class TestBaseLoadBalancer extends BalancerTestBase { - private static LoadBalancer loadBalancer; - private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); + private static BaseLoadBalancer loadBalancer; + private static RackManager rackManager; + private static final int NUM_SERVERS = 15; + private static ServerName[] servers = new ServerName[NUM_SERVERS]; + private static final Log LOG = LogFactory.getLog(TestBaseLoadBalancer.class); int[][] regionsAndServersMocks = new int[][] { // { num regions, num servers } @@ -58,6 +67,20 @@ public class TestBaseLoadBalancer extends BalancerTestBase { Configuration conf = HBaseConfiguration.create(); loadBalancer = new MockBalancer(); loadBalancer.setConf(conf); + // Set up the rack topologies (5 machines per rack) + rackManager = Mockito.mock(RackManager.class); + for (int i = 0; i < NUM_SERVERS; i++) { + servers[i] = ServerName.valueOf("foo"+i+":1234",-1); + if (i < 5) { + Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack1"); + } + if (i >= 5 && i < 10) { + Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack2"); + } + if (i >= 10) { + Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack3"); + } + } } public static class MockBalancer extends BaseLoadBalancer { @@ -174,6 +197,138 @@ public class TestBaseLoadBalancer extends BalancerTestBase { assertRetainedAssignment(existing, listOfServerNames, assignment); } + @Test + public void testRegionAvailability() throws Exception { + // Create a cluster with a few servers, assign them to specific racks + // then assign some regions. The tests should check whether moving a + // replica from one node to a specific other node or rack lowers the + // availability of the region or not + + List list0 = new ArrayList(); + List list1 = new ArrayList(); + List list2 = new ArrayList(); + // create a region (region1) + HRegionInfo hri1 = new HRegionInfo( + TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(), + false, 100); + // create a replica of the region (replica_of_region1) + HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); + // create a second region (region2) + HRegionInfo hri3 = new HRegionInfo( + TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(), + false, 101); + list0.add(hri1); //only region1 + list1.add(hri2); //only replica_of_region1 + list2.add(hri3); //only region2 + Map> clusterState = + new LinkedHashMap>(); + clusterState.put(servers[0], list0); //servers[0] hosts region1 + clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1 + clusterState.put(servers[2], list2); //servers[2] hosts region2 + // create a cluster with the above clusterState. The way in which the + // cluster is created (constructor code) would make sure the indices of + // the servers are in the order in which it is inserted in the clusterState + // map (linkedhashmap is important). 
A similar thing applies to the region lists + Cluster cluster = new Cluster(clusterState, null, null, rackManager); + // check whether a move of region1 from servers[0] to servers[1] would lower + // the availability of region1 + assertTrue(cluster.wouldLowerAvailability(hri1, servers[1])); + // check whether a move of region1 from servers[0] to servers[2] would lower + // the availability of region1 + assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2])); + // check whether a move of replica_of_region1 from servers[0] to servers[2] would lower + // the availability of replica_of_region1 + assertTrue(!cluster.wouldLowerAvailability(hri2, servers[2])); + // check whether a move of region2 from servers[0] to servers[1] would lower + // the availability of region2 + assertTrue(!cluster.wouldLowerAvailability(hri3, servers[1])); + + // now lets have servers[1] host replica_of_region2 + list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1)); + // create a new clusterState with the above change + cluster = new Cluster(clusterState, null, null, rackManager); + // now check whether a move of a replica from servers[0] to servers[1] would lower + // the availability of region2 + assertTrue(cluster.wouldLowerAvailability(hri3, servers[1])); + + // start over again + clusterState.clear(); + clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1 + clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1 and replica_of_region2 + clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2 + clusterState.put(servers[10], new ArrayList()); //servers[10], rack3 hosts no region + // create a cluster with the above clusterState + cluster = new Cluster(clusterState, null, null, rackManager); + // check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would + // lower the availability + + assertTrue(cluster.wouldLowerAvailability(hri1, servers[0])); + + // now create a cluster without the rack manager + cluster = new Cluster(clusterState, null, null, null); + // now repeat check whether a move of region1 from servers[0] to servers[6] would + // lower the availability + assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6])); + } + + @Test + public void testRegionAvailabilityWithRegionMoves() throws Exception { + List list0 = new ArrayList(); + List list1 = new ArrayList(); + List list2 = new ArrayList(); + // create a region (region1) + HRegionInfo hri1 = new HRegionInfo( + TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(), + false, 100); + // create a replica of the region (replica_of_region1) + HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); + // create a second region (region2) + HRegionInfo hri3 = new HRegionInfo( + TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(), + false, 101); + list0.add(hri1); //only region1 + list1.add(hri2); //only replica_of_region1 + list2.add(hri3); //only region2 + Map> clusterState = + new LinkedHashMap>(); + clusterState.put(servers[0], list0); //servers[0] hosts region1 + clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1 + clusterState.put(servers[2], list2); //servers[2] hosts region2 + // create a cluster with the above clusterState. The way in which the + // cluster is created (constructor code) would make sure the indices of + // the servers are in the order in which it is inserted in the clusterState + // map (linkedhashmap is important). 
+ Cluster cluster = new Cluster(clusterState, null, null, rackManager); + // check whether moving region1 from servers[1] to servers[2] would lower availability + assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2])); + + // now move region1 from servers[0] to servers[2] + cluster.doAction(new MoveRegionAction(0, 0, 2)); + // now repeat check whether moving region1 from servers[1] to servers[2] + // would lower availability + assertTrue(cluster.wouldLowerAvailability(hri1, servers[2])); + + // start over again + clusterState.clear(); + List list3 = new ArrayList(); + HRegionInfo hri4 = RegionReplicaUtil.getRegionInfoForReplica(hri3, 1); + list3.add(hri4); + clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1 + clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1 + clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2 + clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2 + // create a cluster with the above clusterState + cluster = new Cluster(clusterState, null, null, rackManager); + // check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would + // lower the availability + assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0])); + // now move region2 from servers[6],rack2 to servers[0],rack1 + cluster.doAction(new MoveRegionAction(2, 2, 0)); + // now repeat check if replica_of_region2 from servers[12],rack3 to servers[0],rack1 would + // lower the availability + assertTrue(cluster.wouldLowerAvailability(hri3, servers[0])); + } + private List getListOfServerNames(final List sals) { List list = new ArrayList(); for (ServerAndLoad e : sals) { @@ -227,5 +382,4 @@ public class TestBaseLoadBalancer extends BalancerTestBase { } } } - } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 162a257..0eabc26 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -17,10 +17,19 @@ */ package org.apache.hadoop.hbase.master.balancer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.TreeMap; @@ -34,29 +43,29 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.NetworkTopology; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static 
org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - @Category(MediumTests.class) public class TestStochasticLoadBalancer extends BalancerTestBase { public static final String REGION_KEY = "testRegion"; private static StochasticLoadBalancer loadBalancer; private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); + private static Configuration conf; @BeforeClass public static void beforeAllTests() throws Exception { - Configuration conf = HBaseConfiguration.create(); + conf = HBaseConfiguration.create(); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f); + conf.setClass("hbase.util.ip.to.rack.determiner", + MyRackResolver.class, DNSToSwitchMapping.class); loadBalancer = new StochasticLoadBalancer(); loadBalancer.setConf(conf); } @@ -250,10 +259,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 3; int numRegions = 20; int numRegionsPerServer = 3; //all servers except one + int replication = 1; int numTables = 2; Map> serverMap = - createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); List list = convertToList(serverMap); @@ -275,13 +285,102 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { assertNull(plans); } + @Test + public void testReplicaCost() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf); + for (int[] mockCluster : clusterStateMocks) { + BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + double cost = costFunction.cost(cluster); + assertTrue(cost >= 0); + assertTrue(cost <= 1.01); + } + } + + @Test + public void testReplicaCostForReplicas() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf); + + int [] servers = new int[] {3,3,3,3,3}; + TreeMap> clusterState = mockClusterServers(servers); + + BaseLoadBalancer.Cluster cluster; + + cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + costFunction.init(cluster); + double costWithoutReplicas = costFunction.cost(cluster); + assertEquals(0, costWithoutReplicas, 0); + + // replicate the region from first server to the last server + HRegionInfo replica1 = RegionReplicaUtil.getRegionInfoForReplica( + clusterState.firstEntry().getValue().get(0),1); + clusterState.lastEntry().getValue().add(replica1); + + cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + costFunction.init(cluster); + double costWith1ReplicaDifferentServer = costFunction.cost(cluster); + + assertEquals(0, costWith1ReplicaDifferentServer, 0); + + // add a third replica to the last server + HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2); + clusterState.lastEntry().getValue().add(replica2); + + cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + costFunction.init(cluster); + double costWith1ReplicaSameServer = costFunction.cost(cluster); + + assertTrue(costWith1ReplicaDifferentServer < costWith1ReplicaSameServer); + + // test with replication = 4 for following: + + HRegionInfo replica3; + Iterator>> it; + Entry> entry; + + clusterState = mockClusterServers(servers); + it = clusterState.entrySet().iterator(); + entry = it.next(); //first server + HRegionInfo hri = 
entry.getValue().get(0); + replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1); + replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2); + replica3 = RegionReplicaUtil.getRegionInfoForReplica(hri, 3); + entry.getValue().add(replica1); + entry.getValue().add(replica2); + it.next().getValue().add(replica3); //2nd server + + cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + costFunction.init(cluster); + double costWith3ReplicasSameServer = costFunction.cost(cluster); + + clusterState = mockClusterServers(servers); + hri = clusterState.firstEntry().getValue().get(0); + replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1); + replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2); + replica3 = RegionReplicaUtil.getRegionInfoForReplica(hri, 3); + + clusterState.firstEntry().getValue().add(replica1); + clusterState.lastEntry().getValue().add(replica2); + clusterState.lastEntry().getValue().add(replica3); + + cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + costFunction.init(cluster); + double costWith2ReplicasOnTwoServers = costFunction.cost(cluster); + + assertTrue(costWith2ReplicasOnTwoServers < costWith3ReplicasSameServer); + } + @Test (timeout = 60000) public void testSmallCluster() { int numNodes = 10; int numRegions = 1000; int numRegionsPerServer = 40; //all servers except one + int replication = 1; int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @Test (timeout = 60000) @@ -289,8 +388,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 20; int numRegions = 2000; int numRegionsPerServer = 40; //all servers except one + int replication = 1; int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @Test (timeout = 60000) @@ -298,8 +398,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 20; int numRegions = 2000; int numRegionsPerServer = 1; // all servers except one + int replication = 1; int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */); + /* fails because of max moves */ + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, false); } @Test (timeout = 800000) @@ -307,8 +409,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 100; int numRegions = 10000; int numRegionsPerServer = 60; // all servers except one + int replication = 1; int numTables = 40; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @Test (timeout = 800000) @@ -316,12 +419,15 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numNodes = 200; int numRegions = 100000; int numRegionsPerServer = 40; // all servers except one + int replication = 1; int numTables = 400; testWithCluster(numNodes, numRegions, numRegionsPerServer, + replication, numTables, - false /* num large num regions means may not always get to best balance with one run */); + false, /* num large num regions means may not always get to best balance with one run */ + false); } @@ -330,8 +436,9 @@ public class TestStochasticLoadBalancer extends 
BalancerTestBase { int numNodes = 100; int numRegions = 2000; int numRegionsPerServer = 9; // all servers except one + int replication = 1; int numTables = 110; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); // TODO(eclark): Make sure that the tables are well distributed. } @@ -341,20 +448,145 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numRegions = 100000; //100 regions per RS int numRegionsPerServer = 80; //all servers except one int numTables = 100; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + int replication = 1; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnSmallCluster() { + int numNodes = 10; + int numRegions = 1000; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 80; //all regions are mostly balanced + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnMidCluster() { + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 200; + int numRegions = 40 * 200; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 30; //all regions are mostly balanced + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnLargeCluster() { + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 1000; + int numRegions = 40 * numNodes; //40 regions per RS + int numRegionsPerServer = 30; //all servers except one + int numTables = 100; + int replication = 3; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicasOnMidClusterHighReplication() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 100; + int numRegions = 6 * 100; + int replication = 100; // 100 replicas per region, one for each server + int numRegionsPerServer = 5; + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicationOnMidClusterSameHosts() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numHosts = 100; + int numRegions = 100 * 100; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 5; + int numTables = 10; + Map> serverMap = + createServerMap(numHosts, numRegions, numRegionsPerServer, replication, numTables); + int numNodesPerHost = 4; + + // create a new map with 4 RS per host. 
+ Map> newServerMap = new TreeMap>(serverMap); + for (Map.Entry> entry : serverMap.entrySet()) { + for (int i=1; i < numNodesPerHost; i++) { + ServerName s1 = entry.getKey(); + ServerName s2 = ServerName.valueOf(s1.getHostname(), s1.getPort() + i, 1); // create an RS for the same host + newServerMap.put(s2, new ArrayList()); + } + } + + testWithCluster(newServerMap, null, true, true); + } + + private static class ForTestRackManager extends RackManager { + int numRacks; + public ForTestRackManager(int numRacks) { + this.numRacks = numRacks; + } + @Override + public String getRack(ServerName server) { + return "rack_" + (server.hashCode() % numRacks); + } + } + + @Test (timeout = 120000) + public void testRegionReplicationOnMidClusterWithRacks() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 45 * 1000); // 45 sec + loadBalancer.setConf(conf); + int numNodes = 80; + int numRegions = numNodes * 50; + int replication = 4; // 4 replicas per region + int numRegionsPerServer = 40; + int numTables = 10; + int numRacks = 5; // all replicas should be on a different rack + Map> serverMap = + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + RackManager rm = new ForTestRackManager(numRacks); + + testWithCluster(serverMap, rm, true, true); + } + + @Test (timeout = 60000) + public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() { + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + loadBalancer.setConf(conf); + int numNodes = 80; + int numRegions = 6 * 100; + int replication = 100; // 100 replicas per region, more than numNodes + int numRegionsPerServer = 5; + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false); } protected void testWithCluster(int numNodes, - int numRegions, - int numRegionsPerServer, - int numTables, - boolean assertFullyBalanced) { + int numRegions, + int numRegionsPerServer, + int replication, + int numTables, + boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) { Map> serverMap = - createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas); + } + + protected void testWithCluster(Map> serverMap, + RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) { List list = convertToList(serverMap); LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + loadBalancer.setRackManager(rackManager); // Run the balancer. List plans = loadBalancer.balanceCluster(serverMap); assertNotNull(plans); @@ -369,12 +601,16 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { assertClusterAsBalanced(balancedCluster); List secondPlans = loadBalancer.balanceCluster(serverMap); assertNull(secondPlans); + if (assertFullyBalancedForReplicas) { + assertRegionReplicaPlacement(serverMap, rackManager); + } } } private Map> createServerMap(int numNodes, int numRegions, int numRegionsPerServer, + int replication, int numTables) { //construct a cluster of numNodes, having a total of numRegions. 
Each RS will hold //numRegionsPerServer many regions except for the last one, which will host all the @@ -384,6 +620,36 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { cluster[i] = numRegionsPerServer; } cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer); - return mockClusterServers(cluster, numTables); + Map> clusterState = mockClusterServers(cluster, numTables); + if (replication > 0) { + // replicate the regions to the same servers + for (List regions : clusterState.values()) { + int length = regions.size(); + for (int i = 0; i < length; i++) { + for (int r = 1; r < replication ; r++) { + regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r)); + } + } + } + } + + return clusterState; + } + + public static class MyRackResolver implements DNSToSwitchMapping { + + public MyRackResolver(Configuration conf) {} + + @Override + public List resolve(List names) { + List racks = new ArrayList(names.size()); + for (int i = 0; i < names.size(); i++) { + racks.add(i, NetworkTopology.DEFAULT_RACK); + } + return racks; + } + + @Override + public void reloadCachedMappings() {} } }
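
[Reviewer note, not part of the patch] The replica cost functions in this change charge each host (or rack) a squared penalty per co-located replica group: a primary with n replicas in the same group contributes (n - 1)^2 to that group's cost, so one extra co-located replica costs 1 while stacking all replicas together grows the cost quadratically. Below is a minimal standalone Java sketch of that costPerGroup() idea; the class name and main() are illustrative only, and it assumes primary indices are non-negative.

import java.util.Arrays;

public class ReplicaColocationCostSketch {
  // regionsByPrimary: for every region hosted in one group (host or rack), the
  // index of its primary region. Sorting makes replicas of the same primary adjacent.
  static long costPerGroup(int[] regionsByPrimary) {
    Arrays.sort(regionsByPrimary);
    long cost = 0;
    int currentPrimary = -1;      // sentinel; assumes real primary indices are >= 0
    int currentPrimaryIndex = -1;
    for (int j = 0; j <= regionsByPrimary.length; j++) {
      int primary = j < regionsByPrimary.length ? regionsByPrimary[j] : -1;
      if (primary != currentPrimary) {          // a run of equal primaries just ended
        int numReplicas = j - currentPrimaryIndex;
        if (numReplicas > 1) {                  // more than one replica co-located
          cost += (numReplicas - 1) * (numReplicas - 1);
        }
        currentPrimary = primary;
        currentPrimaryIndex = j;
      }
    }
    return cost;
  }

  public static void main(String[] args) {
    System.out.println(costPerGroup(new int[] {0, 1, 2}));    // 0: no co-location
    System.out.println(costPerGroup(new int[] {0, 0, 1, 2})); // 1: one extra replica of primary 0
    System.out.println(costPerGroup(new int[] {0, 0, 0, 1})); // 4: two extra replicas of primary 0
  }
}

The patch's RegionReplicaHostCostFunction keeps these per-group values up to date incrementally in regionMoved() rather than recomputing them on every candidate action, and scales the running total against the worst case in which every replica is hosted together.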