Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancerTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancerTest.java (revision ) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancerTest.java (revision ) @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Used for tests + */ +public class FavoredStochasticBalancerTest extends FavoredStochasticBalancer { + + private static final Log LOG = LogFactory.getLog(FavoredStochasticBalancerTest.class); + + @Override + protected void configureGenerators() { + List fnPickers = new ArrayList(); + fnPickers.add(new FavoredNodeLoadPicker()); + setCandidateGenerators(fnPickers); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java (revision ) @@ -0,0 +1,573 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartcodeAgnosticServerName; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan.Position; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class FavoredStochasticBalancer extends StochasticLoadBalancer implements + FavoredNodesPromoter { + + private static final Log LOG = LogFactory.getLog(FavoredStochasticBalancer.class); + private ClusterStatus status; + private FavoredNodesManager fnm; + + + @Override + public void initialize() throws HBaseIOException { + configureGenerators(); + super.initialize(); + } + + protected void configureGenerators() { + List fnPickers = new ArrayList(2); + fnPickers.add(new FavoredNodeLoadPicker()); + fnPickers.add(new FavoredNodeLocalityPicker()); + setCandidateGenerators(fnPickers); + } + + @Override + public void setMasterServices(MasterServices masterServices) { + super.setMasterServices(masterServices); + fnm = masterServices.getFavoredNodesManager(); + } + + @Override + public void setClusterStatus(ClusterStatus st) { + super.setClusterStatus(st); + this.status = st; + } + + @Override + public Map> roundRobinAssignment(List regions, + List servers) throws HBaseIOException { + //TODO: Use complete redistribute API + Map> assignmentMap; + try { + FavoredNodeAssignmentHelper assignmentHelper = + new FavoredNodeAssignmentHelper(servers, fnm.getRackManager()); + assignmentHelper.initialize(); + + // Segregate the regions into two types: + // 1. The regions that have favored node assignment, and where at least + // one of the favored node is still alive. In this case, try to adhere + // to the current favored nodes assignment as much as possible - i.e., + // if the current primary is gone, then make the secondary or tertiary + // as the new host for the region (based on their current load). + // Note that we don't change the favored + // node assignments here (even though one or more favored node is currently + // down). It is up to the balanceCluster to do this hard work. The HDFS + // can handle the fact that some nodes in the favored nodes hint is down + // It'd allocate some other DNs. In combination with stale settings for HDFS, + // we should be just fine. + // 2. The regions that currently don't have favored node assignment. We will + // need to come up with favored nodes assignments for them. The corner case + // in (1) above is that all the nodes are unavailable and in that case, we + // will note that this region doesn't have favored nodes. + Pair>, List> segregatedRegions = + segregateRegionsAndAssignRegionsWithFavoredNodes(regions, servers); + Map> regionsWithFavoredNodesMap = segregatedRegions.getFirst(); + List regionsWithNoFavoredNodes = segregatedRegions.getSecond(); + assignmentMap = new HashMap>(); + fnm.generateFavoredNodes(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes, + servers); + // merge the assignment maps + for (ServerName sn : regionsWithFavoredNodesMap.keySet()) { + if (assignmentMap.get(sn) == null) { + assignmentMap.put(sn, Lists.newArrayList()); + } + assignmentMap.get(sn).addAll(regionsWithFavoredNodesMap.get(sn)); + } + assignmentMap.putAll(regionsWithFavoredNodesMap); + } catch (Exception ex) { + throw new HBaseIOException("Encountered exception while doing favored-nodes assignment " + ex + + " Falling back to regular assignment", ex); + } + return assignmentMap; + } + + private Pair>, List> + segregateRegionsAndAssignRegionsWithFavoredNodes(List regions, + List availableServers) { + Map> assignmentMapForFavoredNodes = + new HashMap>(regions.size() / 2); + List regionsWithNoFavoredNodes = new ArrayList(regions.size()/2); + for (HRegionInfo region : regions) { + if (region.getTable().isSystemTable()) { + try { + ServerName destination = super.randomAssignment(region, availableServers); + addRegionToMap(assignmentMapForFavoredNodes, region, destination); + } catch (HBaseIOException e) { + LOG.error("Failed to assign region: " + region.getRegionNameAsString(), e); + } + } else { + List favoredNodes = fnm.getFavoredNodes(region); + ServerName primaryHost = null; + ServerName secondaryHost = null; + ServerName tertiaryHost = null; + if (favoredNodes != null) { + for (ServerName s : favoredNodes) { + ServerName serverWithLegitStartCode = availableServersContains(availableServers, s); + if (serverWithLegitStartCode != null) { + FavoredNodesPlan.Position position = FavoredNodesPlan.getFavoredServerPosition( + favoredNodes, s); + if (Position.PRIMARY.equals(position)) { + primaryHost = serverWithLegitStartCode; + } else if (Position.SECONDARY.equals(position)) { + secondaryHost = serverWithLegitStartCode; + } else if (Position.TERTIARY.equals(position)) { + tertiaryHost = serverWithLegitStartCode; + } + } + } + assignRegionToAvailableFavoredNode(assignmentMapForFavoredNodes, region, primaryHost, + secondaryHost, tertiaryHost); + } else { + regionsWithNoFavoredNodes.add(region); + } + } + } + return new Pair>, List>( + assignmentMapForFavoredNodes, regionsWithNoFavoredNodes); + } + + private void addRegionToMap(Map> assignmentMapForFavoredNodes, + HRegionInfo region, ServerName host) { + List regionsOnServer = null; + if ((regionsOnServer = assignmentMapForFavoredNodes.get(host)) == null) { + regionsOnServer = new ArrayList(); + assignmentMapForFavoredNodes.put(host, regionsOnServer); + } + regionsOnServer.add(region); + } + + // Do a check of the hostname and port and return the servername from the servers list + // that matched (the favoredNode will have a startcode of -1 but we want the real + // server with the legit startcode + private ServerName availableServersContains(List servers, ServerName favoredNode) { + for (ServerName server : servers) { + if (ServerName.isSameHostnameAndPort(favoredNode, server)) { + return server; + } + } + return null; + } + + private void assignRegionToAvailableFavoredNode(Map> assignmentMapForFavoredNodes, HRegionInfo region, ServerName primaryHost, + ServerName secondaryHost, ServerName tertiaryHost) { + if (primaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, primaryHost); + } else if (secondaryHost != null && tertiaryHost != null) { + // assign the region to the one with a lower load + // (both have the desired hdfs blocks) + ServerName s; + ServerLoad tertiaryLoad = super.services.getServerManager().getLoad(tertiaryHost); + ServerLoad secondaryLoad = super.services.getServerManager().getLoad(secondaryHost); + if (secondaryLoad != null && tertiaryLoad != null) { + if (secondaryLoad.getLoad() < tertiaryLoad.getLoad()) { + s = secondaryHost; + } else { + s = tertiaryHost; + } + } else { + if (this.RANDOM.nextBoolean()) { + s = secondaryHost; + } else { + s = tertiaryHost; + } + } + addRegionToMap(assignmentMapForFavoredNodes, region, s); + } else if (secondaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, secondaryHost); + } else if (tertiaryHost != null) { + addRegionToMap(assignmentMapForFavoredNodes, region, tertiaryHost); + } + } + + @Override + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { + ServerName destination = null; + // TODO: Decide strategy for assigning system tables. + if (regionInfo.getTable().isSystemTable()) { + destination = super.randomAssignment(regionInfo, servers); + return destination; + } + // We don't use FavoredNodeBalancer's random assignment, as that method generates + // new favored nodes if none of the favored nodes are online. We want to return null + // in that case. + List favoredNodes = fnm.getFavoredNodes(regionInfo); + if (favoredNodes == null || favoredNodes.isEmpty()) { + // Generate new favored nodes and return primary, don't use FavoredNodeBalancer + FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(servers, getConf()); + helper.initialize(); + try { + List newFavoredNodes = helper.generateFavoredNodes(regionInfo); + Map> regionFNMap = Maps.newHashMap(); + regionFNMap.put(regionInfo, newFavoredNodes); + fnm.updateFavoredNodes(regionFNMap); + List onlineServers = getOnlineFavoredNodes(servers, newFavoredNodes); + destination = onlineServers.get(RANDOM.nextInt(onlineServers.size())); + } catch (IOException e) { + LOG.warn("Encountered exception while doing favored-nodes (random)assignment " + e + + " Falling back to regular assignment"); + //return super.randomAssignment(regionInfo, servers); + throw new HBaseIOException(e); + } + } else { + List onlineServers = getOnlineFavoredNodes(servers, favoredNodes); + if (onlineServers.size() > 0) { + destination = onlineServers.get(RANDOM.nextInt(onlineServers.size())); + } + } + boolean alwaysAssign = getConf().getBoolean(ALWAYS_ASSIGN_REGIONS, true); + if (destination == null && alwaysAssign) { + destination = super.randomAssignment(regionInfo, servers); + } + return destination; + } + + @Override + public Map> retainAssignment(Map regions, + List servers) throws HBaseIOException { + Map> result = super.retainAssignment(regions, servers); + + // Lets check if favored nodes info is in META, if not generate now. + FavoredNodeAssignmentHelper assignmentHelper = new FavoredNodeAssignmentHelper(servers, + new RackManager(getConf())); + assignmentHelper.initialize(); + LOG.debug("Generating favored nodes for regions missing them."); + for (Entry> entry : result.entrySet()) { + Map primaryRSMap = new HashMap(); + ServerName current = ServerName.valueOf(entry.getKey().getHostAndPort(), + ServerName.NON_STARTCODE); + try { + for (HRegionInfo region : entry.getValue()) { + List favoredNodes = fnm.getFavoredNodes(region); + if (!region.getTable().isSystemTable()) { + if (favoredNodes == null + || favoredNodes.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { + primaryRSMap.put(region, current); + LOG.debug("Generating favored nodes for region " + region); + } + } + } + Map secondaryAndTertiaryRSMap = assignmentHelper + .placeSecondaryAndTertiaryRS(primaryRSMap); + for (HRegionInfo hri : primaryRSMap.keySet()) { + ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(hri); + if (secondaryAndTertiaryNodes != null) { + List newFavoredNodes = Lists.newArrayList(); + newFavoredNodes.add(primaryRSMap.get(hri)); + newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[0].getHostname(), + secondaryAndTertiaryNodes[0].getPort(), ServerName.NON_STARTCODE)); + newFavoredNodes.add(ServerName.valueOf(secondaryAndTertiaryNodes[1].getHostname(), + secondaryAndTertiaryNodes[1].getPort(), ServerName.NON_STARTCODE)); + Map> regionFNMap = Maps.newHashMap(); + regionFNMap.put(hri, newFavoredNodes); + fnm.updateFavoredNodes(regionFNMap); + } else { + throw new HBaseIOException("Favored nodes not updated for region " + hri); + } + } + } catch (Exception ex) { + throw new HBaseIOException("Encountered exception while generating favored nodes. ", ex); + } + } + return result; + } + + public List getOnlineFavoredNodes(List onlineServers, + List serversWithoutStartCodes) { + if (serversWithoutStartCodes == null) { + return null; + } else { + List result = Lists.newArrayList(); + for (ServerName sn : serversWithoutStartCodes) { + for (ServerName online : onlineServers) { + if (ServerName.isSameHostnameAndPort(sn, online)) { + result.add(online); + } + } + } + return result; + } + } + + /* + * This should only be called for system tables. + */ + @Override + public Map> generateFavoredNodesForDaughter( + List servers, HRegionInfo parent, HRegionInfo hri_a, HRegionInfo hri_b) + throws IOException { + Map> result = new HashMap>(); + FavoredNodeAssignmentHelper assignmentHelper = new FavoredNodeAssignmentHelper(servers, + fnm.getRackManager()); + assignmentHelper.initialize(); + List parentFavoredNodes = fnm.getFavoredNodes(parent); + if (parentFavoredNodes == null) { + LOG.debug("Unable to find favored nodes for parent, " + parent + + " generating new favored nodes for daughter"); + result.put(hri_a, assignmentHelper.generateFavoredNodes(hri_a)); + result.put(hri_b, assignmentHelper.generateFavoredNodes(hri_b)); + } else { + Set existingFavNodes = Sets.newHashSet(); + existingFavNodes.add(parentFavoredNodes.get(0)); + existingFavNodes.add(parentFavoredNodes.get(1)); + while (existingFavNodes.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { + ServerName newNode = assignmentHelper.generateMissingFavoredNode(Lists + .newArrayList(existingFavNodes)); + existingFavNodes.add(newNode); + } + result.put(hri_a, Lists.newArrayList(existingFavNodes)); + existingFavNodes.clear(); + existingFavNodes.add(parentFavoredNodes.get(0)); + existingFavNodes.add(parentFavoredNodes.get(2)); + while (existingFavNodes.size() < FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) { + ServerName newNode = assignmentHelper.generateMissingFavoredNode(Lists + .newArrayList(existingFavNodes)); + existingFavNodes.add(newNode); + } + result.put(hri_b, Lists.newArrayList(existingFavNodes)); + } + Map> regionFNMap = Maps.newHashMap(); + regionFNMap.put(hri_a, result.get(hri_a)); + regionFNMap.put(hri_b, result.get(hri_b)); + fnm.updateFavoredNodes(regionFNMap); + return result; + } + + @Override + public void generateFavoredNodesForMergedRegion(HRegionInfo merged, HRegionInfo hriA, + HRegionInfo hriB) throws IOException { + //TODO probably should inherit the bigger region's FN + Map> regionFNMap = Maps.newHashMap(); + regionFNMap.put(merged, fnm.getFavoredNodes(hriA)); + fnm.updateFavoredNodes(regionFNMap); + } + + class FavoredNodeLocalityPicker extends CandidateGenerator { + + @Override + Cluster.Action generate(Cluster cluster) { + cluster.calculateRegionServerLocalities(); + // Pick lowest local region server + int thisServer = pickLowestLocalityServer(cluster); + int thisRegion; + if (thisServer == -1) { + LOG.trace("Could not pick lowest local region server"); + return Cluster.NullAction; + } else { + // Pick lowest local region on this server + thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer); + } + if (thisRegion == -1) { + if (cluster.regionsPerServer[thisServer].length > 0) { + LOG.trace("Could not pick lowest local region even when region server held " + + cluster.regionsPerServer[thisServer].length + " regions"); + } + return Cluster.NullAction; + } + HRegionInfo hri = cluster.regions[thisRegion]; + List favoredNodes = fnm.getFavoredNodes(hri); + int otherServer; + if (favoredNodes == null) { + if (hri.getTable().isSystemTable()) { + otherServer = pickOtherRandomServer(cluster, thisServer); + } else { + return Cluster.NullAction; + } + } else { + // Pick other favored node with the highest locality + otherServer = getDifferentFavoredNode(cluster, favoredNodes, thisServer); + } + return getAction(thisServer, thisRegion, otherServer, -1); + } + + private int pickLowestLocalityServer(Cluster cluster) { + return cluster.getLowestLocalityRegionServer(); + } + + private int getDifferentFavoredNode(Cluster cluster, List favoredNodes, + int currentServer) { + List fnIndex = new ArrayList(); + for (ServerName sn : favoredNodes) { + if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) { + fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort())); + } + } + float locality = 0; + int highestLocalRSIndex = -1; + for (Integer index : fnIndex) { + if (index != currentServer) { + float temp = cluster.localityPerServer[index]; + if (temp >= locality) { + locality = temp; + highestLocalRSIndex = index; + } + } + } + return highestLocalRSIndex; + } + + private int pickLowestLocalRegionOnServer(Cluster cluster, int server) { + return cluster.getLowestLocalityRegionOnServer(server); + } + } + + class FavoredNodeLoadPicker extends CandidateGenerator { + + @Override + Cluster.Action generate(Cluster cluster) { + cluster.sortServersByRegionCount(); + int thisServer = pickMostLoadedServer(cluster); + int thisRegion = pickRandomRegion(cluster, thisServer, 0); + HRegionInfo hri = cluster.regions[thisRegion]; + int otherServer; + List favoredNodes = fnm.getFavoredNodes(hri); + if (favoredNodes == null) { + otherServer = pickLeastLoadedServer(cluster, thisServer); + } else { + otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer); + } + return getAction(thisServer, thisRegion, otherServer, -1); + } + + private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + int index; + for (index = 0; index < servers.length ; index++) { + if ((servers[index] != null) && servers[index] != thisServer) { + break; + } + } + return servers[index]; + } + + private int pickLeastLoadedFNServer(final Cluster cluster, List favoredNodes, + int currentServerIndex) { + List fnIndex = new ArrayList(); + for (ServerName sn : favoredNodes) { + if (cluster.serversToIndex.containsKey(sn.getHostAndPort())) { + fnIndex.add(cluster.serversToIndex.get(sn.getHostAndPort())); + } + } + int leastLoadedFN = -1; + int load = Integer.MAX_VALUE; + for (Integer index : fnIndex) { + if (index != currentServerIndex) { + int temp = cluster.getNumRegions(index); + if (temp < load) { + load = temp; + leastLoadedFN = index; + } + } + } + return leastLoadedFN; + } + + /** + * Pick most loaded server. + * + * @param cluster + * @return index of the region server picked. + */ + private int pickMostLoadedServer(final Cluster cluster) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + int index; + for (index = servers.length - 1; index > 0 ; index--) { + if (servers[index] != null) { + break; + } + } + return servers[index]; + } + } + + @Override + public List balanceCluster(Map> clusterState) { + if (this.services != null) { + List regionPlans = Lists.newArrayList(); + Map> correctAssignments = new HashMap>(); + int misplacedRegions = 0; + for (Entry> entry : clusterState.entrySet()) { + ServerName current = entry.getKey(); + List regions = Lists.newArrayList(); + correctAssignments.put(current, regions); + for (HRegionInfo hri : entry.getValue()) { + List favoredNodes = fnm.getFavoredNodes(hri); + //TODO: we might need this lookup to be o(1) + if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, current) != null + || hri.getTable().isSystemTable()) { + correctAssignments.get(current).add(hri); + } else { + RegionPlan rp = new RegionPlan(hri, current, BOGUS_SERVER_NAME); + regionPlans.add(rp); + misplacedRegions++; + } + } + } + LOG.debug("Found " + misplacedRegions + " number of regions not on favored nodes."); + List regionPlansFromBalance = super.balanceCluster(correctAssignments); + if (regionPlansFromBalance != null) { + regionPlans.addAll(regionPlansFromBalance); + } + return regionPlans; + } else { + return super.balanceCluster(clusterState); + } + } +} + Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java (revision ) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java (revision ) @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static junit.framework.TestCase.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.net.ScriptBasedMapping; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(LargeTests.class) +public class TestFavoredStochasticBalancerPickers extends BalancerTestBase { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Log LOG = LogFactory.getLog(TestFavoredStochasticBalancerPickers.class); + private static final int SLAVES = 6; + private static HBaseAdmin admin; + protected static Configuration conf; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + // Enable the favored nodes based load balancer + conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + FavoredStochasticBalancerTest.class, LoadBalancer.class); + conf.setFloat("hbase.min.locality.redistribute", 0.0f); + conf.setBoolean("hbase.redistribute.even.on.same.rack", true); + conf.set(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + ScriptBasedMapping.class.getName()); + conf.setInt("hbase.assignment.maximum.attempts", 3); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30000); + conf.setInt("hbase.master.balancer.stochastic.moveCost", 0); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1200000); + conf.setBoolean("hbase.master.balancer.stochastic.execute.maxSteps", true); + //Don't let chore run. + // TODO - Comment out when FNChore patch is committed. +// conf.setInt(FavoredNodesRepairChore.FAVORED_NODE_REPAIR_CHORE_FREQ, Integer.MAX_VALUE); + } + + @Before + public void startCluster() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + TEST_UTIL.getDFSCluster().waitClusterUp(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(120*1000); + admin = TEST_UTIL.getHBaseAdmin(); + admin.setBalancerRunning(false, true); + } + + @After + public void stopCluster() throws Exception { + TEST_UTIL.cleanupTestDir(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testBasicBalance() throws Exception { + final int regions = 10; + TableName tableName = TableName.valueOf("testBasicBalance"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), regions); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + admin.flush(tableName); + compactTable(TEST_UTIL, tableName); + + ServerName masterServerName = TEST_UTIL.getMiniHBaseCluster().getServerHoldingMeta(); + List masterRegions = admin.getOnlineRegions(masterServerName); + + RegionServerThread rs1 = TEST_UTIL.getHBaseCluster().startRegionServer(); + RegionServerThread rs2 = TEST_UTIL.getHBaseCluster().startRegionServer(); + // TODO: Increased timeing + TEST_UTIL.waitFor(10000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().size() == SLAVES + 2); + } + }); + // Now try to run balance, and verify no regions are moved to the 2 region servers recently + // started. + admin.setBalancerRunning(true, true); + assertTrue("Balancer did not run", admin.balancer()); + TEST_UTIL.waitFor(120000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (TEST_UTIL.getHBaseAdmin().getClusterStatus().getRegionsInTransition().size() == 0); + } + }); + List hris = admin.getOnlineRegions(rs1.getRegionServer().getServerName()); + for (HRegionInfo hri : hris) { + if (!masterRegions.contains(hri)) { + assertFalse(hri.getTable().equals(tableName)); + } + } + hris = admin.getOnlineRegions(rs2.getRegionServer().getServerName()); + for (HRegionInfo hri : hris) { + if (!masterRegions.contains(hri)) { + assertFalse(hri.getTable().equals(tableName)); + } + } + } + + @Test + public void testPickers() throws Exception { + int regions = SLAVES * 3; + TableName tableName = TableName.valueOf("testPickers"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), regions); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + admin.flush(tableName); + ServerName masterServerName = TEST_UTIL.getMiniHBaseCluster().getServerHoldingMeta(); + final ServerName mostLoadedServer = getRSWithMaxRegions(Lists.newArrayList(masterServerName)); + int numRegions = admin.getOnlineRegions(mostLoadedServer).size(); + assertNotNull(mostLoadedServer); + ServerName source = getRSWithMaxRegions(Lists.newArrayList(masterServerName, mostLoadedServer)); + assertNotNull(source); + int regionsToMove = admin.getOnlineRegions(source).size()/2; + List hris = admin.getOnlineRegions(source); + for (int i = 0; i < regionsToMove; i++) { + admin.move(hris.get(i).getEncodedNameAsBytes(), Bytes.toBytes(mostLoadedServer.getServerName())); + LOG.info("Moving region: " + hris.get(i).getRegionNameAsString() + " to " + mostLoadedServer); + } + final int finalRegions = numRegions + regionsToMove; + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (TEST_UTIL.getHBaseAdmin().getClusterStatus().getRegionsInTransition().size() == 0); + } + }); + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + int numRegions = TEST_UTIL.getHBaseAdmin().getOnlineRegions(mostLoadedServer).size(); + return (numRegions == finalRegions); + } + }); + RegionServerThread rs1 = TEST_UTIL.getHBaseCluster().startRegionServer(); + TEST_UTIL.waitFor(10000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (TEST_UTIL.getHBaseCluster().getRegionServerThreads().size() == SLAVES + 1); + } + }); + Map> serverAssignments = new HashMap>(); + ClusterStatus status = admin.getClusterStatus(); + for (ServerName sn : status.getServers()) { + if (!ServerName.isSameHostnameAndPort(sn, masterServerName)) { + serverAssignments.put(sn, admin.getOnlineRegions(sn)); + } + } + Map> loads = new HashMap>(); + RegionLocationFinder regionFinder = new RegionLocationFinder(); + regionFinder.setClusterStatus(admin.getClusterStatus()); + regionFinder.setConf(conf); + regionFinder.setServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); + Cluster cluster = new Cluster(serverAssignments, loads, regionFinder, new RackManager(conf)); + FavoredStochasticBalancerTest balancer = (FavoredStochasticBalancerTest) TEST_UTIL + .getMiniHBaseCluster().getMaster().getLoadBalancer(); + FavoredNodesManager fnm = TEST_UTIL.getMiniHBaseCluster().getMaster().getFavoredNodesManager(); + cluster.sortServersByRegionCount(); + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + LOG.info("Servers sorted by region count:" + Arrays.toString(servers)); + LOG.info("Cluster dump: " + cluster); + if (!mostLoadedServer.equals(cluster.servers[servers[servers.length -1]])) { + LOG.error("Most loaded server: " + mostLoadedServer + " does not match: " + + cluster.servers[servers[servers.length -1]]); + } + assertEquals(mostLoadedServer, cluster.servers[servers[servers.length -1]]); + FavoredStochasticBalancer.FavoredNodeLoadPicker loadPicker = balancer.new FavoredNodeLoadPicker(); + boolean userRegionPicked = false; + for (int i = 0; i < 100; i++) { + if (userRegionPicked) { + break; + } else { + Cluster.Action action = loadPicker.generate(cluster); + if (action.type == Cluster.Action.Type.MOVE_REGION) { + Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action; + HRegionInfo region = cluster.regions[moveRegionAction.region]; + assertNotEquals(-1, moveRegionAction.toServer); + ServerName destinationServer = cluster.servers[moveRegionAction.toServer]; + assertEquals(cluster.servers[moveRegionAction.fromServer], mostLoadedServer); + if (!region.getTable().isSystemTable()) { + List favNodes = fnm.getFavoredNodes(region); + assertTrue(favNodes.contains(ServerName.valueOf(destinationServer.getHostAndPort(), -1))); + userRegionPicked = true; + } + } + } + } + assertTrue("load picker did not pick expected regions in 100 iterations.", userRegionPicked); + } + + private ServerName getRSWithMaxRegions(ArrayList excludeNodes) throws IOException { + int maxRegions = 0; + ServerName maxLoadedServer = null; + + for (ServerName sn : admin.getClusterStatus().getServers()) { + if (admin.getOnlineRegions(sn).size() > maxRegions) { + if (excludeNodes == null || !doesMatchExcludeNodes(excludeNodes, sn)) { + maxRegions = admin.getOnlineRegions(sn).size(); + maxLoadedServer = sn; + } + } + } + return maxLoadedServer; + } + + private boolean doesMatchExcludeNodes(ArrayList excludeNodes, ServerName sn) { + for (ServerName excludeSN : excludeNodes) { + if (ServerName.isSameHostnameAndPort(sn, excludeSN)) { + return true; + } + } + return false; + } + + @Test + public void testMisplacedRegions() throws Exception { + int regions = 10; + TableName tableName = TableName.valueOf("testMisplacedRegions"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), regions); + TEST_UTIL.waitTableAvailable(tableName); + final HRegionInfo misplacedRegion = admin.getTableRegions(tableName).get(0); + FavoredNodesManager fnm = TEST_UTIL.getHBaseCluster().getMaster().getFavoredNodesManager(); + List currentFN = fnm.getFavoredNodes(misplacedRegion); + assertNotNull(currentFN); + List serversForNewFN = Lists.newArrayList(); + for (ServerName sn : admin.getClusterStatus().getServers()) { + serversForNewFN.add(ServerName.valueOf(sn.getHostAndPort(), ServerName.NON_STARTCODE)); + } + for (ServerName sn : currentFN) { + serversForNewFN.remove(sn); + } + FavoredNodeAssignmentHelper helper = new FavoredNodeAssignmentHelper(serversForNewFN, conf); + helper.initialize(); + List newFavoredNodes = helper.generateFavoredNodes(misplacedRegion); + assertNotNull(newFavoredNodes); + assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, newFavoredNodes.size()); + Map> regionFNMap = Maps.newHashMap(); + regionFNMap.put(misplacedRegion, newFavoredNodes); + fnm.updateFavoredNodes(regionFNMap); + + RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates(); + final ServerName current = regionStates.getRegionServerOfRegion(misplacedRegion); + assertNull("Misplaced region is still hosted on favored node, not expected.", + FavoredNodesPlan.getFavoredServerPosition(fnm.getFavoredNodes(misplacedRegion), current)); + admin.setBalancerRunning(true, true); + assertTrue("Balancer did not run", admin.balancer()); + TEST_UTIL.waitFor(120000, 30000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + ServerName host = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates().getRegionServerOfRegion(misplacedRegion); + return !ServerName.isSameHostnameAndPort(host, current); + } + }); + for (HRegionInfo hri : admin.getTableRegions(tableName)) { + ServerName host = regionStates.getRegionServerOfRegion(hri); + assertNotNull("Region not on favored node.", + FavoredNodesPlan.getFavoredServerPosition(fnm.getFavoredNodes(hri), host)); + } + } + + void compactTable(HBaseTestingUtility util, TableName tableName) throws IOException { + for(RegionServerThread t : + util.getMiniHBaseCluster().getRegionServerThreads()) { + for(Region region : t.getRegionServer().getOnlineRegions(tableName)) { + region.compact(true); + } + } + } +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticLoadBalancer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticLoadBalancer.java (revision ) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticLoadBalancer.java (revision ) @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.ScriptBasedMapping; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + + +@Category(MediumTests.class) +public class TestFavoredStochasticLoadBalancer extends BalancerTestBase { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final int SLAVES = 3; + private static HBaseAdmin admin; + private int REGION_NUM = 5; + private static boolean postSplit = false; + protected static Configuration conf; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + setUpOnce(); + } + + static void setUpOnce() throws Exception { + conf = TEST_UTIL.getConfiguration(); + // Enable the favored nodes based load balancer + conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + FavoredStochasticBalancerTest.class, LoadBalancer.class); + conf.setFloat("hbase.min.locality.redistribute", 0.0f); + conf.setBoolean("hbase.redistribute.even.on.same.rack", true); + conf.set(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + ScriptBasedMapping.class.getName()); + conf.setInt("hbase.assignment.maximum.attempts", 3); + //Making it high value, tests should explicitly call catalog janitor if needed. + conf.setInt("hbase.catalogjanitor.interval", Integer.MAX_VALUE); + //Don't let chore run. + // TODO: Set when the Chore patch is committed +// conf.setInt(FavoredNodesRepairChore.FAVORED_NODE_REPAIR_CHORE_FREQ, Integer.MAX_VALUE); + } + + @Before + public void startCluster() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + TEST_UTIL.getDFSCluster().waitClusterUp(); + admin = TEST_UTIL.getHBaseAdmin(); + admin.setBalancerRunning(false, true); + } + + @After + public void stopCluster() throws Exception { + TEST_UTIL.cleanupTestDir(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testBasicRegionPlacement() throws Exception { + String tableName = "testBasicRegionPlacement"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM); + FavoredNodesManager fnm = TEST_UTIL.getHBaseCluster().getMaster().getFavoredNodesManager(); + List regionsOfTable = + TEST_UTIL.getHBaseAdmin().getTableRegions(TableName.valueOf(tableName)); + for (HRegionInfo rInfo : regionsOfTable) { + Set favNodes = Sets.newHashSet(fnm.getFavoredNodes(rInfo)); + assertNotNull(favNodes); + assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, favNodes.size()); + } + Map> replicaLoadMap = + fnm.getReplicaLoad(Lists.newArrayList(admin.getClusterStatus().getServers())); + assertTrue("Not all replica load collected.", + admin.getClusterStatus().getServers().size() == replicaLoadMap.size()); + for (Entry> entry : replicaLoadMap.entrySet()) { + assertTrue(entry.getValue().size() == 3); + assertTrue(entry.getValue().get(0) >= 0); + assertTrue(entry.getValue().get(1) >= 0); + assertTrue(entry.getValue().get(2) >= 0); + } + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); + for (HRegionInfo rInfo : regionsOfTable) { + List favNodes = fnm.getFavoredNodes(rInfo); + assertNull(favNodes); + } + replicaLoadMap = + fnm.getReplicaLoad(Lists.newArrayList(admin.getClusterStatus().getServers())); + assertTrue("replica load found " + replicaLoadMap.size() + " instead of 0.", + replicaLoadMap.size() == admin.getClusterStatus().getServers().size()); + } + + + Map> getReplicaMap(TableName tableName) throws IOException { + List regionsOfTable = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName); + FavoredNodesManager fnm = TEST_UTIL.getHBaseCluster().getMaster().getFavoredNodesManager(); + Map> regionReplicaMap = + new HashMap>(); + for (HRegionInfo rInfo : regionsOfTable) { + List favNodes = fnm.getFavoredNodes(rInfo); + assertNotNull(favNodes); + for (ServerName sn : favNodes) { + List replicas = regionReplicaMap.get(sn); + if (replicas == null) { + replicas = new ArrayList(); + } + replicas.add(rInfo); + regionReplicaMap.put(sn, replicas); + } + } + return regionReplicaMap; + } + + void checkMinReplicas(Map> replicaMap, int minReplicas, + Collection servers) { + assertEquals(servers.size(), replicaMap.size()); + for (ServerName sn : servers) { + assertTrue("Atleast min replica expected.", + replicaMap.get(ServerName.valueOf(sn.getHostAndPort(), ServerName.NON_STARTCODE)) + .size() >= minReplicas); + } + } + + + @Test + public void testRegionSplit() throws Exception { + final TableName tableName = TableName.valueOf("testRegionSplit"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc); + Table table = admin.getConnection().getTable(tableName); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + List hris = admin.getTableRegions(tableName); + assertTrue(hris.size() == 1); + FavoredNodesManager fnm = TEST_UTIL.getHBaseCluster().getMaster().getFavoredNodesManager(); + Set parentFavNodes = Sets.newHashSet(fnm.getFavoredNodes(hris.get(0))); + assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, parentFavNodes.size()); + HRegion actualRegion = TEST_UTIL.getHBaseCluster().getRegions(tableName).get(0); + // install region co-processor to monitor splits + actualRegion.getCoprocessorHost().load(CustomObserver.class, + Coprocessor.PRIORITY_USER, TEST_UTIL.getConfiguration()); + admin.split(tableName, Bytes.toBytes("ggg")); + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return postSplit; + } + }); + // Do gets from both the daughter regions to see they are online. + Get splitRowKey = new Get(Bytes.toBytes("ggg")); + table.get(splitRowKey); + splitRowKey = new Get(Bytes.toBytes("abc")); + table.get(splitRowKey); + RegionStates regionStates = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates(); + assertFalse( + "Parent region not offline after split, found in state + " + + regionStates.getRegionTransitionState(actualRegion.getRegionInfo().getEncodedName()), + regionStates.isRegionOnline(actualRegion.getRegionInfo())); + TEST_UTIL.waitTableAvailable(tableName); + for(RegionServerThread regionServerThread : TEST_UTIL.getMiniHBaseCluster() + .getLiveRegionServerThreads()) { + for(Region region : + regionServerThread.getRegionServer().getOnlineRegions(tableName)) { + region.compact(true); + } + } + final HRegionInfo parent = hris.get(0); + // The parents favored nodes would still be there, they will be cleaned up once catalog janitor runs. + assertNotNull(fnm.getFavoredNodes(parent)); + hris = admin.getTableRegions(tableName); + for (HRegionInfo hri : hris) { + assertNotNull(fnm.getFavoredNodes(hri)); + } + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return admin.runCatalogScan() > 0; + } + }); + assertNull(fnm.getFavoredNodes(parent)); + hris = admin.getTableRegions(tableName); + assertTrue(hris.size() == 2); + for (final HRegionInfo hri : hris) { + assertTrue("Favored nodes found null for region", + fnm.getFavoredNodes(hri) != null); + List favNodes = fnm.getFavoredNodes(hri); + assertEquals(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM, Sets.newHashSet(favNodes).size()); + int matchCount = 0; + for (ServerName server : favNodes) { + if (parentFavNodes.contains(server)) { + matchCount++; + } + } + assertTrue("Daughter region did not inherit 2 fns", matchCount >= 2); + ServerName sn = regionStates.getRegionServerOfRegion(hri); + final HRegionServer rs = getRegionServer(sn); + TEST_UTIL.waitFor(60000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (rs.getFavoredNodesForRegion(hri.getEncodedName()) != null); + } + }); + InetSocketAddress[] favoredSocketAddress = rs.getFavoredNodesForRegion(hri.getEncodedName()); + assertTrue(favoredSocketAddress.length == favNodes.size()); + assertTrue(favNodes.size() > 0); + int port = NetUtils.createSocketAddr( + conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)) + .getPort(); + for (int j = 0; j < favoredSocketAddress.length; j++) { + InetSocketAddress addrFromRS = favoredSocketAddress[j]; + InetSocketAddress addrFromPlan = InetSocketAddress.createUnresolved(favNodes + .get(j).getHostname(), port); + + assertNotNull(addrFromRS); + assertNotNull(addrFromPlan); + assertTrue("Region server " + rs.getServerName().getHostAndPort() + " for region " + + hri.getRegionNameAsString() + " is " + addrFromRS + + " which is inconsistent with the plan " + addrFromPlan, + addrFromRS.equals(addrFromPlan)); + } + } + } + + /** + * Test for YHBASE-757. + * + * @throws Exception the exception + */ + @Test + public void testAssignmentWithNoFavNodes() throws Exception { + final String tableName = "testRegionWithNoFavNodes"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc); + HRegionInfo hri = admin.getTableRegions(TableName.valueOf(tableName)).get(0); + LoadBalancer balancer = TEST_UTIL.getHBaseCluster().getMaster().getLoadBalancer(); + FavoredNodesManager fnm = TEST_UTIL.getHBaseCluster().getMaster().getFavoredNodesManager(); + fnm.deleteFavoredNodesForRegion(Lists.newArrayList(hri)); + assertNull("Favored nodes not found null after delete", fnm.getFavoredNodes(hri)); + ServerName desintination = balancer.randomAssignment(hri, Lists.newArrayList(admin + .getClusterStatus().getServers())); + assertNotNull(desintination); + List favoredNodes = fnm.getFavoredNodes(hri); + assertNotNull(favoredNodes); + boolean containsFN = false; + for (ServerName sn : favoredNodes) { + if (ServerName.isSameHostnameAndPort(desintination, sn)) { + containsFN = true; + } + } + assertTrue("Destination server does not belong to favored nodes.", containsFN); + } + + HRegionServer getRegionServer(ServerName sn) { + for ( int i= 0; i < SLAVES; i++) { + HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i); + if ( ServerName.isSameHostnameAndPort(server.getServerName(),sn)) { + return server; + } + } + return null; + } + + public static class CustomObserver extends BaseRegionObserver{ + @Override + public void start(CoprocessorEnvironment e) throws IOException { + postSplit = false; + } + + @Override + public void postCompleteSplit(ObserverContext ctx) + throws IOException { + postSplit = true; + } + + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (date 1477605507000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (revision ) @@ -1210,7 +1210,7 @@ */ @Override public Map> roundRobinAssignment(List regions, - List servers) { + List servers) throws HBaseIOException { metricsBalancer.incrMiscInvocations(); Map> assignments = assignMasterRegions(regions, servers); if (assignments != null && !assignments.isEmpty()) { @@ -1316,7 +1316,8 @@ * Used to assign a single region to a random server. */ @Override - public ServerName randomAssignment(HRegionInfo regionInfo, List servers) { + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { metricsBalancer.incrMiscInvocations(); if (servers != null && servers.contains(masterServerName)) { if (shouldBeOnMaster(regionInfo)) { @@ -1360,7 +1361,7 @@ */ @Override public Map> retainAssignment(Map regions, - List servers) { + List servers) throws HBaseIOException { // Update metrics metricsBalancer.incrMiscInvocations(); Map> assignments Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java (date 1477605507000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeAssignmentHelper.java (revision ) @@ -225,7 +225,7 @@ // If there were fewer servers in one rack, say r3, which had 3 servers, one possible // placement could be r2:s5, , r4:s5, r1:s5, r2:s6, ... // The regions should be distributed proportionately to the racksizes - void placePrimaryRSAsRoundRobin(Map> assignmentMap, + public void placePrimaryRSAsRoundRobin(Map> assignmentMap, Map primaryRSMap, List regions) { List rackList = new ArrayList(rackToRegionServerMap.size()); rackList.addAll(rackToRegionServerMap.keySet()); @@ -540,7 +540,7 @@ return new ServerName[]{ secondaryRS, tertiaryRS }; } - boolean canPlaceFavoredNodes() { + public boolean canPlaceFavoredNodes() { return (this.servers.size() >= FAVORED_NODES_NUM); } @@ -783,6 +783,29 @@ } else { throw new HBaseIOException("Unable to generate secondary and tertiary favored nodes."); } + } + + public Map> generateFavoredNodes( + Map primaryRSMap) { + Map> generatedFavNodes = new HashMap>(); + Map secondaryAndTertiaryRSMap = placeSecondaryAndTertiaryRS(primaryRSMap); + for (HRegionInfo region : primaryRSMap.keySet()) { + List favoredNodesForRegion = new ArrayList(FAVORED_NODES_NUM); + ServerName sn = primaryRSMap.get(region); + favoredNodesForRegion.add(ServerName.valueOf(sn.getHostname(), sn.getPort(), + ServerName.NON_STARTCODE)); + ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(region); + if (secondaryAndTertiaryNodes != null) { + favoredNodesForRegion.add(ServerName.valueOf( + secondaryAndTertiaryNodes[0].getHostname(), secondaryAndTertiaryNodes[0].getPort(), + ServerName.NON_STARTCODE)); + favoredNodesForRegion.add(ServerName.valueOf( + secondaryAndTertiaryNodes[1].getHostname(), secondaryAndTertiaryNodes[1].getPort(), + ServerName.NON_STARTCODE)); + } + generatedFavNodes.put(region, favoredNodesForRegion); + } + return generatedFavNodes; } /* \ No newline at end of file Index: hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (date 1477605507000) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (revision ) @@ -576,7 +576,7 @@ @Override public Map> retainAssignment( - Map regions, List servers) { + Map regions, List servers) throws HBaseIOException { retainAssignCalled = true; return super.retainAssignment(regions, servers); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (date 1477605507000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (revision ) @@ -28,8 +28,10 @@ import java.util.Map.Entry; import java.util.Random; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; @@ -107,11 +109,13 @@ "hbase.master.balancer.stochastic.maxRunningTime"; protected static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; + private static final String RUN_MAXIMUM_STEPS = + "hbase.master.balancer.stochastic.execute.maxSteps"; private static final String TABLE_FUNCTION_SEP = "_"; protected static final String MIN_COST_NEED_BALANCE_KEY = "hbase.master.balancer.stochastic.minCostNeedBalance"; - private static final Random RANDOM = new Random(System.currentTimeMillis()); + protected static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); Map> loads = new HashMap>(); @@ -123,7 +127,7 @@ private int numRegionLoadsToRemember = 15; private float minCostNeedBalance = 0.05f; - private CandidateGenerator[] candidateGenerators; + private List candidateGenerators; private CostFromRegionLoadFunction[] regionLoadFunctions; private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC @@ -136,6 +140,7 @@ // when new services are offered private LocalityBasedCandidateGenerator localityCandidateGenerator; private LocalityCostFunction localityCost; + private boolean executeMaximumSteps; private RegionReplicaHostCostFunction regionReplicaHostCostFunction; private RegionReplicaRackCostFunction regionReplicaRackCostFunction; private boolean isByTable = false; @@ -163,6 +168,7 @@ stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); + executeMaximumSteps = conf.getBoolean(RUN_MAXIMUM_STEPS, false); numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); @@ -174,13 +180,12 @@ } localityCost = new LocalityCostFunction(conf, services); - if (candidateGenerators == null) { - candidateGenerators = new CandidateGenerator[] { - new RandomCandidateGenerator(), - new LoadCandidateGenerator(), - localityCandidateGenerator, - new RegionReplicaRackCandidateGenerator(), - }; + if (this.candidateGenerators == null) { + candidateGenerators = Lists.newArrayList(); + candidateGenerators.add(new RandomCandidateGenerator()); + candidateGenerators.add(new LoadCandidateGenerator()); + candidateGenerators.add(localityCandidateGenerator); + candidateGenerators.add(new RegionReplicaRackCandidateGenerator()); } regionLoadFunctions = new CostFromRegionLoadFunction[] { @@ -213,10 +218,19 @@ } @Override + public void initialize() throws HBaseIOException { + super.initialize(); + } + + @Override protected void setSlop(Configuration conf) { this.slop = conf.getFloat("hbase.regions.slop", 0.001F); } + protected void setCandidateGenerators(List customCandidateGenerators) { + this.candidateGenerators = customCandidateGenerators; + } + @Override public synchronized void setClusterStatus(ClusterStatus st) { super.setClusterStatus(st); @@ -364,8 +378,12 @@ long step; for (step = 0; step < computedMaxSteps; step++) { - int generatorIdx = RANDOM.nextInt(candidateGenerators.length); - CandidateGenerator p = candidateGenerators[generatorIdx]; + if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { + break; + } + + int generatorIdx = RANDOM.nextInt(candidateGenerators.size()); + CandidateGenerator p = candidateGenerators.get(generatorIdx); Cluster.Action action = p.generate(cluster); if (action.type == Type.NULL) { @@ -392,11 +410,6 @@ Action undoAction = action.undoAction(); cluster.doAction(undoAction); updateCostsWithAction(cluster, undoAction); - } - - if (EnvironmentEdgeManager.currentTime() - startTime > - maxRunningTime) { - break; } } long endTime = EnvironmentEdgeManager.currentTime(); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java (date 1477605507000) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java (revision ) @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -1268,7 +1269,7 @@ @Override public ServerName randomAssignment(HRegionInfo regionInfo, - List servers) { + List servers) throws HBaseIOException { if (regionInfo.equals(controledRegion)) { return null; } @@ -1277,7 +1278,7 @@ @Override public Map> roundRobinAssignment( - List regions, List servers) { + List regions, List servers) throws HBaseIOException { if (countRegionServers != null && services != null) { int regionServers = services.getServerManager().countOfRegionServers(); if (regionServers < countRegionServers.intValue()) { @@ -1297,7 +1298,7 @@ @Override public Map> retainAssignment( - Map regions, List servers) { + Map regions, List servers) throws HBaseIOException { for (HRegionInfo hri : regions.keySet()) { if (hri.equals(controledRegion)) { Map> m = Maps.newHashMap(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java (date 1477605507000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/favored/FavoredNodesManager.java (revision ) @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta; import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan; @@ -50,10 +51,12 @@ private Map> teritiaryRSToRegionMap; private MasterServices masterServices; + private RackManager rackManager; public FavoredNodesManager(MasterServices masterServices) { this.masterServices = masterServices; this.globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); + rackManager = new RackManager(masterServices.getConfiguration()); this.primaryRSToRegionMap = new HashMap<>(); this.secondaryRSToRegionMap = new HashMap<>(); this.teritiaryRSToRegionMap = new HashMap<>(); @@ -158,7 +161,33 @@ teritiaryRSToRegionMap.put(serverToUse, regionList); } - private synchronized void deleteFavoredNodesForRegion(Collection regionInfoList) { + public synchronized Map> getReplicaLoad(List servers) { + Map> result = new HashMap>(); + for (ServerName sn : servers) { + ServerName serverWithNoStartCode = ServerName.valueOf(sn.getHostAndPort(), + ServerName.NON_STARTCODE); + List countList = Lists.newArrayList(); + if (primaryRSToRegionMap.containsKey(serverWithNoStartCode)) { + countList.add(primaryRSToRegionMap.get(serverWithNoStartCode).size()); + } else { + countList.add(0); + } + if (secondaryRSToRegionMap.containsKey(serverWithNoStartCode)) { + countList.add(secondaryRSToRegionMap.get(serverWithNoStartCode).size()); + } else { + countList.add(0); + } + if (teritiaryRSToRegionMap.containsKey(serverWithNoStartCode)) { + countList.add(teritiaryRSToRegionMap.get(serverWithNoStartCode).size()); + } else { + countList.add(0); + } + result.put(sn, countList); + } + return result; + } + + public synchronized void deleteFavoredNodesForRegion(Collection regionInfoList) { for (HRegionInfo hri : regionInfoList) { List favNodes = getFavoredNodes(hri); if (favNodes != null) { @@ -172,6 +201,26 @@ teritiaryRSToRegionMap.get(favNodes.get(2)).remove(hri); } globalFavoredNodesAssignmentPlan.removeFavoredNodes(hri); + } + } + } + + public RackManager getRackManager() { + return rackManager; + } + + public void generateFavoredNodes(FavoredNodeAssignmentHelper assignmentHelper, + Map> assignmentMap, List regions, + List servers) throws IOException { + if (regions.size() > 0) { + if (assignmentHelper.canPlaceFavoredNodes()) { + Map primaryRSMap = new HashMap(); + assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions); + Map> generatedFavNodes = + assignmentHelper.generateFavoredNodes(primaryRSMap); + updateFavoredNodes(generatedFavNodes); + } else { + throw new HBaseIOException(" Not enough nodes to do RR assignment"); } } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java (date 1477605507000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java (revision ) @@ -160,7 +160,7 @@ @Override public Map> roundRobinAssignment(List regions, - List servers) { + List servers) throws HBaseIOException { Map> assignmentMap; try { FavoredNodeAssignmentHelper assignmentHelper = @@ -203,7 +203,8 @@ } @Override - public ServerName randomAssignment(HRegionInfo regionInfo, List servers) { + public ServerName randomAssignment(HRegionInfo regionInfo, List servers) + throws HBaseIOException { try { FavoredNodeAssignmentHelper assignmentHelper = new FavoredNodeAssignmentHelper(servers, rackManager); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodesPromoter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodesPromoter.java (date 1477605507000) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodesPromoter.java (revision ) @@ -28,6 +28,8 @@ @InterfaceAudience.Private public interface FavoredNodesPromoter { + String ALWAYS_ASSIGN_REGIONS = "hbase.assignment.always.assign"; + Map> generateFavoredNodesForDaughter(List servers, HRegionInfo parent, HRegionInfo hriA, HRegionInfo hriB) throws IOException;