diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java index 3b32645..6720ba8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java @@ -133,7 +133,7 @@ public class MetaEditor { * @param ps Put to add to .META. * @throws IOException */ - static void putsToMetaTable(final CatalogTracker ct, final List ps) + public static void putsToMetaTable(final CatalogTracker ct, final List ps) throws IOException { HTable t = MetaReader.getMetaHTable(ct); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java new file mode 100644 index 0000000..083a3ac --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.ScriptBasedMapping; +/** + * Wrapper over the rack resolution utility in Hadoop. The rack resolution + * utility in Hadoop does resolution from hosts to the racks they belong to. + * + */ +@InterfaceAudience.Private +public class RackManager { + static final Log LOG = LogFactory.getLog(RackManager.class); + public static final String UNKNOWN_RACK = "Unknown Rack"; + + private DNSToSwitchMapping switchMapping; + + public RackManager(Configuration conf) { + switchMapping = ReflectionUtils.instantiateWithCustomCtor( + conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class, + DNSToSwitchMapping.class).getName(), new Class[]{Configuration.class}, + new Object[]{conf}); + } + + /** + * Get the name of the rack containing a server, according to the DNS to + * switch mapping. + * @param server the server for which to get the rack name + * @return the rack name of the server + */ + public String getRack(ServerName server) { + if (server == null) { + return UNKNOWN_RACK; + } + // just a note - switchMapping caches results (at least the implementation should unless the + // resolution is really a lightweight process) + List racks = switchMapping.resolve(Arrays.asList(server.getHostname())); + if (racks != null && !racks.isEmpty()) { + return racks.get(0); + } + + return UNKNOWN_RACK; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java new file mode 100644 index 0000000..5117167 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredNodeLoadBalancer.java @@ -0,0 +1,373 @@ +package org.apache.hadoop.hbase.master.balancer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +public class FavoredNodeLoadBalancer extends BaseLoadBalancer { + private static final Log LOG = LogFactory.getLog(FavoredNodeLoadBalancer.class); + + private RackManager rackManager; + private Map> rackToRegionServerMap; + private List uniqueRackList; + private Map regionServerToRackMap; + private Random random = new Random(); + public final static short FAVORED_NODES_NUM = 3; + public static final byte [] FAVOREDNODES_QUALIFIER = Bytes.toBytes("fn"); + public final static char SERVER_NAME_SEPARATOR = ';'; + public static enum Position { + PRIMARY, + SECONDARY, + TERTIARY; + }; + //public FavoredNodeLoadBalancer(){} + + @Override + public void setConf(Configuration conf) { + this.rackManager = new RackManager(conf); + this.rackToRegionServerMap = new ConcurrentHashMap>(); + this.regionServerToRackMap = new ConcurrentHashMap(); + this.uniqueRackList = new ArrayList(); + } + + @Override + public List balanceCluster(Map> clusterState) { + return null; //TODO + } + + @Override + public Map> roundRobinAssignment(List regions, + List servers) { + Map> assignmentMap; + try { + for (ServerName s : servers) { + doRackResolution(s); + } + if (!canPlaceFavoredNodes()) { + return super.roundRobinAssignment(regions, servers); + } + assignmentMap = new HashMap>(); + Map primaryRSMap = new HashMap(); + // figure the primary RSs + placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions); + // figure the secondary and tertiary RSs + Map secondaryAndTertiaryRSMap = + placeSecondaryAndTertiaryRS(primaryRSMap); + // now update meta with the favored nodes info + List puts = new ArrayList(); + for (HRegionInfo regionInfo : regions) { + ArrayList favoredNodes = new ArrayList(); + favoredNodes.add(primaryRSMap.get(regionInfo)); + ServerName[] secondaryAndTertiaryNodes = secondaryAndTertiaryRSMap.get(regionInfo); + if (secondaryAndTertiaryNodes != null) { + favoredNodes.add(secondaryAndTertiaryNodes[0]); + favoredNodes.add(secondaryAndTertiaryNodes[1]); + } + puts.add(makePutFromRegionInfo(regionInfo, favoredNodes)); + } + MetaEditor.putsToMetaTable(super.services.getCatalogTracker(), puts); + LOG.info("Added " + puts.size() + " regions in META"); + } catch (Exception ex) { + LOG.warn("Encountered exception while doing favored-nodes assignment " + ex + + " Falling back to regular assignment"); + assignmentMap = super.roundRobinAssignment(regions, servers); + } + return assignmentMap; + } + + /** + * Generates and returns a Put containing the region info for the catalog table + * and the servers + * @param regionInfo + * @param favoredNodeList + * @return Put object + */ + static Put makePutFromRegionInfo(HRegionInfo regionInfo, ListfavoredNodeList) + throws IOException { + Put put = MetaEditor.makePutFromRegionInfo(regionInfo); + if (favoredNodeList != null) { + String favoredNodes = getFavoredNodes(favoredNodeList); + put.add(HConstants.CATALOG_FAMILY, FAVOREDNODES_QUALIFIER, + EnvironmentEdgeManager.currentTimeMillis(), favoredNodes.getBytes()); + LOG.info("Create the region " + regionInfo.getRegionNameAsString() + + " with favored nodes " + favoredNodes); + } + return put; + } + + /** + * @param serverList + * @return string the favoredNodes generated by the server list. + */ + static String getFavoredNodes(List serverAddrList) { + StringBuffer favoredNodes = new StringBuffer(""); + if (serverAddrList != null) { + for (int i = 0 ; i < serverAddrList.size(); i++) { + favoredNodes.append(serverAddrList.get(i).getServerName()); + if (i != serverAddrList.size() - 1 ) { + favoredNodes.append(SERVER_NAME_SEPARATOR); + } + } + } + return favoredNodes.toString(); + } + + + // Place the regions round-robin across the racks picking one server from each + // rack at a time. For example, if 2 racks (r1 and r2) with 8 servers (s1..s8) each, it will + // choose s1 from r1, s1 from r2, s2 from r1, s2 from r2, ... + private void placePrimaryRSAsRoundRobin(Map> assignmentMap, + Map primaryRSMap, List regions) { + List rackList = new ArrayList(); + rackList.addAll(rackToRegionServerMap.keySet()); + Map currentProcessIndexMap = new HashMap(); + int rackIndex = 0; + for (HRegionInfo regionInfo : regions) { + String rackName = rackList.get(rackIndex); + // Initialize the current processing host index. + int serverIndex = 0; + // Restore the current process index from the currentProcessIndexMap + Integer currentProcessIndex = currentProcessIndexMap.get(rackName); + if (currentProcessIndex != null) { + serverIndex = currentProcessIndex.intValue(); + } + // Get the server list for the current rack + List currentServerList = rackToRegionServerMap.get(rackName); + + // Get the current process region server + ServerName currentServer = currentServerList.get(serverIndex); + + // Place the current region with the current primary region server + primaryRSMap.put(regionInfo, currentServer); + List regionsForServer = assignmentMap.get(currentServer); + if (regionsForServer == null) { + regionsForServer = new ArrayList(); + assignmentMap.put(currentServer, regionsForServer); + } + regionsForServer.add(regionInfo); + + // Set the next processing index + if ((++serverIndex) >= currentServerList.size()) { + // Reset the server index for the current rack + serverIndex = 0; + } + // Keep track of the next processing index + currentProcessIndexMap.put(rackName, serverIndex); + if ((++rackIndex) >= rackList.size()) { + rackIndex = 0; // reset the rack index to 0 + } + } + } + + private Map placeSecondaryAndTertiaryRS( + Map primaryRSMap) { + Map secondaryAndTertiaryMap = + new HashMap(); + for (Map.Entry entry : primaryRSMap.entrySet()) { + // Get the target region and its primary region server rack + HRegionInfo regionInfo = entry.getKey(); + ServerName primaryRS = entry.getValue(); + try { + // Create the secondary and tertiary region server pair object. + ServerName[] favoredNodes; + // Get the rack for the primary region server + String primaryRack = rackManager.getRack(primaryRS); + + if (getTotalNumberOfRacks() == 1) { + favoredNodes = singleRackCase(regionInfo, primaryRS, primaryRack); + } else { + favoredNodes = multiRackCase(regionInfo, primaryRS, primaryRack); + } + if (favoredNodes != null) { + secondaryAndTertiaryMap.put(regionInfo, favoredNodes); + LOG.debug("Place the secondary and tertiary region server for region " + + regionInfo.getRegionNameAsString()); + } + } catch (Exception e) { + LOG.warn("Cannot place the favored nodes for region " + + regionInfo.getRegionNameAsString() + " because " + e); + continue; + } + } + return secondaryAndTertiaryMap; + } + + private ServerName[] singleRackCase(HRegionInfo regionInfo, + ServerName primaryRS, + String primaryRack) throws IOException { + // Single rack case: have to pick the secondary and tertiary + // from the same rack + List serverList = getServersFromRack(primaryRack); + if (serverList.size() <= 2) { + // Single region server case: cannot not place the favored nodes + // on any server; !domain.canPlaceFavoredNodes() + return null; + } else { + // Randomly select two region servers from the server list and make sure + // they are not overlap with the primary region server; + Set serverSkipSet = new HashSet(); + serverSkipSet.add(primaryRS); + + // Place the secondary RS + ServerName secondaryRS = getOneRandomServer(primaryRack, serverSkipSet); + // Skip the secondary for the tertiary placement + serverSkipSet.add(secondaryRS); + + // Place the tertiary RS + ServerName tertiaryRS = + getOneRandomServer(primaryRack, serverSkipSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and terinary" + + "region server for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + ServerName[] favoredNodes = new ServerName[2]; + favoredNodes[0] = secondaryRS; + favoredNodes[1] = tertiaryRS; + return favoredNodes; + } + } + + private ServerName[] multiRackCase(HRegionInfo regionInfo, + ServerName primaryRS, + String primaryRack) throws IOException { + + // Random to choose the secondary and tertiary region server + // from another rack to place the secondary and tertiary + + // Random to choose one rack except for the current rack + Set rackSkipSet = new HashSet(); + rackSkipSet.add(primaryRack); + ServerName[] favoredNodes = new ServerName[2]; + String secondaryRack = getOneRandomRack(rackSkipSet); + List serverList = getServersFromRack(secondaryRack); + if (serverList.size() >= 2) { + // Randomly pick up two servers from this secondary rack + + // Place the secondary RS + ServerName secondaryRS = getOneRandomServer(secondaryRack); + + // Skip the secondary for the tertiary placement + Set skipServerSet = new HashSet(); + skipServerSet.add(secondaryRS); + // Place the tertiary RS + ServerName tertiaryRS = getOneRandomServer(secondaryRack, skipServerSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and terinary" + + "region server for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + favoredNodes[0] = secondaryRS; + favoredNodes[1] = tertiaryRS; + } else { + // Pick the secondary rs from this secondary rack + // and pick the tertiary from another random rack + favoredNodes[0] = getOneRandomServer(secondaryRack); + + // Pick the tertiary + if (getTotalNumberOfRacks() == 2) { + // Pick the tertiary from the same rack of the primary RS + Set serverSkipSet = new HashSet(); + serverSkipSet.add(primaryRS); + favoredNodes[1] = getOneRandomServer(primaryRack, serverSkipSet); + } else { + // Pick the tertiary from another rack + rackSkipSet.add(secondaryRack); + String tertiaryRandomRack = getOneRandomRack(rackSkipSet); + favoredNodes[1] = getOneRandomServer(tertiaryRandomRack); + } + } + return favoredNodes; + } + + private boolean canPlaceFavoredNodes() { + int serverSize = this.regionServerToRackMap.keySet().size(); + if (serverSize < FAVORED_NODES_NUM) + return false; + return true; + } + + private void doRackResolution(ServerName sn) { + String rackName = this.rackManager.getRack(sn); + List serverList = this.rackToRegionServerMap.get(rackName); + if (serverList == null) { + serverList = new ArrayList(); + // Add the current rack to the unique rack list + this.uniqueRackList.add(rackName); + } + if (!serverList.contains(sn)) { + serverList.add(sn); + this.rackToRegionServerMap.put(rackName, serverList); + this.regionServerToRackMap.put(sn, rackName); + } + } + + private int getTotalNumberOfRacks() { + return this.uniqueRackList.size(); + } + + List getServersFromRack(String rack) { + return this.rackToRegionServerMap.get(rack); + } + + ServerName getOneRandomServer(String rack, + Set skipServerSet) throws IOException { + if(rack == null) return null; + List serverList = this.rackToRegionServerMap.get(rack); + if (serverList == null) return null; + + // Get a random server except for any servers from the skip set + if (skipServerSet != null && serverList.size() <= skipServerSet.size()) { + throw new IOException("Cannot randomly pick another random server"); + } + + ServerName randomServer; + do { + int randomIndex = random.nextInt(serverList.size()); + randomServer = serverList.get(randomIndex); + } while (skipServerSet != null && skipServerSet.contains(randomServer)); + + return randomServer; + } + + ServerName getOneRandomServer(String rack) throws IOException { + return this.getOneRandomServer(rack, null); + } + + String getOneRandomRack(Set skipRackSet) throws IOException { + if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) { + throw new IOException("Cannot randomly pick another random server"); + } + + String randomRack; + do { + int randomIndex = random.nextInt(this.uniqueRackList.size()); + randomRack = this.uniqueRackList.get(randomIndex); + } while (skipRackSet.contains(randomRack)); + + return randomRack; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java index 4f0b77e..19bc67d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerFactory.java @@ -30,7 +30,7 @@ import org.apache.hadoop.util.ReflectionUtils; public class LoadBalancerFactory { /** - * Create a loadblanacer from the given conf. + * Create a loadbalancer from the given conf. * @param conf * @return A {@link LoadBalancer} */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8f3a16a..3be10fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.ipc.RpcClientEngine; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -280,6 +281,12 @@ public class HRegionServer implements ClientProtocol, protected final Map onlineRegions = new ConcurrentHashMap(); + /** + * Map of encoded region names to the locations they should be hosted on + */ + protected final Map regionFavoredNodesMap = + new ConcurrentHashMap(); + // Leases protected Leases leases; @@ -2410,6 +2417,10 @@ public class HRegionServer implements ClientProtocol, return this.onlineRegions.get(encodedRegionName); } + public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) { + return this.regionFavoredNodesMap.get(encodedRegionName); + } + @Override public HRegion getFromOnlineRegions(final String encodedRegionName) { return this.onlineRegions.get(encodedRegionName); @@ -2432,6 +2443,7 @@ public class HRegionServer implements ClientProtocol, } addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); } + this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); return toReturn != null; } @@ -3366,24 +3378,33 @@ public class HRegionServer implements ClientProtocol, HTableDescriptor htd; try { final HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName()); + ServerName[] favoredServerList = null; + Result result = null; + if (!region.isMetaRegion()) { + result = MetaReader.getRegionResult(getCatalogTracker(), region.getRegionName()); + } if (onlineRegion != null) { //Check if the region can actually be opened. if (onlineRegion.getCoprocessorHost() != null) { onlineRegion.getCoprocessorHost().preOpen(); } - // See HBASE-5094. Cross check with META if still this RS is owning - // the region. - Pair p = MetaReader.getRegion( - this.catalogTracker, region.getRegionName()); - if (this.getServerName().equals(p.getSecond())) { - LOG.warn("Attempted open of " + region.getEncodedName() - + " but already online on this server"); - builder.addOpeningState(RegionOpeningState.ALREADY_OPENED); - continue; - } else { - LOG.warn("The region " + region.getEncodedName() + " is online on this server" + - " but META does not have this server - continue opening."); - removeFromOnlineRegions(onlineRegion, null); + if (result != null) { + // See HBASE-5094. Cross check with META if still this RS is owning + // the region. + Pair p = HRegionInfo.getHRegionInfoAndServerName(result); + if (this.getServerName().equals(p.getSecond())) { + LOG.warn("Attempted open of " + region.getEncodedName() + + " but already online on this server"); + builder.addOpeningState(RegionOpeningState.ALREADY_OPENED); + continue; + } else { + LOG.warn("The region " + region.getEncodedName() + " is online on this server" + + " but META does not have this server - continue opening."); + removeFromOnlineRegions(onlineRegion, null); + } + byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY, + FavoredNodeLoadBalancer.FAVOREDNODES_QUALIFIER); + favoredServerList = getFavoredNodesList(favoredNodes); } } LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on " @@ -3424,6 +3445,12 @@ public class HRegionServer implements ClientProtocol, this.service.submit(new OpenMetaHandler(this, this, region, htd, versionOfOfflineNode)); } else { + if (result != null) { + byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY, + FavoredNodeLoadBalancer.FAVOREDNODES_QUALIFIER); + favoredServerList = getFavoredNodesList(favoredNodes); + updateRegionFavoredNodesMapping(region.getEncodedName(), favoredServerList); + } this.service.submit(new OpenRegionHandler(this, this, region, htd, versionOfOfflineNode)); } @@ -3443,6 +3470,50 @@ public class HRegionServer implements ClientProtocol, return builder.build(); } + /** + * @param favoredNodes The bytes of favored nodes + * @return the list of HServerAddress for the byte array of favored nodes. + */ + public static ServerName[] getFavoredNodesList(byte[] favoredNodes) { + String favoredNodesStr = Bytes.toString(favoredNodes); + return getFavoredNodeList(favoredNodesStr); + } + + /** + * @param favoredNodesStr favored nodes + * @return the list of HServerAddress for the byte array of favored nodes. + */ + public static ServerName[] getFavoredNodeList(String favoredNodesStr) { + String[] favoredNodesArray = org.apache.commons.lang.StringUtils.split(favoredNodesStr, + FavoredNodeLoadBalancer.SERVER_NAME_SEPARATOR); + if (favoredNodesArray == null) return null; + + ServerName[] serverList = new ServerName[favoredNodesArray.length]; + int i = 0; + for (String hostNameAndPort : favoredNodesArray) { + serverList[i++] = new ServerName(hostNameAndPort); + } + return serverList; + } + + private void updateRegionFavoredNodesMapping(String encodedRegionName, + ServerName[] favoredNodes) { + InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.length]; + for (int i = 0; i < favoredNodes.length; i++) { + addr[i] = InetSocketAddress.createUnresolved(favoredNodes[i].getHostname(), + favoredNodes[i].getPort()); + } + regionFavoredNodesMap.put(encodedRegionName, addr); + } + + /** + * Return the favored nodes for a region given its encoded name + * @param encodedRegionName + * @return array of favored locations + */ + public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { + return regionFavoredNodesMap.get(encodedRegionName); + } /** * Close a region on the region server. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java new file mode 100644 index 0000000..478b4fdf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; +import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer.Position; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestRegionPlacement { + final static Log LOG = LogFactory.getLog(TestRegionPlacement.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static int SLAVES = 4; + private static HBaseAdmin admin; + private static Position[] positions = FavoredNodeLoadBalancer.Position.values(); + private int lastRegionOnPrimaryRSCount = 0; + private int REGION_NUM = 10; + private Map favoredNodesAssignmentPlan = + new HashMap(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + // Enable the favored nodes based load balancer + conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + FavoredNodeLoadBalancer.class, LoadBalancer.class); + + conf.setInt("hbase.master.meta.thread.rescanfrequency", 5000); + conf.setInt("hbase.regionserver.msginterval", 1000); + conf.setLong("hbase.regionserver.transientAssignment.regionHoldPeriod", 2000); + TEST_UTIL.startMiniCluster(SLAVES); + admin = new HBaseAdmin(conf); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test(timeout = 180000) + public void testRegionPlacement() throws Exception { + // Create a table with REGION_NUM regions. + createTable("testRegionAssignment", REGION_NUM); + + TEST_UTIL.waitTableAvailable(Bytes.toBytes("testRegionAssignment")); + + // Verify all the user regions are assigned to the primary region server + // based on the plan + verifyRegionOnPrimaryRS(REGION_NUM); + + // Verify all the region server are update with the latest favored nodes + verifyRegionServerUpdated(); + } + + /** + * Verify the number of user regions is assigned to the primary + * region server based on the plan is expected + * @param expectedNum. + * @throws IOException + */ + private void verifyRegionOnPrimaryRS(int expectedNum) + throws IOException { + this.lastRegionOnPrimaryRSCount = getNumRegionisOnPrimaryRS(); + assertEquals("Only " + expectedNum + " of user regions running " + + "on the primary region server", expectedNum , + lastRegionOnPrimaryRSCount); + } + + /** + * Verify all the online region servers has been updated to the + * latest assignment plan + * @param plan + * @throws IOException + */ + private void verifyRegionServerUpdated() throws IOException { + // Verify all region servers contain the correct favored nodes information + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + for (int i = 0; i < SLAVES; i++) { + HRegionServer rs = cluster.getRegionServer(i); + for (HRegion region: rs.getOnlineRegions(Bytes.toBytes("testRegionAssignment"))) { + InetSocketAddress[] favoredSocketAddress = rs.getFavoredNodesForRegion( + region.getRegionInfo().getEncodedName()); + ServerName[] favoredServerList = favoredNodesAssignmentPlan.get(region.getRegionInfo()); + + // All regions are supposed to have favored nodes, + // except for META and ROOT + if (favoredServerList == null) { + HTableDescriptor desc = region.getTableDesc(); + // Verify they are ROOT and META regions since no favored nodes + assertNull(favoredSocketAddress); + assertTrue("User region " + + region.getTableDesc().getNameAsString() + + " should have favored nodes", + (desc.isRootRegion() || desc.isMetaRegion())); + } else { + // For user region, the favored nodes in the region server should be + // identical to favored nodes in the assignmentPlan + assertTrue(favoredSocketAddress.length == favoredServerList.length); + assertTrue(favoredServerList.length > 0); + for (int j = 0; j < favoredServerList.length; j++) { + InetSocketAddress addrFromRS = favoredSocketAddress[j]; + InetSocketAddress addrFromPlan = InetSocketAddress.createUnresolved( + favoredServerList[j].getHostname(), favoredServerList[j].getPort()); + + assertNotNull(addrFromRS); + assertNotNull(addrFromPlan); + assertTrue("Region server " + rs.getServerName().getHostAndPort() + + " has the " + positions[j] + + " for region " + region.getRegionNameAsString() + " is " + + addrFromRS + " which is inconsistent with the plan " + + addrFromPlan, addrFromRS.equals(addrFromPlan)); + } + } + } + } + } + + /** + * Check whether regions are assigned to servers consistent with the explicit + * hints that are persisted in the META table. + * Also keep track of the number of the regions are assigned to the + * primary region server. + * @return the number of regions are assigned to the primary region server + * @throws IOException + */ + private int getNumRegionisOnPrimaryRS() throws IOException { + final AtomicInteger regionOnPrimaryNum = new AtomicInteger(0); + final AtomicInteger totalRegionNum = new AtomicInteger(0); + LOG.info("The start of region placement verification"); + MetaScannerVisitor visitor = new MetaScannerVisitor() { + public boolean processRow(Result result) throws IOException { + try { + HRegionInfo info = MetaScanner.getHRegionInfo(result); + byte[] server = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + byte[] startCode = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.STARTCODE_QUALIFIER); + byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY, + FavoredNodeLoadBalancer.FAVOREDNODES_QUALIFIER); + // Add the favored nodes into assignment plan + ServerName[] favoredServerList = HRegionServer.getFavoredNodesList(favoredNodes); + favoredNodesAssignmentPlan.put(info, favoredServerList); + + Position[] positions = FavoredNodeLoadBalancer.Position.values(); + if (info != null) { + totalRegionNum.incrementAndGet(); + if (server != null) { + String serverString = + new ServerName(Bytes.toString(server),Bytes.toLong(startCode)).toString(); + if (favoredNodes != null) { + String[] splits = + new String(favoredNodes).split(Character.toString( + FavoredNodeLoadBalancer.SERVER_NAME_SEPARATOR)); + String placement = "[NOT FAVORED NODE]"; + for (int i = 0; i < splits.length; i++) { + if (splits[i].equals(serverString)) { + placement = positions[i].toString(); + if (i == FavoredNodeLoadBalancer.Position.PRIMARY.ordinal()) { + regionOnPrimaryNum.incrementAndGet(); + } + break; + } + } + LOG.info(info.getRegionNameAsString() + " on " + + serverString + " " + placement); + } else { + LOG.info(info.getRegionNameAsString() + " running on " + + serverString + " but there is no favored region server"); + } + } else { + LOG.info(info.getRegionNameAsString() + + " not assigned to any server"); + } + } + return true; + } catch (RuntimeException e) { + LOG.error("Result=" + result); + throw e; + } + } + + @Override + public void close() throws IOException {} + }; + MetaScanner.metaScan(TEST_UTIL.getConfiguration(), visitor); + LOG.info("There are " + regionOnPrimaryNum.intValue() + " out of " + + totalRegionNum.intValue() + " regions running on the primary" + + " region servers" ); + return regionOnPrimaryNum.intValue() ; + } + + /** + * Create a table with specified table name and region number. + * @param table + * @param regionNum + * @return + * @throws IOException + */ + private static void createTable(String table, int regionNum) + throws IOException { + byte[] tableName = Bytes.toBytes(table); + int expectedRegions = regionNum; + byte[][] splitKeys = new byte[expectedRegions - 1][]; + for (int i = 1; i < expectedRegions; i++) { + byte splitKey = (byte) i; + splitKeys[i - 1] = new byte[] { splitKey, splitKey, splitKey }; + } + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, splitKeys); + + HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName); + Map regions = ht.getRegionLocations(); + assertEquals("Tried to create " + expectedRegions + " regions " + + "but only found " + regions.size(), expectedRegions, regions.size()); + } +}