diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 99fa7ae..1c772ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; /** * HConstants holds a bunch of HBase-related constants @@ -101,6 +102,8 @@ public final class HConstants { /** default host address */ public static final String DEFAULT_HOST = "0.0.0.0"; + public static final String UNKNOWN_RACK = "Unknown Rack"; + /** Parameter name for port master listens on. */ public static final String MASTER_PORT = "hbase.master.port"; @@ -121,6 +124,12 @@ public final class HConstants { /** Name of ZooKeeper quorum configuration parameter. */ public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + + /** The number of favored nodes for each region */ + public static final int FAVORED_NODES_NUM = 3; + + /** The favored nodes column qualifier*/ + public static final byte [] FAVOREDNODES_QUALIFIER = Bytes.toBytes("favorednodes"); /** Name of ZooKeeper config file in conf/ directory. 
*/ public static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java index f3ae6b5..a54c529 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.regex.Pattern; @@ -83,6 +85,7 @@ public class ServerName implements Comparable { private final String servername; private final String hostname; + private final String hostAddress; private final int port; private final long startcode; @@ -97,6 +100,7 @@ public class ServerName implements Comparable { this.port = port; this.startcode = startcode; this.servername = getServerName(hostname, port, startcode); + this.hostAddress = checkBindAddressCanBeResolved(); } public ServerName(final String serverName) { @@ -151,6 +155,28 @@ public class ServerName implements Comparable { return hostname; } + /** @return Bind address */ + public String getBindAddress() { + if (this.hostAddress != null) + return hostAddress; + + final InetAddress addr = new InetSocketAddress(hostname, port).getAddress(); + if (addr != null) { + return addr.getHostAddress(); + } else { + return null; + } + } + + private String checkBindAddressCanBeResolved() { + String hostAddress; + if ((hostAddress = getBindAddress()) == null) { + throw new IllegalArgumentException("Could not resolve the" + + " DNS name of " + servername); + } + return hostAddress; + } + public int getPort() { return port; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java index 21d03f5..4fa5f2b 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.ConnectException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,7 +33,9 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.master.RegionPlacement; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Writes region and assignment information to .META.. @@ -55,6 +58,22 @@ public class MetaEditor { addRegionInfo(put, regionInfo); return put; } + /** + * Generates and returns a Put containing the region info for the catalog table + * and the servers + */ + public static Put makePutFromRegionInfo(HRegionInfo regionInfo, ListfavoriteNodeList) + throws IOException { + Put put = makePutFromRegionInfo(regionInfo); + if (favoriteNodeList != null) { + String favoredNodes = RegionPlacement.getFavoredNodes(favoriteNodeList); + put.add(HConstants.CATALOG_FAMILY, HConstants.FAVOREDNODES_QUALIFIER, + EnvironmentEdgeManager.currentTimeMillis(), favoredNodes.getBytes()); + LOG.info("Create the region " + regionInfo.getRegionNameAsString() + + " with favored nodes " + favoredNodes); + } + return put; + } /** * Adds split daughters to the Put @@ -200,11 +219,11 @@ public class MetaEditor { * @throws IOException if problem connecting or updating meta */ public static void addRegionsToMeta(CatalogTracker catalogTracker, - List regionInfos) + List regionInfos, final Map> assignmentMap) throws IOException { List puts = new ArrayList(); for (HRegionInfo regionInfo : regionInfos) { - puts.add(makePutFromRegionInfo(regionInfo)); + 
puts.add(makePutFromRegionInfo(regionInfo, assignmentMap.get(regionInfo))); } putsToMetaTable(catalogTracker, puts); LOG.info("Added " + puts.size() + " regions in META"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentDomain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentDomain.java new file mode 100644 index 0000000..4323361 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentDomain.java @@ -0,0 +1,208 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;

/**
 * Models the rack topology of a set of region servers so that favored-node
 * placement can pick servers and racks (optionally at random, excluding a
 * skip-set). Not thread-safe; intended for single-threaded plan generation.
 */
public class AssignmentDomain {
  // BUG FIX: the original used LogFactory.getLog(AssignmentDomain.class
  // .getClass()), which logs under "java.lang.Class" instead of this class.
  protected static final Log LOG = LogFactory.getLog(AssignmentDomain.class);

  /** rack name -> region servers known to be on that rack */
  private final Map<String, List<ServerName>> rackToRegionServerMap;
  /** distinct rack names, indexable for random selection */
  private final List<String> uniqueRackList;
  private final RackManager rackManager;
  /** region server -> rack name */
  private final Map<ServerName, String> regionServerToRackMap;
  private final Random random;

  public AssignmentDomain(Configuration conf) {
    rackToRegionServerMap = new HashMap<String, List<ServerName>>();
    regionServerToRackMap = new HashMap<ServerName, String>();
    uniqueRackList = new ArrayList<String>();
    rackManager = new RackManager(conf);
    random = new Random();
  }

  /**
   * Set the random seed (useful for deterministic tests).
   * @param seed the seed to use
   */
  public void setRandomSeed(long seed) {
    random.setSeed(seed);
  }

  /**
   * Get the rack name in this domain for the server.
   * @param server the server to look up; may be null
   * @return the rack name, or null if the server is null or unknown
   */
  public String getRack(ServerName server) {
    if (server == null) {
      return null;
    }
    return regionServerToRackMap.get(server);
  }

  /**
   * Get a random rack, excluding every rack in the skip set.
   * @param skipRackSet racks that must not be returned (required, non-null)
   * @return a random rack not contained in skipRackSet
   * @throws IOException if no rack outside the skip set exists
   */
  public String getOneRandomRack(Set<String> skipRackSet) throws IOException {
    if (skipRackSet == null || uniqueRackList.size() <= skipRackSet.size()) {
      // BUG FIX (message): the original said "server" here, not "rack".
      throw new IOException("Cannot randomly pick another random rack");
    }
    String randomRack;
    do {
      int randomIndex = random.nextInt(this.uniqueRackList.size());
      randomRack = this.uniqueRackList.get(randomIndex);
    } while (skipRackSet.contains(randomRack));
    return randomRack;
  }

  /**
   * Get one random server from the rack.
   * @param rack the rack to pick from
   * @return a random server on the rack, or null for an unknown rack
   * @throws IOException never, in this overload
   */
  public ServerName getOneRandomServer(String rack) throws IOException {
    return this.getOneRandomServer(rack, null);
  }

  /**
   * Get a random server from the rack, excluding servers in the skip set.
   * @param rack the rack to pick from; may be null
   * @param skipServerSet servers that must not be returned; may be null
   * @return a random server, or null if the rack is null/unknown
   * @throws IOException if every server on the rack is in the skip set
   */
  public ServerName getOneRandomServer(String rack,
      Set<ServerName> skipServerSet) throws IOException {
    if (rack == null) {
      return null;
    }
    List<ServerName> serverList = this.rackToRegionServerMap.get(rack);
    if (serverList == null) {
      return null;
    }
    // The do/while below would never terminate if every candidate is skipped.
    if (skipServerSet != null && serverList.size() <= skipServerSet.size()) {
      throw new IOException("Cannot randomly pick another random server");
    }
    ServerName randomServer;
    do {
      int randomIndex = random.nextInt(serverList.size());
      randomServer = serverList.get(randomIndex);
    } while (skipServerSet != null && skipServerSet.contains(randomServer));
    return randomServer;
  }

  /** @return the total number of unique racks in the domain */
  public int getTotalRackNum() {
    return this.uniqueRackList.size();
  }

  /**
   * Get the list of region servers in the rack.
   * @param rack the rack name
   * @return the servers on that rack, or null if the rack is unknown
   */
  public List<ServerName> getServersFromRack(String rack) {
    return this.rackToRegionServerMap.get(rack);
  }

  /**
   * Add a server (and, implicitly, its rack) to the assignment domain.
   * Duplicate adds are ignored.
   * @param server the server to add
   */
  public void addServer(ServerName server) {
    String rackName = this.rackManager.getRack(server);
    List<ServerName> serverList = this.rackToRegionServerMap.get(rackName);
    if (serverList == null) {
      serverList = new ArrayList<ServerName>();
      // First server seen on this rack: record the rack as well.
      this.uniqueRackList.add(rackName);
    }
    if (!serverList.contains(server)) {
      serverList.add(server);
      this.rackToRegionServerMap.put(rackName, serverList);
      this.regionServerToRackMap.put(server, rackName);
    }
  }

  /**
   * Add a list of servers to the assignment domain.
   * @param servers the servers to add
   */
  public void addServers(List<ServerName> servers) {
    for (ServerName server : servers) {
      this.addServer(server);
    }
  }

  /** @return all servers known to the domain */
  public Set<ServerName> getAllServers() {
    return regionServerToRackMap.keySet();
  }

  /** @return the region server to rack map */
  public Map<ServerName, String> getRegionServerToRackMap() {
    return this.regionServerToRackMap;
  }

  /** @return the rack to region server map */
  public Map<String, List<ServerName>> getRackToRegionServerMap() {
    return this.rackToRegionServerMap;
  }

  /** @return true if there is no rack in the assignment domain */
  public boolean isEmpty() {
    return uniqueRackList.isEmpty();
  }

  /**
   * @return true if there are enough servers to place
   *         {@link HConstants#FAVORED_NODES_NUM} favored nodes
   */
  public boolean canPlaceFavoredNodes() {
    return this.regionServerToRackMap.size() >= HConstants.FAVORED_NODES_NUM;
  }
}
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index aaec088..5bf32d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -2162,6 +2162,29 @@ public class AssignmentManager extends ZooKeeperListener { assign(regions.size(), servers.size(), "round-robin=true", bulkPlan); } + + public void assign(List regions, + final Map> assignmentMap) + throws InterruptedException, IOException { + if (regions == null || regions.isEmpty()) { + return; + } + List servers = serverManager.createDestinationServersList(); + + Map > plan = new HashMap>(); + for (HRegionInfo region : regions) { + List serverList = assignmentMap.get(region); + for (ServerName server : serverList) { + List serverNames; + if ((serverNames = plan.get(server)) == null) { + plan.put(server, (serverNames = new ArrayList())); + } + serverNames.add(region); + } + } + assign(regions.size(), servers.size(), + "favored-nodes=true", plan); + } private void assign(int regions, int totalServers, String message, Map> bulkPlan) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java new file mode 100644 index 0000000..1fbc482 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java @@ -0,0 +1,282 @@ +/** + * Copyright 2012 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.hadoop.hbase.master;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Writable;

/**
 * AssignmentPlan is a writable object for the region assignment plan.
 * It maps each region to its favored region-server list
 * (primary / secondary / tertiary).
 *
 * Mutating accessors are synchronized; see note on {@link #write(DataOutput)}.
 */
public class AssignmentPlan implements Writable {
  protected static final Log LOG = LogFactory.getLog(
      AssignmentPlan.class.getName());

  private static final int VERSION = 1;

  /** map from each region to its favored region-server list */
  private final Map<HRegionInfo, List<ServerName>> assignmentMap;

  /** map from each region to the timestamp of its last favored-list update */
  private final Map<HRegionInfo, Long> assignmentUpdateTS;

  /** Position of a server within a region's favored-node list. */
  public static enum POSITION {
    PRIMARY,
    SECONDARY,
    TERTIARY;
  }

  public AssignmentPlan() {
    assignmentMap = new HashMap<HRegionInfo, List<ServerName>>();
    assignmentUpdateTS = new HashMap<HRegionInfo, Long>();
  }

  /**
   * Initialize the assignment plan from the primary map and the
   * secondary/tertiary map. Regions with no primary server are skipped
   * (logged) rather than added with an incomplete favored list.
   *
   * @param primaryRSMap region -> primary region server
   * @param secondaryAndTertiaryRSMap region -> (secondary, tertiary) pair
   */
  public void initialize(Map<HRegionInfo, ServerName> primaryRSMap,
      Map<HRegionInfo, Pair<ServerName, ServerName>> secondaryAndTertiaryRSMap) {
    for (Map.Entry<HRegionInfo, Pair<ServerName, ServerName>> entry :
        secondaryAndTertiaryRSMap.entrySet()) {
      HRegionInfo regionInfo = entry.getKey();
      Pair<ServerName, ServerName> secondaryAndTertiaryPair = entry.getValue();

      ServerName primaryRS = primaryRSMap.get(regionInfo);
      if (primaryRS == null) {
        LOG.error("No primary region server for region " +
            regionInfo.getRegionNameAsString());
        continue;
      }

      List<ServerName> serverList =
          new ArrayList<ServerName>(HConstants.FAVORED_NODES_NUM);
      serverList.add(POSITION.PRIMARY.ordinal(), primaryRS);
      serverList.add(POSITION.SECONDARY.ordinal(),
          secondaryAndTertiaryPair.getFirst());
      serverList.add(POSITION.TERTIARY.ordinal(),
          secondaryAndTertiaryPair.getSecond());
      this.updateAssignmentPlan(regionInfo, serverList);
    }
  }

  /**
   * Add or replace an assignment, recording the update timestamp.
   * No-op when region or servers is null/empty.
   * @param region the region
   * @param servers the favored-node list
   * @param ts the update timestamp
   */
  public synchronized void updateAssignmentPlan(HRegionInfo region,
      List<ServerName> servers, long ts) {
    if (region == null || servers == null || servers.size() == 0) {
      return;
    }
    this.assignmentUpdateTS.put(region, Long.valueOf(ts));
    this.assignmentMap.put(region, servers);
    LOG.info("Update the assignment plan for region " +
        region.getRegionNameAsString() + " to favored nodes " +
        RegionPlacement.getFavoredNodes(servers) +
        " at time stamp " + ts);
  }

  /**
   * Add or replace an assignment without touching the update timestamp.
   * No-op when region or servers is null/empty.
   * @param region the region
   * @param servers the favored-node list
   */
  public synchronized void updateAssignmentPlan(HRegionInfo region,
      List<ServerName> servers) {
    if (region == null || servers == null || servers.size() == 0) {
      return;
    }
    this.assignmentMap.put(region, servers);
    LOG.info("Update the assignment plan for region " +
        region.getRegionNameAsString() + " ; favored nodes " +
        RegionPlacement.getFavoredNodes(servers));
  }

  /**
   * Remove one assignment (and its timestamp) from the plan.
   * @param region the region to remove
   */
  public synchronized void removeAssignment(HRegionInfo region) {
    this.assignmentMap.remove(region);
    this.assignmentUpdateTS.remove(region);
  }

  /**
   * @param region the region
   * @return true if there is an assignment plan for the particular region
   */
  public synchronized boolean hasAssignment(HRegionInfo region) {
    return assignmentMap.containsKey(region);
  }

  /**
   * @param region the region
   * @return the favored-node list for this region, or null if none
   */
  public synchronized List<ServerName> getAssignment(HRegionInfo region) {
    return assignmentMap.get(region);
  }

  /**
   * @param region the region
   * @return the last update timestamp, or Long.MIN_VALUE if never updated
   */
  public synchronized long getAssignmentUpdateTS(HRegionInfo region) {
    Long updateTS = assignmentUpdateTS.get(region);
    return (updateTS == null) ? Long.MIN_VALUE : updateTS.longValue();
  }

  /**
   * @return the live region -> favored-node-list map (not a copy; callers
   *         must not mutate it concurrently with plan updates)
   */
  public synchronized Map<HRegionInfo, List<ServerName>> getAssignmentMap() {
    return this.assignmentMap;
  }

  // NOTE(review): write/readFields are not synchronized in the original;
  // callers must not serialize concurrently with updates — confirm.
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(VERSION);
    if (this.assignmentMap == null) {
      // BUG FIX: the original fell through after writing 0 and then
      // NPE'd on assignmentMap.size(); return here instead.
      out.writeInt(0);
      return;
    }
    // write the size of the favored assignment map
    out.writeInt(this.assignmentMap.size());
    for (Map.Entry<HRegionInfo, List<ServerName>> entry :
        assignmentMap.entrySet()) {
      // write the region info
      entry.getKey().write(out);
      // write the favored server list, size first
      List<ServerName> serverList = entry.getValue();
      out.writeInt(serverList.size());
      for (ServerName addr : serverList) {
        addr.write(out);
      }
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    int version = in.readInt();
    if (version != VERSION) {
      // typo fixes: "versioin"/"verion"/"assigment" in the original message
      throw new IOException("The version mismatch for the assignment plan. " +
          "The expected version is " + VERSION +
          " but the version from the assignment plan is " + version);
    }
    // read the favored assignment map size
    int assignmentMapSize = in.readInt();
    for (int i = 0; i < assignmentMapSize; i++) {
      // read each region info
      HRegionInfo region = new HRegionInfo();
      region.readFields(in);
      // read the favored server list, size first
      int serverListSize = in.readInt();
      List<ServerName> serverList = new ArrayList<ServerName>(serverListSize);
      for (int j = 0; j < serverListSize; j++) {
        ServerName addr = new ServerName();
        addr.readFields(in);
        serverList.add(addr);
      }
      this.assignmentMap.put(region, serverList);
    }
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Map<HRegionInfo, List<ServerName>> comparedMap =
        ((AssignmentPlan) o).getAssignmentMap();

    if (comparedMap.size() != this.assignmentMap.size()) {
      return false;
    }

    for (Map.Entry<HRegionInfo, List<ServerName>> entry :
        comparedMap.entrySet()) {
      List<ServerName> serverList = this.assignmentMap.get(entry.getKey());
      // BUG FIX: the original called serverList.equals(...) even when
      // serverList was null (NPE when both sides were null).
      if (serverList == null) {
        if (entry.getValue() != null) {
          return false;
        }
      } else if (!serverList.equals(entry.getValue())) {
        return false;
      }
    }
    return true;
  }

  @Override
  public int hashCode() {
    // equals() was overridden without hashCode() in the original; keep the
    // contract (equal plans have equal assignment maps).
    return assignmentMap.hashCode();
  }

  /**
   * @param favoredNodes a full favored-node list (exactly
   *        {@link HConstants#FAVORED_NODES_NUM} entries)
   * @param server the server to locate
   * @return the server's position in the list, or null if not found or the
   *         list is malformed
   */
  public static AssignmentPlan.POSITION getFavoredServerPosition(
      List<ServerName> favoredNodes, ServerName server) {
    if (favoredNodes == null || server == null ||
        favoredNodes.size() != HConstants.FAVORED_NODES_NUM) {
      return null;
    }
    for (AssignmentPlan.POSITION p : AssignmentPlan.POSITION.values()) {
      if (favoredNodes.get(p.ordinal()).equals(server)) {
        return p;
      }
    }
    return null;
  }
}
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a028720..3915504 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.MasterAdminProtocol; @@ -285,6 +287,7 @@ Server { private volatile boolean initialized = false; // flag set after we complete assignRootAndMeta. private volatile boolean serverShutdownHandlerEnabled = false; + public RegionPlacementPolicy regionPlacement; // Instance of the hbase executor service. 
ExecutorService executorService; @@ -592,6 +595,8 @@ Server { // Check if we should stop every 100ms private Sleeper stopSleeper = new Sleeper(100, this); + private boolean shouldAssignRegionsWithFavoredNodes; + private void loop() { long lastMsgTs = 0l; long now = 0l; @@ -1485,10 +1490,23 @@ Server { if (cpHost != null) { cpHost.preCreateTable(hTableDescriptor, newRegions); } - + AssignmentPlan assignmentPlan = null; + String tableName = hTableDescriptor.getNameAsString(); + if (this.shouldAssignRegionsWithFavoredNodes) { + // Get the assignment domain for this table + AssignmentDomain domain = this.getAssignmentDomain(tableName); + // Get the assignment plan for the new regions + assignmentPlan = + regionPlacement.getNewAssignmentPlan(newRegions, domain); + } + if (assignmentPlan == null) { + LOG.info("Generated the assignment plan for new table " + tableName); + } else { + LOG.info("NO assignment plan for new table " + tableName); + } this.executorService.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor, conf, - newRegions, catalogTracker, assignmentManager)); + newRegions, catalogTracker, assignmentManager, assignmentPlan.getAssignmentMap())); if (cpHost != null) { cpHost.postCreateTable(hTableDescriptor, newRegions); } @@ -1549,6 +1567,31 @@ Server { Bytes.equals(tableName, HConstants.META_TABLE_NAME); } + /** + * Get the assignment domain for the table. + * Currently the domain would be generated by shuffling all the online + * region servers. + * + * It would be easy to extend for the multi-tenancy in the future. + * @param tableName + * @return the assignment domain for the table. 
+ */ + private AssignmentDomain getAssignmentDomain(String tableName) { + // Get all the online region servers + List onlineRSList = + this.serverManager.getOnlineServersList(); + + // Shuffle the server list based on the tableName + Random random = new Random(tableName.hashCode()); + Collections.shuffle(onlineRSList, random); + + // Add the shuffled server list into the assignment domain + AssignmentDomain domain = new AssignmentDomain(this.conf); + domain.addServers(onlineRSList); + + return domain; + } + @Override public void deleteTable(final byte[] tableName) throws IOException { checkInitialized(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java new file mode 100644 index 0000000..8994df7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hbase.master; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName; +//import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.net.CachedDNSToSwitchMapping; +import org.apache.hadoop.net.DNSToSwitchMapping; + +public class RackManager { + static final Log LOG = LogFactory.getLog(RackManager.class); + private DNSToSwitchMapping switchMapping; + + public RackManager(Configuration conf) { + Class clz = (Class) + conf.getClass("hbase.util.ip.to.rack.determiner", + CachedDNSToSwitchMapping.class); + try { + switchMapping = clz.newInstance(); + } catch (InstantiationException e) { + LOG.warn("using IPv4AddressTruncationMapping, failed to instantiate " + + clz.getName(), e); + } catch (IllegalAccessException e) { + LOG.warn("using 
IPv4AddressTruncationMapping, failed to instantiate " + + clz.getName(), e); + } + } + + /** + * Get the name of the rack containing a server, according to the DNS to + * switch mapping. + * @param info the server for which to get the rack name + * @return the rack name of the server + */ +// public String getRack(HServerInfo info) { +// if (info == null) +// return HConstants.UNKNOWN_RACK; +// return this.getRack(info.getServerAddress()); +// } + + /** + * Get the name of the rack containing a server, according to the DNS to + * switch mapping. + * @param server the server for which to get the rack name + * @return the rack name of the server + */ + public String getRack(ServerName server) { + if (server == null) + return HConstants.UNKNOWN_RACK; + + List racks = switchMapping.resolve(Arrays.asList( + new String[]{server.getBindAddress()})); + if (racks != null && racks.size() > 0) { + return racks.get(0); + } + + return HConstants.UNKNOWN_RACK; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionAssignmentSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionAssignmentSnapshot.java new file mode 100644 index 0000000..7878a0e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionAssignmentSnapshot.java @@ -0,0 +1,201 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; + +public class RegionAssignmentSnapshot { + private static final Log LOG = LogFactory.getLog(RegionAssignmentSnapshot.class + .getName()); + + private Configuration conf; + + /** the table name to region map */ + private final Map> tableToRegionMap; + /** the region to region server map */ + private final Map regionToRegionServerMap; + /** the region name to region info map */ + private final Map regionNameToRegionInfoMap; + + /** the regionServer to region map */ + private final Map> regionServerToRegionMap; + /** the existing assignment plan in the META region */ + private final AssignmentPlan exsitingAssignmentPlan; + /** The rack view for the current region server */ + 
private final AssignmentDomain globalAssignmentDomain; + + public RegionAssignmentSnapshot(Configuration conf) { + this.conf = conf; + tableToRegionMap = new HashMap>(); + regionToRegionServerMap = new HashMap(); + regionServerToRegionMap = new HashMap>(); + regionNameToRegionInfoMap = new TreeMap(); + exsitingAssignmentPlan = new AssignmentPlan(); + globalAssignmentDomain = new AssignmentDomain(conf); + } + + /** + * Initialize the region assignment snapshot by scanning the META table + * @throws IOException + */ + public void initialize() throws IOException { + LOG.info("Start to scan the META for the current region assignment " + + "snapshot"); + + // Add all the online region servers + HBaseAdmin admin = new HBaseAdmin(conf); + Collection servers = admin.getClusterStatus().getServers(); + for (ServerName serverInfo : admin.getClusterStatus().getServerInfo()) { + globalAssignmentDomain.addServer(serverInfo); + } + + MetaScannerVisitor visitor = new MetaScannerVisitor() { + public boolean processRow(Result result) throws IOException { + try { + byte[] region = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + byte[] server = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + // Process the region info + if (region == null) return true; + HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result); + if (regionInfo == null || regionInfo.isSplit()) { + return true; + } + addRegion(regionInfo); + + // Process the region server + if (server == null) return true; + ServerName regionServer = new ServerName(Bytes.toString(server)); + + // Add the current assignment to the snapshot + addAssignment(regionInfo, regionServer); + + // Process the assignment plan + byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.FAVOREDNODES_QUALIFIER); + if (favoredNodes == null) return true; + // Add the favored nodes into assignment plan + List favoredServerList = + 
RegionPlacement.getFavoredNodesList(favoredNodes); + exsitingAssignmentPlan.updateAssignmentPlan(regionInfo, + favoredServerList); + return true; + } catch (RuntimeException e) { + LOG.error("Caught remote exception " + e.getMessage() + + " when processing " + result); + throw e; + } + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + }; + + // Scan .META. to pick up user regions + MetaScanner.metaScan(conf, visitor); + LOG.info("Finished to scan the META for the current region assignment " + + "snapshot"); + } + + private void addRegion(HRegionInfo regionInfo) { + if (regionInfo == null) + return; + // Process the region name to region info map + regionNameToRegionInfoMap.put(regionInfo.getRegionNameAsString(), regionInfo); + + // Process the table to region map + String tableName = regionInfo.getTableNameAsString(); + List regionList = tableToRegionMap.get(tableName); + if (regionList == null) { + regionList = new ArrayList(); + } + // Add the current region info into the tableToRegionMap + regionList.add(regionInfo); + tableToRegionMap.put(tableName, regionList); + } + + private void addAssignment(HRegionInfo regionInfo, ServerName server) { + if (server != null && regionInfo != null) { + // Process the region to region server map + regionToRegionServerMap.put(regionInfo, server); + + // Process the region server to region map + List regionList = regionServerToRegionMap.get(server); + if (regionList == null) { + regionList = new ArrayList(); + } + regionList.add(regionInfo); + regionServerToRegionMap.put(server, regionList); + } + } + + public Map getRegionNameToRegionInfoMap() { + return this.regionNameToRegionInfoMap; + } + + public Map> getTableToRegionMap() { + return tableToRegionMap; + } + + public Map getRegionToRegionServerMap() { + return regionToRegionServerMap; + } + + public Map> getRegionServerToRegionMap() { + return regionServerToRegionMap; + } + + public AssignmentPlan 
getExistingAssignmentPlan() { + return this.exsitingAssignmentPlan; + } + + public AssignmentDomain getGlobalAssignmentDomain() { + return this.globalAssignmentDomain; + } + + public Set getTableSet() { + return this.tableToRegionMap.keySet(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java new file mode 100644 index 0000000..a17736c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java @@ -0,0 +1,1561 @@ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Scanner; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import 
org.apache.hadoop.hbase.util.MunkresAssignment; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class RegionPlacement implements RegionPlacementPolicy{ + private static final Log LOG = LogFactory.getLog(RegionPlacement.class + .getName()); + + // The cost of a placement that should never be assigned. + private static final float MAX_COST = Float.POSITIVE_INFINITY; + + // The cost of a placement that is undesirable but acceptable. + private static final float AVOID_COST = 100000f; + + // The amount by which the cost of a placement is increased if it is the + // last slot of the server. This is done to more evenly distribute the slop + // amongst servers. + private static final float LAST_SLOT_COST_PENALTY = 0.5f; + + // The amount by which the cost of a primary placement is penalized if it is + // not the host currently serving the region. This is done to minimize moves. + private static final float NOT_CURRENT_HOST_PENALTY = 0.1f; + + private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false; + + private Configuration conf; + private final boolean enforceLocality; + private final boolean enforceMinAssignmentMove; + private HBaseAdmin admin; + private Set targetTableSet; + private final static String SERVER_NAME_SEPARATOR = ";"; + + public RegionPlacement(Configuration conf) + throws IOException { + this(conf, true, true); + } + + public RegionPlacement(Configuration conf, boolean enforceLocality, + boolean enforceMinAssignmentMove) + throws IOException { + this.conf = conf; + this.enforceLocality = enforceLocality; + this.enforceMinAssignmentMove = enforceMinAssignmentMove; + this.targetTableSet = new HashSet(); + } + + @Override + public AssignmentPlan getNewAssignmentPlan(HRegionInfo[] regions, + AssignmentDomain domain) throws IOException { + if (regions == null || regions.length == 0 || + domain == null || domain.isEmpty() || !domain.canPlaceFavoredNodes()) + return null; 
+ + try { + // Place the primary region server based on the regions and servers + Map primaryRSMap = + this.placePrimaryRSAsRoundRobin(regions, domain); + + // Place the secondary and tertiary region server + Map> + secondaryAndTertiaryRSMap = + this.placeSecondaryAndTertiaryRS(primaryRSMap, domain); + + // Get the assignment plan by initialization with the primaryRSMap and the + // secondaryAndTertiaryRSMap + AssignmentPlan plan = new AssignmentPlan(); + plan.initialize(primaryRSMap, secondaryAndTertiaryRSMap); + return plan; + } catch (Exception e) { + LOG.debug("Cannot generate the assignment plan because " + e); + return null; + } + } + + /** + * Place the primary region server in the round robin way. + * @param regions + * @param domain + * @return the map between regions and its primary region server + * @throws IOException + */ + private Map placePrimaryRSAsRoundRobin( + HRegionInfo[] regions, AssignmentDomain domain) throws IOException { + + // Get the rack to region server map from the assignment domain + Map> rackToRegionServerMap= + domain.getRackToRegionServerMap(); + + List rackList = new ArrayList(); + rackList.addAll(rackToRegionServerMap.keySet()); + Map currentProcessIndexMap = new HashMap(); + int rackIndex = 0; + + // Place the region with its primary region server in a round robin way. + Map primaryRSMap = + new HashMap(); + for (HRegionInfo regionInfo : regions) { + String rackName = rackList.get(rackIndex); + // Initialize the current processing host index. 
+ int serverIndex = 0; + + // Restore the current process index from the currentProcessIndexMap + Integer currentProcessIndex = currentProcessIndexMap.get(rackName); + if (currentProcessIndex != null) { + serverIndex = currentProcessIndex.intValue(); + } + // Get the server list for the current rack + List currentServerList = rackToRegionServerMap.get(rackName); + + // Get the current process region server + ServerName currentServer = currentServerList.get(serverIndex); + + // Place the current region with the current primary region server + primaryRSMap.put(regionInfo, currentServer); + + // Set the next processing index + if ((++serverIndex) >= currentServerList.size()) { + // Reset the server index for the current rack + serverIndex = 0; + } + // Keep track of the next processing index + currentProcessIndexMap.put(rackName, serverIndex); + if ((++rackIndex) >= rackList.size()) { + rackIndex = 0; // reset the rack index to 0 + } + } + + return primaryRSMap; + } + + /** + * Place the secondary and tertiary region server. Best effort to place the + * secondary and tertiary into the different rack as the primary region server. + * Also best effort to place the secondary and tertiary into the same rack. + * + * There are more than 3 region server for the placement. + * @param primaryRSMap + * @param domain + * @return + * @throws IOException + */ + private Map> placeSecondaryAndTertiaryRS( + Map primaryRSMap, AssignmentDomain domain) + throws IOException { + Map> secondaryAndTertiaryMap + = new HashMap>(); + + for (Map.Entry entry : primaryRSMap.entrySet()) { + // Get the target region and its primary region server rack + HRegionInfo regionInfo = entry.getKey(); + ServerName primaryRS = entry.getValue(); + + // Set the random seed in the assignment domain + domain.setRandomSeed(regionInfo.hashCode()); + try { + // Create the secondary and tertiary region server pair object. 
+ Pair pair; + // Get the rack for the primary region server + String primaryRack = domain.getRack(primaryRS); + + if (domain.getTotalRackNum() == 1) { + // Single rack case: have to pick the secondary and tertiary + // from the same rack + List serverList = domain.getServersFromRack(primaryRack); + if (serverList.size() <= 2) { + // Single region server case: cannot place the favored nodes + // on any server; !domain.canPlaceFavoredNodes() + break; + } else { + // Randomly select two region servers from the server list and make sure + // they do not overlap with the primary region server; + Set serverSkipSet = new HashSet(); + serverSkipSet.add(primaryRS); + + // Place the secondary RS + ServerName secondaryRS = + domain.getOneRandomServer(primaryRack, serverSkipSet); + // Skip the secondary for the tertiary placement + serverSkipSet.add(secondaryRS); + + // Place the tertiary RS + ServerName tertiaryRS = + domain.getOneRandomServer(primaryRack, serverSkipSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and tertiary " + + "region server for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + pair = new Pair(); + pair.setFirst(secondaryRS); + pair.setSecond(tertiaryRS); + } + } else { + // Random to choose the secondary and tertiary region server + // from another rack to place the secondary and tertiary + + // Random to choose one rack except for the current rack + Set rackSkipSet = new HashSet(); + rackSkipSet.add(primaryRack); + String secondaryRack = domain.getOneRandomRack(rackSkipSet); + List serverList = domain.getServersFromRack(secondaryRack); + if (serverList.size() >= 2) { + // Randomly pick up two servers from this secondary rack + + // Place the secondary RS + ServerName secondaryRS = + domain.getOneRandomServer(secondaryRack); + + // Skip the secondary for the tertiary placement + Set skipServerSet = new HashSet(); + skipServerSet.add(secondaryRS); + 
// Place the tertiary RS + ServerName tertiaryRS = + domain.getOneRandomServer(secondaryRack, skipServerSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and terinary" + + "region server for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + pair = new Pair(); + pair.setFirst(secondaryRS); + pair.setSecond(tertiaryRS); + } else { + // Pick the secondary rs from this secondary rack + // and pick the tertiary from another random rack + pair = new Pair(); + ServerName secondary = domain.getOneRandomServer(secondaryRack); + pair.setFirst(secondary); + + // Pick the tertiary + if (domain.getTotalRackNum() == 2) { + // Pick the tertiary from the same rack of the primary RS + Set serverSkipSet = new HashSet(); + serverSkipSet.add(primaryRS); + ServerName tertiary = + domain.getOneRandomServer(primaryRack, serverSkipSet); + pair.setSecond(tertiary); + } else { + // Pick the tertiary from another rack + rackSkipSet.add(secondaryRack); + String tertiaryRandomRack = domain.getOneRandomRack(rackSkipSet); + ServerName tertinary = + domain.getOneRandomServer(tertiaryRandomRack); + pair.setSecond(tertinary); + } + } + } + if (pair != null) { + secondaryAndTertiaryMap.put(regionInfo, pair); + LOG.debug("Place the secondary and tertiary region server for region " + + regionInfo.getRegionNameAsString()); + } + } catch (Exception e) { + LOG.warn("Cannot place the favored nodes for region " + + regionInfo.getRegionNameAsString() + " because " + e); + continue; + } + } + return secondaryAndTertiaryMap; + } + + // For regions that share the primary, avoid placing the secondary and tertiary on a same RS + public Map> placeSecondaryAndTertiaryWithRestrictions( + Map primaryRSMap, AssignmentDomain domain) + throws IOException { + Map mapServerToRack = domain + .getRegionServerToRackMap(); + Map> serverToPrimaries = + mapRSToPrimaries(primaryRSMap); + Map> secondaryAndTertiaryMap = + new 
HashMap>(); + + for (Entry entry : primaryRSMap.entrySet()) { + // Get the target region and its primary region server rack + HRegionInfo regionInfo = entry.getKey(); + ServerName primaryRS = entry.getValue(); + + // Set the random seed in the assignment domain + domain.setRandomSeed(regionInfo.hashCode()); + try { + // Create the secondary and tertiary region server pair object. + Pair pair; + // Get the rack for the primary region server + String primaryRack = domain.getRack(primaryRS); + + if (domain.getTotalRackNum() == 1) { + // Single rack case: have to pick the secondary and tertiary + // from the same rack + List serverList = domain + .getServersFromRack(primaryRack); + if (serverList.size() <= 2) { + // Single region server case: cannot not place the favored nodes + // on any server; !domain.canPlaceFavoredNodes() + continue; + } else { + // Randomly select two region servers from the server list and make + // sure + // they are not overlap with the primary region server; + Set serverSkipSet = new HashSet(); + serverSkipSet.add(primaryRS); + + // Place the secondary RS + ServerName secondaryRS = domain.getOneRandomServer(primaryRack, + serverSkipSet); + // Skip the secondary for the tertiary placement + serverSkipSet.add(secondaryRS); + + // Place the tertiary RS + ServerName tertiaryRS = domain.getOneRandomServer(primaryRack, + serverSkipSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and terinary" + + "region server for region " + + regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + pair = new Pair(); + pair.setFirst(secondaryRS); + pair.setSecond(tertiaryRS); + } + } else { + // Random to choose the secondary and tertiary region server + // from another rack to place the secondary and tertiary + // Random to choose one rack except for the current rack + Set rackSkipSet = new HashSet(); + rackSkipSet.add(primaryRack); + String secondaryRack = 
domain.getOneRandomRack(rackSkipSet); + List serverList = domain + .getServersFromRack(secondaryRack); + Set serverSet = new HashSet(); + serverSet.addAll(serverList); + + if (serverList.size() >= 2) { + + // Randomly pick up two servers from this secondary rack + // Skip the secondary for the tertiary placement + // skip the servers which share the primary already + Set primaries = serverToPrimaries.get(primaryRS); + Set skipServerSet = new HashSet(); + while (true) { + Pair secondaryAndTertiary = null; + if (primaries.size() > 1) { + // check where his tertiary and secondary are + for (HRegionInfo primary : primaries) { + secondaryAndTertiary = secondaryAndTertiaryMap.get(primary); + if (secondaryAndTertiary != null) { + if (mapServerToRack.get(secondaryAndTertiary.getFirst()) + .equals(secondaryRack)) { + skipServerSet.add(secondaryAndTertiary.getFirst()); + } + if (mapServerToRack.get(secondaryAndTertiary.getSecond()) + .equals(secondaryRack)) { + skipServerSet.add(secondaryAndTertiary.getSecond()); + } + } + } + } + if (skipServerSet.size() + 2 <= serverSet.size()) + break; + skipServerSet.clear(); + rackSkipSet.add(secondaryRack); + // we used all racks + if (rackSkipSet.size() == domain.getTotalRackNum()) { + // remove the last two added and break + skipServerSet.remove(secondaryAndTertiary.getFirst()); + skipServerSet.remove(secondaryAndTertiary.getSecond()); + break; + } + secondaryRack = domain.getOneRandomRack(rackSkipSet); + serverList = domain.getServersFromRack(secondaryRack); + serverSet = new HashSet(); + serverSet.addAll(serverList); + } + + // Place the secondary RS + ServerName secondaryRS = domain.getOneRandomServer( + secondaryRack, skipServerSet); + skipServerSet.add(secondaryRS); + // Place the tertiary RS + ServerName tertiaryRS = domain.getOneRandomServer( + secondaryRack, skipServerSet); + + if (secondaryRS == null || tertiaryRS == null) { + LOG.error("Cannot place the secondary and tertiary" + + " region server for region " + + 
regionInfo.getRegionNameAsString()); + } + // Create the secondary and tertiary pair + pair = new Pair(); + pair.setFirst(secondaryRS); + pair.setSecond(tertiaryRS); + } else { + // Pick the secondary rs from this secondary rack + // and pick the tertiary from another random rack + pair = new Pair(); + ServerName secondary = domain.getOneRandomServer(secondaryRack); + pair.setFirst(secondary); + + // Pick the tertiary + if (domain.getTotalRackNum() == 2) { + // Pick the tertiary from the same rack of the primary RS + Set serverSkipSet = new HashSet(); + serverSkipSet.add(primaryRS); + ServerName tertiary = domain.getOneRandomServer(primaryRack, + serverSkipSet); + pair.setSecond(tertiary); + } else { + // Pick the tertiary from another rack + rackSkipSet.add(secondaryRack); + String tertiaryRandomRack = domain.getOneRandomRack(rackSkipSet); + ServerName tertinary = domain + .getOneRandomServer(tertiaryRandomRack); + pair.setSecond(tertinary); + } + } + } + if (pair != null) { + secondaryAndTertiaryMap.put(regionInfo, pair); + LOG.debug("Place the secondary and tertiary region server for region " + + regionInfo.getRegionNameAsString()); + } + } catch (Exception e) { + LOG.warn("Cannot place the favored nodes for region " + + regionInfo.getRegionNameAsString() + " because " + e); + continue; + } + } + return secondaryAndTertiaryMap; + } + + public Map> mapRSToPrimaries( + Map primaryRSMap) { + Map> primaryServerMap = + new HashMap>(); + for (Entry e : primaryRSMap.entrySet()) { + Set currentSet = primaryServerMap.get(e.getValue()); + if (currentSet == null) { + currentSet = new HashSet(); + } + currentSet.add(e.getKey()); + primaryServerMap.put(e.getValue(), currentSet); + } + return primaryServerMap; + } + + /** + * Generate the assignment plan for the existing table + * + * @param tableName + * @param assignmentSnapshot + * @param regionLocalityMap + * @param plan + * @param munkresForSecondaryAndTertiary if set on true the assignment plan + * for the tertiary and 
secondary will be generated with Munkres algorithm, + * otherwise will be generated using placeSecondaryAndTertiaryRS + * @throws IOException + */ + private void genAssignmentPlan(String tableName, + RegionAssignmentSnapshot assignmentSnapshot, + Map> regionLocalityMap, AssignmentPlan plan, + boolean munkresForSecondaryAndTertiary) throws IOException { + // Get the all the regions for the current table + List regions = + assignmentSnapshot.getTableToRegionMap().get(tableName); + int numRegions = regions.size(); + + // Get the current assignment map + Map currentAssignmentMap = + assignmentSnapshot.getRegionToRegionServerMap(); + + // Get the assignment domain + AssignmentDomain domain = assignmentSnapshot.getGlobalAssignmentDomain(); + + // Get the all the region servers + List servers = new ArrayList(); + servers.addAll(domain.getAllServers()); + + LOG.info("Start to generate assignment plan for " + numRegions + + " regions from table " + tableName + " with " + + servers.size() + " region servers"); + + int slotsPerServer = (int) Math.ceil((float) numRegions / + servers.size()); + int regionSlots = slotsPerServer * servers.size(); + + // Compute the primary, secondary and tertiary costs for each region/server + // pair. These costs are based only on node locality and rack locality, and + // will be modified later. + float[][] primaryCost = new float[numRegions][regionSlots]; + float[][] secondaryCost = new float[numRegions][regionSlots]; + float[][] tertiaryCost = new float[numRegions][regionSlots]; + + if (this.enforceLocality && regionLocalityMap != null) { + // Transform the locality mapping into a 2D array, assuming that any + // unspecified locality value is 0. 
+ float[][] localityPerServer = new float[numRegions][regionSlots]; + for (int i = 0; i < numRegions; i++) { + Map serverLocalityMap = + regionLocalityMap.get(regions.get(i).getEncodedName()); + if (serverLocalityMap == null) { + continue; + } + for (int j = 0; j < servers.size(); j++) { + String serverName = servers.get(j).getHostname(); + if (serverName == null) { + continue; + } + Float locality = serverLocalityMap.get(serverName); + if (locality == null) { + continue; + } + for (int k = 0; k < slotsPerServer; k++) { + // If we can't find the locality of a region to a server, which occurs + // because locality is only reported for servers which have some + // blocks of a region local, then the locality for that pair is 0. + localityPerServer[i][j * slotsPerServer + k] = locality.floatValue(); + } + } + } + + // Compute the total rack locality for each region in each rack. The total + // rack locality is the sum of the localities of a region on all servers in + // a rack. + Map> rackRegionLocality = + new HashMap>(); + for (int i = 0; i < numRegions; i++) { + HRegionInfo region = regions.get(i); + for (int j = 0; j < regionSlots; j += slotsPerServer) { + String rack = domain.getRack(servers.get(j / slotsPerServer)); + Map rackLocality = rackRegionLocality.get(rack); + if (rackLocality == null) { + rackLocality = new HashMap(); + rackRegionLocality.put(rack, rackLocality); + } + Float localityObj = rackLocality.get(region); + float locality = localityObj == null ? 0 : localityObj.floatValue(); + locality += localityPerServer[i][j]; + rackLocality.put(region, locality); + } + } + for (int i = 0; i < numRegions; i++) { + for (int j = 0; j < regionSlots; j++) { + String rack = domain.getRack(servers.get(j / slotsPerServer)); + Float totalRackLocalityObj = + rackRegionLocality.get(rack).get(regions.get(i)); + float totalRackLocality = totalRackLocalityObj == null ? 
+ 0 : totalRackLocalityObj.floatValue(); + + // Primary cost aims to favor servers with high node locality and low + // rack locality, so that secondaries and tertiaries can be chosen for + // nodes with high rack locality. This might give primaries with + // slightly less locality at first compared to a cost which only + // considers the node locality, but should be better in the long run. + primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - + totalRackLocality); + + // Secondary cost aims to favor servers with high node locality and high + // rack locality since the tertiary will be chosen from the same rack as + // the secondary. This could be negative, but that is okay. + secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality); + + // Tertiary cost is only concerned with the node locality. It will later + // be restricted to only hosts on the same rack as the secondary. + tertiaryCost[i][j] = 1 - localityPerServer[i][j]; + } + } + } + + if (this.enforceMinAssignmentMove && currentAssignmentMap != null) { + // We want to minimize the number of regions which move as the result of a + // new assignment. Therefore, slightly penalize any placement which is for + // a host that is not currently serving the region. + for (int i = 0; i < numRegions; i++) { + for (int j = 0; j < servers.size(); j++) { + ServerName currentAddress = currentAssignmentMap.get(regions.get(i)); + if (currentAddress != null && + !currentAddress.equals(servers.get(j))) { + for (int k = 0; k < slotsPerServer; k++) { + primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY; + } + } + } + } + } + + // Artificially increase cost of last slot of each server to evenly + // distribute the slop, otherwise there will be a few servers with too few + // regions and many servers with the max number of regions. 
+ for (int i = 0; i < numRegions; i++) { + for (int j = 0; j < regionSlots; j += slotsPerServer) { + primaryCost[i][j] += LAST_SLOT_COST_PENALTY; + secondaryCost[i][j] += LAST_SLOT_COST_PENALTY; + tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY; + } + } + + RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, + regionSlots); + primaryCost = randomizedMatrix.transform(primaryCost); + int[] primaryAssignment = new MunkresAssignment(primaryCost).solve(); + primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment); + + // Modify the secondary and tertiary costs for each region/server pair to + // prevent a region from being assigned to the same rack for both primary + // and either one of secondary or tertiary. + for (int i = 0; i < numRegions; i++) { + int slot = primaryAssignment[i]; + String rack = domain.getRack(servers.get(slot / slotsPerServer)); + for (int k = 0; k < servers.size(); k++) { + if (!domain.getRack(servers.get(k)).equals(rack)) { + continue; + } + if (k == slot / slotsPerServer) { + // Same node, do not place secondary or tertiary here ever. + for (int m = 0; m < slotsPerServer; m++) { + secondaryCost[i][k * slotsPerServer + m] = MAX_COST; + tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; + } + } else { + // Same rack, do not place secondary or tertiary here if possible. 
+ for (int m = 0; m < slotsPerServer; m++) { + secondaryCost[i][k * slotsPerServer + m] = AVOID_COST; + tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; + } + } + } + } + if (munkresForSecondaryAndTertiary) { + randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); + secondaryCost = randomizedMatrix.transform(secondaryCost); + int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve(); + secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment); + + // Modify the tertiary costs for each region/server pair to ensure that a + // region is assigned to a tertiary server on the same rack as its secondary + // server, but not the same server in that rack. + for (int i = 0; i < numRegions; i++) { + int slot = secondaryAssignment[i]; + String rack = domain.getRack(servers.get(slot / slotsPerServer)); + for (int k = 0; k < servers.size(); k++) { + if (k == slot / slotsPerServer) { + // Same node, do not place tertiary here ever. + for (int m = 0; m < slotsPerServer; m++) { + tertiaryCost[i][k * slotsPerServer + m] = MAX_COST; + } + } else { + if (domain.getRack(servers.get(k)).equals(rack)) { + continue; + } + // Different rack, do not place tertiary here if possible. 
+ for (int m = 0; m < slotsPerServer; m++) { + tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST; + } + } + } + } + + randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots); + tertiaryCost = randomizedMatrix.transform(tertiaryCost); + int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve(); + tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment); + + for (int i = 0; i < numRegions; i++) { + List favoredServers = + new ArrayList(HConstants.FAVORED_NODES_NUM); + favoredServers.add(servers.get(primaryAssignment[i] / slotsPerServer)); + favoredServers.add(servers.get(secondaryAssignment[i] / slotsPerServer)); + favoredServers.add(servers.get(tertiaryAssignment[i] / slotsPerServer)); + // Update the assignment plan + plan.updateAssignmentPlan(regions.get(i), favoredServers); + } + LOG.info("Generated the assignment plan for " + numRegions + + " regions from table " + tableName + " with " + + servers.size() + " region servers"); + LOG.info("Assignment plan for secondary and tertiary generated " + + "using MunkresAssignment"); + } else { + Map primaryRSMap = new HashMap(); + for (int i = 0; i < numRegions; i++) { + primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer)); + } + Map> secondaryAndTertiaryMap = + placeSecondaryAndTertiaryWithRestrictions(primaryRSMap, domain); + for (int i = 0; i < numRegions; i++) { + List favoredServers = + new ArrayList(HConstants.FAVORED_NODES_NUM); + HRegionInfo currentRegion = regions.get(i); + favoredServers.add(primaryRSMap.get(currentRegion)); + Pair secondaryAndTertiary = + secondaryAndTertiaryMap.get(currentRegion); + favoredServers.add(secondaryAndTertiary.getFirst()); + favoredServers.add(secondaryAndTertiary.getSecond()); + // Update the assignment plan + plan.updateAssignmentPlan(regions.get(i), favoredServers); + } + LOG.info("Generated the assignment plan for " + numRegions + + " regions from table " + tableName + " with " + + servers.size() + " region 
servers"); + LOG.info("Assignment plan for secondary and tertiary generated " + + "using placeSecondaryAndTertiaryWithRestrictions method"); + } + } + + /*@Override + public AssignmentPlan getNewAssignmentPlan() throws IOException { + // Get the current region assignment snapshot by scanning from the META + RegionAssignmentSnapshot assignmentSnapshot = + this.getRegionAssignmentSnapshot(); + + // Get the region locality map + Map> regionLocalityMap = null; + if (this.enforceLocality) { + regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); + } + // Initialize the assignment plan + AssignmentPlan plan = new AssignmentPlan(); + + // Get the table to region mapping + Map> tableToRegionMap = + assignmentSnapshot.getTableToRegionMap(); + LOG.info("Start to generate the new assignment plan for the " + + + tableToRegionMap.keySet().size() + " tables" ); + for (String table : tableToRegionMap.keySet()) { + try { + if (!this.targetTableSet.isEmpty() && + !this.targetTableSet.contains(table)) { + continue; + } + // TODO: maybe run the placement in parallel for each table + genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan, + USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY); + } catch (Exception e) { + LOG.error("Get some exceptions for placing primary region server" + + "for table " + table + " because " + e); + } + } + LOG.info("Finish to generate the new assignment plan for the " + + + tableToRegionMap.keySet().size() + " tables" ); + return plan; + } + + @Override + public void updateAssignmentPlan(AssignmentPlan plan) + throws IOException { + LOG.info("Start to update the new assignment plan for the META table and" + + " the region servers"); + // Update the new assignment plan to META + updateAssignmentPlanToMeta(plan); + // Update the new assignment plan to Region Servers + updateAssignmentPlanToRegionServers(plan); + LOG.info("Finish to update the new assignment plan for the META table and" + + " the region servers"); + } + + 
@Override + public AssignmentPlan getExistingAssignmentPlan() throws IOException { + RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot(); + return snapshot.getExistingAssignmentPlan(); + }*/ + + /** + * Update the assignment plan into .META. + * @param plan the assignments plan to be updated into .META. + * @throws IOException if cannot update assignment plan in .META. + */ +//// public void updateAssignmentPlanToMeta(AssignmentPlan plan) +//// throws IOException { +//// try { +//// LOG.info("Start to update the META with the new assignment plan"); +//// List puts = new ArrayList(); +//// Map> assignmentMap = +//// plan.getAssignmentMap(); +//// for (Map.Entry> entry : +//// assignmentMap.entrySet()) { +//// String favoredNodes = RegionPlacement.getFavoredNodes(entry.getValue()); +//// Put put = new Put(entry.getKey().getRegionName()); +//// put.add(HConstants.CATALOG_FAMILY, HConstants.FAVOREDNODES_QUALIFIER, +//// favoredNodes.getBytes()); +//// puts.add(put); +//// } +//// +//// // Write the region assignments to the meta table. 
+//// HTable metaTable = new HTable(conf, HConstants.META_TABLE_NAME); +//// metaTable.put(puts); +//// LOG.info("Updated the META with the new assignment plan"); +//// } catch (Exception e) { +//// LOG.error("Failed to update META with the new assignment" + +//// "plan because " + e.getMessage()); +//// } +//// } +// +// /** +// * Update the assignment plan to all the region servers +// * @param plan +// * @throws IOException +// */ +// private void updateAssignmentPlanToRegionServers(AssignmentPlan plan) +// throws IOException{ +// LOG.info("Start to update the region servers with the new assignment plan"); +// // Get the region to region server map +// Map> currentAssignment = +// this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); +// HConnection connection = this.getHBaseAdmin().getConnection(); +// +// // track of the failed and succeeded updates +// int succeededNum = 0; +// Map failedUpdateMap = +// new HashMap(); +// +// for (Map.Entry> entry : +// currentAssignment.entrySet()) { +// try { +// // Keep track of the favored updates for the current region server +// AssignmentPlan singleServerPlan = null; +// // Find out all the updates for the current region server +// for (HRegionInfo region : entry.getValue()) { +// List favoredServerList = plan.getAssignment(region); +// if (favoredServerList != null && +// favoredServerList.size() == HConstants.FAVORED_NODES_NUM) { +// // Create the single server plan if necessary +// if (singleServerPlan == null) { +// singleServerPlan = new AssignmentPlan(); +// } +// // Update the single server update +// singleServerPlan.updateAssignmentPlan(region, favoredServerList); +// } +// +// } +// if (singleServerPlan != null) { +// // Update the current region server with its updated favored nodes +// HRegionInterface currentRegionServer = +// connection.getHRegionConnection(entry.getKey()); +// int updatedRegionNum = +// currentRegionServer.updateFavoredNodes(singleServerPlan); +// LOG.info("Region server " + 
+// currentRegionServer.getHServerInfo().getHostnamePort() + +// " has updated " + updatedRegionNum + " / " + +// singleServerPlan.getAssignmentMap().size() + +// " regions with the assignment plan"); +// succeededNum ++; +// } +// } catch (Exception e) { +// failedUpdateMap.put(entry.getKey(), e); +// } +// } +// // log the succeeded updates +// LOG.info("Updated " + succeededNum + " region servers with " + +// "the new assignment plan"); +// +// // log the failed updates +// int failedNum = failedUpdateMap.size(); +// if (failedNum != 0) { +// LOG.error("Failed to update the following + " + failedNum + +// " region servers with its corresponding favored nodes"); +// for (Map.Entry entry : +// failedUpdateMap.entrySet() ) { +// LOG.error("Failed to update " + entry.getKey().getHostNameWithPort() + +// " because of " + entry.getValue().getMessage()); +// } +// } +// } +// +// +// /** +// * Verify the region placement is consistent with the assignment plan; +// * @throws IOException +// */ +// public void verifyRegionPlacement(boolean isDetailMode) throws IOException { +// System.out.println("Start to verify the region assignment and " + +// "generate the verification report"); +// // Get the region assignment snapshot +// RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot(); +// +// // Get all the tables +// Set tables = snapshot.getTableSet(); +// +// // Get the region locality map +// Map> regionLocalityMap = null; +// if (this.enforceLocality == true) { +// regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf); +// } +// // Iterate all the tables to fill up the verification report +// for (String table : tables) { +// if (!this.targetTableSet.isEmpty() && +// !this.targetTableSet.contains(table)) { +// continue; +// } +// AssignmentVerificationReport report = new AssignmentVerificationReport(); +// report.fillUp(table, snapshot, regionLocalityMap); +// report.print(isDetailMode); +// } +// } +// +// public void 
printDispersionScores(String table, +// RegionAssignmentSnapshot snapshot, int numRegions, AssignmentPlan newPlan, +// boolean simplePrint) { +// if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) { +// return; +// } +// AssignmentVerificationReport report = new AssignmentVerificationReport(); +// report.fillUpDispersion(table, snapshot, newPlan); +// List dispersion = report.getDispersionInformation(); +// if (simplePrint) { +// DecimalFormat df = new java.text.DecimalFormat("#.##"); +// System.out.println("\tAvg dispersion score: " +// + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: " +// + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: " +// + df.format(dispersion.get(2)) + " hosts;"); +// } else { +// LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions +// + " ; The average dispersion score is " + dispersion.get(0)); +// } +// } + + public void setTargetTableName(String[] tableNames) { + if (tableNames != null) { + for (String table : tableNames) + this.targetTableSet.add(table); + } + } + + /** + * @param serverAddrList + * @return string the favoredNodes generated by the server list. + */ + public static String getFavoredNodes(List serverAddrList) { + String favoredNodes = ""; + if (serverAddrList != null) { + for (int i = 0 ; i < serverAddrList.size(); i++) { + favoredNodes += serverAddrList.get(i).getHostAndPort(); + if (i != serverAddrList.size() - 1 ) { + favoredNodes += SERVER_NAME_SEPARATOR; + } + } + } + return favoredNodes; + } + + /** + * @param favoredNodes The joint string of the favored nodes. 
+ * @return The array of the InetSocketAddress + */ + public static InetSocketAddress[] getFavoredInetSocketAddress( + String favoredNodes) { + String[] favoredNodesArray = StringUtils.split(favoredNodes, SERVER_NAME_SEPARATOR); + InetSocketAddress[] addresses = + new InetSocketAddress[favoredNodesArray.length]; + for (int i = 0; i < favoredNodesArray.length; i++) { + HServerAddress serverAddress = new HServerAddress(favoredNodesArray[i]); + addresses[i] = serverAddress.getInetSocketAddress(); + } + return addresses; + } + + /** + * @param serverList + * @return The array of the InetSocketAddress + */ + public static InetSocketAddress[] getFavoredInetSocketAddress( + List serverList) { + if (serverList == null || serverList.size() == 0) + return null; + + InetSocketAddress[] addresses = + new InetSocketAddress[serverList.size()]; + for (int i = 0; i < serverList.size(); i++) { + addresses[i] = serverList.get(i).getInetSocketAddress(); + } + return addresses; + } + + /** + * @param favoredNodes The bytes of favored nodes + * @return the list of ServerName for the byte array of favored nodes. + */ + public static List getFavoredNodesList(byte[] favoredNodes) { + String favoredNodesStr = Bytes.toString(favoredNodes); + return getFavoredNodeList(favoredNodesStr); + } + + /** + * @param favoredNodesStr The String of favored nodes + * @return the list of ServerName for the string of favored nodes. + */ + public static List getFavoredNodeList(String favoredNodesStr) { + String[] favoredNodesArray = StringUtils.split(favoredNodesStr, SERVER_NAME_SEPARATOR); + if (favoredNodesArray == null) + return null; + + List serverList = new ArrayList(); + for (String hostNameAndPort : favoredNodesArray) { + serverList.add(new ServerName(hostNameAndPort)); + } + return serverList; + } + /** + * @param favoredNodes The byte array of the favored nodes + * @return string the favoredNodes generated by the byte array of favored nodes. 
+ */ + public static String getFavoredNodes(byte[] favoredNodes) { + List serverList = getFavoredNodesList(favoredNodes); + String favoredNodesStr = getFavoredNodes(serverList); + return favoredNodesStr; + } + + /** + * Print the assignment plan to the system output stream + * @param plan + */ + public static void printAssignmentPlan(AssignmentPlan plan) { + if (plan == null) return; + LOG.info("========== Start to print the assignment plan ================"); + // sort the map based on region info + Map> assignmentMap = + new TreeMap>(plan.getAssignmentMap()); + + for (Map.Entry> entry : + assignmentMap.entrySet()) { + String serverList = RegionPlacement.getFavoredNodes(entry.getValue()); + String regionName = entry.getKey().getRegionNameAsString(); + LOG.info("Region: " + regionName ); + LOG.info("Its favored nodes: " + serverList); + } + LOG.info("========== Finish to print the assignment plan ================"); + } + + public static void main(String[] args) throws IOException, + InterruptedException { + // Set all the options + Options opt = new Options(); + opt.addOption("w", "write", false, "write the assignments to META only"); + opt.addOption("u", "update", false, + "update the assignments to META and RegionServers together"); + opt.addOption("n", "dry-run", false, "do not write assignments to META"); + opt.addOption("v", "verify", false, "verify current assignments against META"); + opt.addOption("p", "print", false, "print the current assignment plan in META"); + opt.addOption("h", "help", false, "print usage"); + opt.addOption("d", "verification-details", false, + "print the details of verification report"); + + opt.addOption("zk", true, "to set the zookeeper quorum"); + opt.addOption("fs", true, "to set HDFS"); + opt.addOption("hbase_root", true, "to set hbase_root directory"); + + opt.addOption("overwrite", false, + "overwrite the favored nodes for a single region," + + "for example: -update -r regionName -f server1:port,server2:port,server3:port"); + 
opt.addOption("r", true, "The region name that needs to be updated"); + opt.addOption("f", true, "The new favored nodes"); + + opt.addOption("tables", true, + "The list of table names splitted by ',' ;" + + "For example: -tables: t1,t2,...,tn"); + opt.addOption("l", "locality", true, "enforce the maxium locality"); + opt.addOption("m", "min-move", true, "enforce minium assignment move"); + opt.addOption("diff", false, "calculate difference between assignment plans"); + opt.addOption("munkres", false, + "use munkres to place secondaries and tertiaries"); + opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion information for current plan"); + try { + // Set the log4j + Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR); + Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.ERROR); + Logger.getLogger("org.apache.hadoop.hbase.master.RegionPlacement"). + setLevel(Level.INFO); + + CommandLine cmd = new GnuParser().parse(opt, args); + Configuration conf = HBaseConfiguration.create(); + + boolean enforceMinAssignmentMove = true; + boolean enforceLocality = true; + boolean verificationDetails =false; + + // Read all the options + if ((cmd.hasOption("l") && + cmd.getOptionValue("l").equalsIgnoreCase("false")) || + (cmd.hasOption("locality") && + cmd.getOptionValue("locality").equalsIgnoreCase("false"))) { + enforceLocality = false; + } + + if ((cmd.hasOption("m") && + cmd.getOptionValue("m").equalsIgnoreCase("false")) || + (cmd.hasOption("min-move") && + cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) { + enforceMinAssignmentMove = false; + } + + if (cmd.hasOption("zk")) { + conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk")); + LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM)); + } + + if (cmd.hasOption("fs")) { + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs")); + LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + } + + if 
(cmd.hasOption("hbase_root")) { + conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root")); + LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR)); + } + + // Create the region placement obj + RegionPlacement rp = new RegionPlacement(conf, enforceLocality, + enforceMinAssignmentMove); + + if (cmd.hasOption("d") || cmd.hasOption("verification-details")) { + verificationDetails = true; + } + + if (cmd.hasOption("tables")) { + String tableNameListStr = cmd.getOptionValue("tables"); + String[] tableNames = StringUtils.split(tableNameListStr, ","); + rp.setTargetTableName(tableNames); + } + + if (cmd.hasOption("munkres")) { + USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true; + } + + // Read all the modes + if (cmd.hasOption("v") || cmd.hasOption("verify")) { + // Verify the region placement. +// rp.verifyRegionPlacement(verificationDetails); +// } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) { +// // Generate the assignment plan only without updating the META and RS +// AssignmentPlan plan = rp.getNewAssignmentPlan(); +// RegionPlacement.printAssignmentPlan(plan); +// } else if (cmd.hasOption("w") || cmd.hasOption("write")) { +// // Generate the new assignment plan +// AssignmentPlan plan = rp.getNewAssignmentPlan(); +// // Print the new assignment plan +// RegionPlacement.printAssignmentPlan(plan); +// // Write the new assignment plan to META +// rp.updateAssignmentPlanToMeta(plan); +// } else if (cmd.hasOption("u") || cmd.hasOption("update")) { +// // Generate the new assignment plan +// AssignmentPlan plan = rp.getNewAssignmentPlan(); +// // Print the new assignment plan +// RegionPlacement.printAssignmentPlan(plan); +// // Update the assignment to META and Region Servers +// rp.updateAssignmentPlan(plan); +// } else if (cmd.hasOption("diff")) { +// AssignmentPlan newPlan = rp.getNewAssignmentPlan(); +// Map> locality = FSUtils +// .getRegionDegreeLocalityMappingFromFS(conf); +// Map movesPerTable = 
rp.getRegionsMovement(newPlan); +// rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan); +// System.out.println("Do you want to update the assignment plan? [y/n]"); +// Scanner s = new Scanner(System.in); +// String input = s.nextLine().trim(); +// if (input.equals("y")) { +// System.out.println("Updating assignment plan..."); +// rp.updateAssignmentPlan(newPlan); +// } +// s.close(); +// } else if (cmd.hasOption("ld")) { +// Map> locality = FSUtils +// .getRegionDegreeLocalityMappingFromFS(conf); +// rp.printLocalityAndDispersionForCurrentPlan(locality); +// } else if (cmd.hasOption("p") || cmd.hasOption("print")) { +// AssignmentPlan plan = rp.getExistingAssignmentPlan(); +// RegionPlacement.printAssignmentPlan(plan); +// } else if (cmd.hasOption("overwrite")) { +// if (!cmd.hasOption("f") || !cmd.hasOption("r")) { +// throw new IllegalArgumentException("Please specify: " + +// " -update -r regionName -f server1:port,server2:port,server3:port"); +// } +// +// String regionName = cmd.getOptionValue("r"); +// String favoredNodesStr = cmd.getOptionValue("f"); +// LOG.info("Going to update the region " + regionName + " with the new favored nodes " + +// favoredNodesStr); +// List favoredNodes = null; +// HRegionInfo regionInfo = +// rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName); +// if (regionInfo == null) { +// LOG.error("Cannot find the region " + regionName + " from the META"); +// } else { +// try { +// favoredNodes = RegionPlacement.getFavoredNodeList(favoredNodesStr); +// } catch (IllegalArgumentException e) { +// LOG.error("Cannot parse the invalid favored nodes because " + e); +// } +// AssignmentPlan newPlan = new AssignmentPlan(); +// newPlan.updateAssignmentPlan(regionInfo, favoredNodes); +// rp.updateAssignmentPlan(newPlan); +// } + } else { + printHelp(opt); + } + } catch (ParseException e) { + printHelp(opt); + } + } + + private static void printHelp(Options opt) { + new HelpFormatter().printHelp( + 
"RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " + + "-diff>" + + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" + + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt); + } + + /** + * @return the cached HBaseAdmin + * @throws IOException + */ + private HBaseAdmin getHBaseAdmin() throws IOException { + if (this.admin == null) { + this.admin = new HBaseAdmin(this.conf); + } + return this.admin; + } + + /** + * @return the new RegionAssignmentSnapshot + * @throws IOException + */ + public RegionAssignmentSnapshot getRegionAssignmentSnapshot() + throws IOException { + RegionAssignmentSnapshot currentAssignmentShapshot = + new RegionAssignmentSnapshot(this.conf); + currentAssignmentShapshot.initialize(); + return currentAssignmentShapshot; + } + + /** + * Some algorithms for solving the assignment problem may traverse workers or + * jobs in linear order which may result in skewing the assignments of the + * first jobs in the matrix toward the last workers in the matrix if the + * costs are uniform. To avoid this kind of clumping, we can randomize the + * rows and columns of the cost matrix in a reversible way, such that the + * solution to the assignment problem can be interpreted in terms of the + * original untransformed cost matrix. Rows and columns are transformed + * independently such that the elements contained in any row of the input + * matrix are the same as the elements in the corresponding output matrix, + * and each row has its elements transformed in the same way. Similarly for + * columns. + */ + protected static class RandomizedMatrix { + private final int rows; + private final int cols; + private final int[] rowTransform; + private final int[] rowInverse; + private final int[] colTransform; + private final int[] colInverse; + + /** + * Create a randomization scheme for a matrix of a given size. 
+ * @param rows the number of rows in the matrix + * @param cols the number of columns in the matrix + */ + public RandomizedMatrix(int rows, int cols) { + this.rows = rows; + this.cols = cols; + Random random = new Random(); + rowTransform = new int[rows]; + rowInverse = new int[rows]; + for (int i = 0; i < rows; i++) { + rowTransform[i] = i; + } + // Shuffle the row indices. + for (int i = rows - 1; i >= 0; i--) { + int r = random.nextInt(i + 1); + int temp = rowTransform[r]; + rowTransform[r] = rowTransform[i]; + rowTransform[i] = temp; + } + // Generate the inverse row indices. + for (int i = 0; i < rows; i++) { + rowInverse[rowTransform[i]] = i; + } + + colTransform = new int[cols]; + colInverse = new int[cols]; + for (int i = 0; i < cols; i++) { + colTransform[i] = i; + } + // Shuffle the column indices. + for (int i = cols - 1; i >= 0; i--) { + int r = random.nextInt(i + 1); + int temp = colTransform[r]; + colTransform[r] = colTransform[i]; + colTransform[i] = temp; + } + // Generate the inverse column indices. + for (int i = 0; i < cols; i++) { + colInverse[colTransform[i]] = i; + } + } + + /** + * Copy a given matrix into a new matrix, transforming each row index and + * each column index according to the randomization scheme that was created + * at construction time. + * @param matrix the cost matrix to transform + * @return a new matrix with row and column indices transformed + */ + public float[][] transform(float[][] matrix) { + float[][] result = new float[rows][cols]; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < cols; j++) { + result[rowTransform[i]][colTransform[j]] = matrix[i][j]; + } + } + return result; + } + + /** + * Copy a given matrix into a new matrix, transforming each row index and + * each column index according to the inverse of the randomization scheme + * that was created at construction time. 
+ * @param matrix the cost matrix to be inverted + * @return a new matrix with row and column indices inverted + */ + public float[][] invert(float[][] matrix) { + float[][] result = new float[rows][cols]; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < cols; j++) { + result[rowInverse[i]][colInverse[j]] = matrix[i][j]; + } + } + return result; + } + + /** + * Given an array where each element {@code indices[i]} represents the + * randomized column index corresponding to randomized row index {@code i}, + * create a new array with the corresponding inverted indices. + * @param indices an array of transformed indices to be inverted + * @return an array of inverted indices + */ + public int[] invertIndices(int[] indices) { + int[] result = new int[indices.length]; + for (int i = 0; i < indices.length; i++) { + result[rowInverse[i]] = colInverse[indices[i]]; + } + return result; + } + } + + /** + * Return how many regions will move per table since their primary RS will + * change + * + * @param newPlan - new AssignmentPlan + * @return how many primaries will move per table + */ + public Map getRegionsMovement(AssignmentPlan newPlan) + throws IOException { + Map movesPerTable = new HashMap(); + RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot(); + Map> tableToRegions = snapshot + .getTableToRegionMap(); + AssignmentPlan oldPlan = snapshot.getExistingAssignmentPlan(); + Set tables = snapshot.getTableSet(); + for (String table : tables) { + int movedPrimaries = 0; + if (!this.targetTableSet.isEmpty() + && !this.targetTableSet.contains(table)) { + continue; + } + List regions = tableToRegions.get(table); + for (HRegionInfo region : regions) { + List oldServers = oldPlan.getAssignment(region); + List newServers = newPlan.getAssignment(region); + if (oldServers != null && newServers != null) { + ServerName oldPrimary = oldServers.get(0); + ServerName newPrimary = newServers.get(0); + if (oldPrimary.compareTo(newPrimary) != 0) { 
movedPrimaries++; + } + } + } + movesPerTable.put(table, movedPrimaries); + } + return movesPerTable; + } + + /** + * Compares two plans and checks whether the locality dropped or increased + * (prints the information as a string) also prints the baseline locality + * + * @param movesPerTable - how many primary regions will move per table + * @param regionLocalityMap - locality map from FS + * @param newPlan - new assignment plan + * (the projection report is printed to standard output) + * @throws IOException + */ + public void checkDifferencesWithOldPlan(Map movesPerTable, + Map> regionLocalityMap, AssignmentPlan newPlan) + throws IOException { + // localities for primary, secondary and tertiary + RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot(); + AssignmentPlan oldPlan = snapshot.getExistingAssignmentPlan(); + Set tables = snapshot.getTableSet(); + Map> tableToRegionsMap = snapshot.getTableToRegionMap(); + for (String table : tables) { + float[] deltaLocality = new float[3]; + float[] locality = new float[3]; + if (!this.targetTableSet.isEmpty() + && !this.targetTableSet.contains(table)) { + continue; + } + List regions = tableToRegionsMap.get(table); + System.out.println("=================================================="); + System.out.println("Assignment Plan Projection Report For Table: " + table); + System.out.println("\t Total regions: " + regions.size()); + System.out.println("\t" + movesPerTable.get(table) + + " primaries will move due to their primary has changed"); + for (HRegionInfo currentRegion : regions) { + Map regionLocality = regionLocalityMap.get(currentRegion + .getEncodedName()); + if (regionLocality == null) { + continue; + } + List oldServers = oldPlan.getAssignment(currentRegion); + List newServers = newPlan.getAssignment(currentRegion); + if (newServers != null && oldServers != null) { + int i=0; + for (AssignmentPlan.POSITION p : AssignmentPlan.POSITION.values()) { + ServerName newServer = newServers.get(p.ordinal()); + 
ServerName oldServer = oldServers.get(p.ordinal()); + Float oldLocality = 0f; + if (oldServers != null) { + oldLocality = regionLocality.get(oldServer.getHostname()); + if (oldLocality == null) { + oldLocality = 0f; + } + locality[i] += oldLocality; + } + Float newLocality = regionLocality.get(newServer.getHostname()); + if (newLocality == null) { + newLocality = 0f; + } + deltaLocality[i] += newLocality - oldLocality; + i++; + } + } + } + DecimalFormat df = new java.text.DecimalFormat( "#.##"); + for (int i = 0; i < deltaLocality.length; i++) { + System.out.print("\t\t Baseline locality for "); + if (i == 0) { + System.out.print("primary "); + } else if (i == 1) { + System.out.print("secondary "); + } else if (i == 2) { + System.out.print("tertiary "); + } + System.out.println(df.format(100 * locality[i] / regions.size()) + "%"); + System.out.print("\t\t Locality will change with the new plan: "); + System.out.println(df.format(100 * deltaLocality[i] / regions.size()) + + "%"); + } + System.out.println("\t Baseline dispersion"); +// printDispersionScores(table, snapshot, regions.size(), null, true); +// System.out.println("\t Projected dispersion"); +// printDispersionScores(table, snapshot, regions.size(), newPlan, true); + } + } + + public void printLocalityAndDispersionForCurrentPlan( + Map> regionLocalityMap) throws IOException { + RegionAssignmentSnapshot snapshot = this.getRegionAssignmentSnapshot(); + AssignmentPlan assignmentPlan = snapshot.getExistingAssignmentPlan(); + Set tables = snapshot.getTableSet(); + Map> tableToRegionsMap = snapshot + .getTableToRegionMap(); + for (String table : tables) { + float[] locality = new float[3]; + if (!this.targetTableSet.isEmpty() + && !this.targetTableSet.contains(table)) { + continue; + } + List regions = tableToRegionsMap.get(table); + for (HRegionInfo currentRegion : regions) { + Map regionLocality = regionLocalityMap.get(currentRegion + .getEncodedName()); + if (regionLocality == null) { + continue; + } + List 
servers = assignmentPlan.getAssignment(currentRegion); + if (servers != null) { + int i = 0; + for (AssignmentPlan.POSITION p : AssignmentPlan.POSITION.values()) { + ServerName server = servers.get(p.ordinal()); + Float currentLocality = 0f; + if (servers != null) { + currentLocality = regionLocality.get(server.getHostname()); + if (currentLocality == null) { + currentLocality = 0f; + } + locality[i] += currentLocality; + } + i++; + } + } + } + for (int i = 0; i < locality.length; i++) { + String copy = null; + if (i == 0) { + copy = "primary"; + } else if (i == 1) { + copy = "secondary"; + } else if (i == 2) { + copy = "tertiary" ; + } + float avgLocality = 100 * locality[i] / regions.size(); + LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size() + + " ; The average locality for " + copy+ " is " + avgLocality + " %"); + } +// printDispersionScores(table, snapshot, regions.size(), null, false); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementPolicy.java new file mode 100644 index 0000000..639e302 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementPolicy.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HRegionInfo; + +public interface RegionPlacementPolicy { + +/** + * Get the assignment plan for the new regions + * @param regions + * @param domain + * @return the favored assignment plan for the regions. 
+ * @throws IOException + */ + public AssignmentPlan getNewAssignmentPlan(HRegionInfo[] regions, + AssignmentDomain domain) throws IOException; + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index 0130f51..3539914 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -23,6 +23,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -36,22 +37,28 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.RegionPlacement; import org.apache.hadoop.hbase.regionserver.HRegion; +import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; import org.apache.zookeeper.KeeperException; /** @@ -66,10 +73,12 @@ public class CreateTableHandler extends EventHandler { private final AssignmentManager assignmentManager; private final CatalogTracker catalogTracker; private final HRegionInfo [] newRegions; + private final Map> assignmentMap; public CreateTableHandler(Server server, MasterFileSystem fileSystemManager, HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions, - CatalogTracker catalogTracker, AssignmentManager assignmentManager) + CatalogTracker catalogTracker, AssignmentManager assignmentManager, + Map> assignmentMap) throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { super(server, EventType.C_M_CREATE_TABLE); @@ -79,6 +88,7 @@ public class CreateTableHandler extends EventHandler { this.newRegions = newRegions; this.catalogTracker = catalogTracker; this.assignmentManager = assignmentManager; + this.assignmentMap = assignmentMap; int timeout = conf.getInt("hbase.client.catalog.timeout", 10000); // Need META availability to create a table @@ -186,14 +196,14 @@ public class CreateTableHandler extends EventHandler { regionOpenAndInitThreadPool.shutdownNow(); } if (regionInfos.size() > 0) { - MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos); + MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos, assignmentMap); } // 4. 
Trigger immediate assignment of the regions in round-robin fashion try { List regions = Arrays.asList(newRegions); assignmentManager.getRegionStates().createRegionStates(regions); - assignmentManager.assign(regions); + assignmentManager.assignWithFavoredNodesPlan(regions, assignmentMap); } catch (InterruptedException ie) { LOG.error("Caught " + ie + " during round-robin assignment"); throw new IOException(ie); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d987ff8..32be44d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.Constructor; +import java.net.InetSocketAddress; import java.text.ParseException; import java.util.AbstractList; import java.util.ArrayList; @@ -84,6 +85,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.client.Append; @@ -129,6 +131,7 @@ import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.SetFile; import org.apache.hadoop.util.StringUtils; import org.cliffc.high_scale_lib.Counter; @@ -197,6 +200,10 @@ public class HRegion implements HeapSize { // , Writable{ protected long completeSequenceId = -1L; + // When writing store files for this region, replicas 
will preferrably be + // placed on these nodes, if non-null. + private InetSocketAddress[] favoredNodes = null; + ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -1786,9 +1793,28 @@ public class HRegion implements HeapSize { // , Writable{ } } + /** + * @return the nodes on which to place replicas of all store files, or null if + * there are no favored nodes. + */ + public InetSocketAddress[] getFavoredNodes() { + return this.favoredNodes; + } + ////////////////////////////////////////////////////////////////////////////// // set() methods for client use. ////////////////////////////////////////////////////////////////////////////// + + /** + * Set the favored nodes on which to place replicas of all store files. The + * array can be null to set no preference for favored nodes, but elements of + * the array must not be null. Placement of replicas on favored nodes is best- + * effort only and the filesystem may choose different nodes. 
+ * @param favoredNodes the favored nodes, or null + */ + public void setFavoredNodes(InetSocketAddress[] favoredNodes) { + this.favoredNodes = favoredNodes; + } /** * @param delete delete object * @param writeToWAL append to the write ahead lock or not @@ -3886,6 +3912,7 @@ public class HRegion implements HeapSize { // , Writable{ * @param hlog shared HLog * @param initialize - true to initialize the region * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable + * @param favoredNodes - the list of favored nodes for this region * @return new HRegion * @throws IOException */ @@ -3913,6 +3940,7 @@ public class HRegion implements HeapSize { // , Writable{ } HRegion region = HRegion.newHRegion(tableDir, effectiveHLog, fs, conf, info, hTableDescriptor, null); + if (initialize) { region.initialize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MunkresAssignment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MunkresAssignment.java new file mode 100644 index 0000000..2456b24 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MunkresAssignment.java @@ -0,0 +1,514 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Computes the optimal (minimal cost) assignment of jobs to workers (or other
 * analogous concepts) given a cost matrix of each pair of job and worker, using
 * the algorithm by James Munkres in "Algorithms for the Assignment and
 * Transportation Problems", with additional optimizations as described by Jin
 * Kue Wong in "A New Implementation of an Algorithm for the Optimal Assignment
 * Problem: An Improved Version of Munkres' Algorithm". The algorithm runs in
 * O(n^3) time and needs O(n^2) auxiliary space where n is the number of jobs or
 * workers, whichever is greater.
 */
public class MunkresAssignment {

  // The original algorithm by Munkres uses the terms STAR and PRIME to denote
  // different states of zero values in the cost matrix. These values are
  // represented as byte constants instead of enums to save space in the mask
  // matrix by a factor of 4n^2 where n is the size of the problem.
  private static final byte NONE = 0;
  private static final byte STAR = 1;
  private static final byte PRIME = 2;

  // The algorithm requires that the number of columns is at least as great as
  // the number of rows. If that is not the case, then the cost matrix should
  // be transposed before computation, and the solution matrix transposed before
  // returning to the caller.
  private final boolean transposed;

  // The number of rows of internal matrices.
  private final int rows;

  // The number of columns of internal matrices.
  private final int cols;

  // The cost matrix, the cost of assigning each row index to column index.
  private float[][] cost;

  // Mask of zero cost assignment states.
  private byte[][] mask;

  // Covering some rows of the cost matrix.
  private boolean[] rowsCovered;

  // Covering some columns of the cost matrix.
  private boolean[] colsCovered;

  // The alternating path between starred zeroes and primed zeroes. Each node
  // is an unboxed {row, column} index pair, which avoids Integer boxing on
  // this hot path.
  private Deque<int[]> path;

  // The solution, marking which rows should be assigned to which columns. The
  // positions of elements in this array correspond to the rows of the cost
  // matrix, and the value of each element correspond to the columns of the cost
  // matrix, i.e. assignments[i] = j indicates that row i should be assigned to
  // column j.
  private int[] assignments;

  // Improvements described by Jin Kue Wong cache the least value in each row,
  // as well as the column index of the least value in each row, and the pending
  // adjustments to each row and each column.
  private float[] leastInRow;
  private int[] leastInRowIndex;
  private float[] rowAdjust;
  private float[] colAdjust;

  /**
   * Construct a new problem instance with the specified cost matrix. The cost
   * matrix must be rectangular, though not necessarily square. If one dimension
   * is greater than the other, some elements in the greater dimension will not
   * be assigned. The input cost matrix will not be modified.
   * @param costMatrix the rectangular, non-empty matrix of assignment costs
   */
  public MunkresAssignment(float[][] costMatrix) {
    // The algorithm assumes that the number of columns is at least as great as
    // the number of rows. If this is not the case of the input matrix, then
    // all internal structures must be transposed relative to the input.
    this.transposed = costMatrix.length > costMatrix[0].length;
    if (this.transposed) {
      this.rows = costMatrix[0].length;
      this.cols = costMatrix.length;
    } else {
      this.rows = costMatrix.length;
      this.cols = costMatrix[0].length;
    }

    cost = new float[rows][cols];
    mask = new byte[rows][cols];
    rowsCovered = new boolean[rows];
    colsCovered = new boolean[cols];
    path = new ArrayDeque<int[]>();

    leastInRow = new float[rows];
    leastInRowIndex = new int[rows];
    rowAdjust = new float[rows];
    colAdjust = new float[cols];

    assignments = null;

    // Copy the cost matrix, transposing if necessary.
    if (transposed) {
      for (int r = 0; r < rows; r++) {
        for (int c = 0; c < cols; c++) {
          cost[r][c] = costMatrix[c][r];
        }
      }
    } else {
      for (int r = 0; r < rows; r++) {
        for (int c = 0; c < cols; c++) {
          cost[r][c] = costMatrix[r][c];
        }
      }
    }

    // Costs must be finite otherwise the matrix can get into a bad state where
    // no progress can be made. If your use case depends on a distinction
    // between costs of MAX_VALUE and POSITIVE_INFINITY, you're doing it wrong.
    for (int r = 0; r < rows; r++) {
      for (int c = 0; c < cols; c++) {
        if (cost[r][c] == Float.POSITIVE_INFINITY) {
          cost[r][c] = Float.MAX_VALUE;
        }
      }
    }
  }

  /**
   * Get the optimal assignments. The returned array will have the same number
   * of elements as the number of rows in the input cost matrix. Each element
   * will indicate which column should be assigned to that row or -1 if no
   * column should be assigned, i.e. if result[i] = j then row i should be
   * assigned to column j. Subsequent invocations of this method will simply
   * return the same object without additional computation.
   * @return an array with the optimal assignments
   */
  public int[] solve() {
    // If this assignment problem has already been solved, return the known
    // solution.
    if (assignments != null) {
      return assignments;
    }

    preliminaries();

    // Find the optimal assignments.
    while (!testIsDone()) {
      while (!stepOne()) {
        stepThree();
      }
      stepTwo();
    }

    // Extract the assignments from the mask matrix.
    if (transposed) {
      assignments = new int[cols];
      outer:
      for (int c = 0; c < cols; c++) {
        for (int r = 0; r < rows; r++) {
          if (mask[r][c] == STAR) {
            assignments[c] = r;
            continue outer;
          }
        }
        // There is no assignment for this row of the input/output.
        assignments[c] = -1;
      }
    } else {
      assignments = new int[rows];
      outer:
      for (int r = 0; r < rows; r++) {
        for (int c = 0; c < cols; c++) {
          if (mask[r][c] == STAR) {
            assignments[r] = c;
            continue outer;
          }
        }
        // Cannot happen when rows <= cols (every row is starred on
        // completion), but keep the contract symmetric with the transposed
        // branch for safety.
        assignments[r] = -1;
      }
    }

    // Once the solution has been computed, there is no need to keep any of the
    // other internal structures. Clear all unnecessary internal references so
    // the garbage collector may reclaim that memory.
    cost = null;
    mask = null;
    rowsCovered = null;
    colsCovered = null;
    path = null;
    leastInRow = null;
    leastInRowIndex = null;
    rowAdjust = null;
    colAdjust = null;

    return assignments;
  }

  /**
   * Corresponds to the "preliminaries" step of the original algorithm.
   * Guarantees that the matrix is an equivalent non-negative matrix with at
   * least one zero in each row.
   */
  private void preliminaries() {
    for (int r = 0; r < rows; r++) {
      // Find the minimum cost of each row.
      float min = Float.POSITIVE_INFINITY;
      for (int c = 0; c < cols; c++) {
        min = Math.min(min, cost[r][c]);
      }

      // Subtract that minimum cost from each element in the row.
      for (int c = 0; c < cols; c++) {
        cost[r][c] -= min;

        // If the element is now zero and there are no zeroes in the same row
        // or column which are already starred, then star this one. There
        // must be at least one zero because of subtracting the min cost.
        if (cost[r][c] == 0 && !rowsCovered[r] && !colsCovered[c]) {
          mask[r][c] = STAR;
          // Cover this row and column so that no other zeroes in them can be
          // starred.
          rowsCovered[r] = true;
          colsCovered[c] = true;
        }
      }
    }

    // Clear the covered rows and columns.
    Arrays.fill(rowsCovered, false);
    Arrays.fill(colsCovered, false);
  }

  /**
   * Test whether the algorithm is done, i.e. we have the optimal assignment.
   * This occurs when there is exactly one starred zero in each row.
   * @return true if the algorithm is done
   */
  private boolean testIsDone() {
    // Cover all columns containing a starred zero. There can be at most one
    // starred zero per column. Therefore, a covered column has an optimal
    // assignment.
    for (int r = 0; r < rows; r++) {
      for (int c = 0; c < cols; c++) {
        if (mask[r][c] == STAR) {
          colsCovered[c] = true;
        }
      }
    }

    // Count the total number of covered columns.
    int coveredCols = 0;
    for (int c = 0; c < cols; c++) {
      coveredCols += colsCovered[c] ? 1 : 0;
    }

    // Apply any row and column adjustments that are pending.
    for (int r = 0; r < rows; r++) {
      for (int c = 0; c < cols; c++) {
        cost[r][c] += rowAdjust[r];
        cost[r][c] += colAdjust[c];
      }
    }

    // Clear the pending row and column adjustments.
    Arrays.fill(rowAdjust, 0);
    Arrays.fill(colAdjust, 0);

    // The covers on columns and rows may have been reset, recompute the least
    // value for each row.
    for (int r = 0; r < rows; r++) {
      leastInRow[r] = Float.POSITIVE_INFINITY;
      for (int c = 0; c < cols; c++) {
        if (!rowsCovered[r] && !colsCovered[c] && cost[r][c] < leastInRow[r]) {
          leastInRow[r] = cost[r][c];
          leastInRowIndex[r] = c;
        }
      }
    }

    // If all columns are covered, then we are done. Since there may be more
    // columns than rows, we are also done if the number of covered columns is
    // at least as great as the number of rows.
    return (coveredCols == cols || coveredCols >= rows);
  }

  /**
   * Corresponds to step 1 of the original algorithm.
   * @return false if all zeroes are covered
   */
  private boolean stepOne() {
    while (true) {
      int[] zero = findUncoveredZero();
      if (zero == null) {
        // No uncovered zeroes, need to manipulate the cost matrix in step
        // three.
        return false;
      } else {
        // Prime the uncovered zero and find a starred zero in the same row.
        mask[zero[0]][zero[1]] = PRIME;
        int[] star = starInRow(zero[0]);
        if (star != null) {
          // Cover the row with both the newly primed zero and the starred zero.
          // Since this is the only place where zeroes are primed, and we cover
          // it here, and rows are only uncovered when primes are erased, then
          // there can be at most one primed uncovered zero.
          rowsCovered[star[0]] = true;
          colsCovered[star[1]] = false;
          updateMin(star[0], star[1]);
        } else {
          // Will go to step two after, where a path will be constructed,
          // starting from the uncovered primed zero (there is only one). Since
          // we have already found it, save it as the first node in the path.
          path.clear();
          path.offerLast(new int[] { zero[0], zero[1] });
          return true;
        }
      }
    }
  }

  /**
   * Corresponds to step 2 of the original algorithm.
   */
  private void stepTwo() {
    // Construct a path of alternating starred zeroes and primed zeroes, where
    // each starred zero is in the same column as the previous primed zero, and
    // each primed zero is in the same row as the previous starred zero. The
    // path will always end in a primed zero.
    while (true) {
      int[] star = starInCol(path.getLast()[1]);
      if (star != null) {
        path.offerLast(star);
      } else {
        break;
      }
      int[] prime = primeInRow(path.getLast()[0]);
      path.offerLast(prime);
    }

    // Augment path - unmask all starred zeroes and star all primed zeroes. All
    // nodes in the path will be either starred or primed zeroes. The set of
    // starred zeroes is independent and now one larger than before.
    for (int[] p : path) {
      if (mask[p[0]][p[1]] == STAR) {
        mask[p[0]][p[1]] = NONE;
      } else {
        mask[p[0]][p[1]] = STAR;
      }
    }

    // Clear all covers from rows and columns.
    Arrays.fill(rowsCovered, false);
    Arrays.fill(colsCovered, false);

    // Remove the prime mask from all primed zeroes.
    for (int r = 0; r < rows; r++) {
      for (int c = 0; c < cols; c++) {
        if (mask[r][c] == PRIME) {
          mask[r][c] = NONE;
        }
      }
    }
  }

  /**
   * Corresponds to step 3 of the original algorithm.
   */
  private void stepThree() {
    // Find the minimum uncovered cost.
    float min = leastInRow[0];
    for (int r = 1; r < rows; r++) {
      if (leastInRow[r] < min) {
        min = leastInRow[r];
      }
    }

    // Add the minimum cost to each of the costs in a covered row, or subtract
    // the minimum cost from each of the costs in an uncovered column. As an
    // optimization, do not actually modify the cost matrix yet, but track the
    // adjustments that need to be made to each row and column.
    for (int r = 0; r < rows; r++) {
      if (rowsCovered[r]) {
        rowAdjust[r] += min;
      }
    }
    for (int c = 0; c < cols; c++) {
      if (!colsCovered[c]) {
        colAdjust[c] -= min;
      }
    }

    // Since the cost matrix is not being updated yet, the minimum uncovered
    // cost per row must be updated.
    for (int r = 0; r < rows; r++) {
      if (!colsCovered[leastInRowIndex[r]]) {
        // The least value in this row was in an uncovered column, meaning that
        // it would have had the minimum value subtracted from it, and therefore
        // will still be the minimum value in that row.
        leastInRow[r] -= min;
      } else {
        // The least value in this row was in a covered column and would not
        // have had the minimum value subtracted from it, so the minimum value
        // could now be in another column.
        for (int c = 0; c < cols; c++) {
          if (cost[r][c] + colAdjust[c] + rowAdjust[r] < leastInRow[r]) {
            leastInRow[r] = cost[r][c] + colAdjust[c] + rowAdjust[r];
            leastInRowIndex[r] = c;
          }
        }
      }
    }
  }

  /**
   * Find a zero cost assignment which is not covered. If there are no zero cost
   * assignments which are uncovered, then null will be returned.
   * @return {row, column} indices of an uncovered zero, or null
   */
  private int[] findUncoveredZero() {
    for (int r = 0; r < rows; r++) {
      if (leastInRow[r] == 0) {
        return new int[] { r, leastInRowIndex[r] };
      }
    }
    return null;
  }

  /**
   * A specified row has become covered, and a specified column has become
   * uncovered. The least value per row may need to be updated.
   * @param row the index of the row which was just covered
   * @param col the index of the column which was just uncovered
   */
  private void updateMin(int row, int col) {
    // If the row is covered we want to ignore it as far as least values go.
    leastInRow[row] = Float.POSITIVE_INFINITY;

    for (int r = 0; r < rows; r++) {
      // Since the column has only just been uncovered, it could not have any
      // pending adjustments. Only covered rows can have pending adjustments
      // and covered costs do not count toward row minimums. Therefore, we do
      // not need to consider rowAdjust[r] or colAdjust[col].
      if (!rowsCovered[r] && cost[r][col] < leastInRow[r]) {
        leastInRow[r] = cost[r][col];
        leastInRowIndex[r] = col;
      }
    }
  }

  /**
   * Find a starred zero in a specified row. If there are no starred zeroes in
   * the specified row, then null will be returned.
   * @param r the index of the row to be searched
   * @return {row, column} indices of the starred zero, or null
   */
  private int[] starInRow(int r) {
    for (int c = 0; c < cols; c++) {
      if (mask[r][c] == STAR) {
        return new int[] { r, c };
      }
    }
    return null;
  }

  /**
   * Find a starred zero in the specified column. If there are no starred zeroes
   * in the specified column, then null will be returned.
   * @param c the index of the column to be searched
   * @return {row, column} indices of the starred zero, or null
   */
  private int[] starInCol(int c) {
    for (int r = 0; r < rows; r++) {
      if (mask[r][c] == STAR) {
        return new int[] { r, c };
      }
    }
    return null;
  }

  /**
   * Find a primed zero in the specified row. If there are no primed zeroes in
   * the specified row, then null will be returned.
   * @param r the index of the row to be searched
   * @return {row, column} indices of the primed zero, or null
   */
  private int[] primeInRow(int r) {
    for (int c = 0; c < cols; c++) {
      if (mask[r][c] == PRIME) {
        return new int[] { r, c };
      }
    }
    return null;
  }
}