diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index bc9e23b..aa7328f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -495,7 +496,7 @@ public class HBaseAdmin implements Abortable, Closeable {
     } catch (SocketTimeoutException ste) {
       LOG.warn("Creating " + desc.getTableName() + " took too long", ste);
     }
-    int numRegs = splitKeys == null ? 1 : splitKeys.length + 1;
+    int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
     int prevRegCount = 0;
     boolean doneWithMetaScan = false;
     for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier;
@@ -506,19 +507,26 @@ public class HBaseAdmin implements Abortable, Closeable {
       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
         @Override
         public boolean processRow(Result rowResult) throws IOException {
-          HRegionInfo info = HRegionInfo.getHRegionInfo(rowResult);
-          if (info == null) {
+          RegionLocations list = MetaReader.getRegionLocations(rowResult);
+          if (list == null) {
             LOG.warn("No serialized HRegionInfo in " + rowResult);
             return true;
           }
-          if (!info.getTable().equals(desc.getTableName())) {
+          HRegionLocation l = list.getRegionLocation();
+          if (l == null) {
+            return true;
+          }
+          if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
             return false;
           }
-          ServerName serverName = HRegionInfo.getServerName(rowResult);
-          // Make sure that regions are assigned to server
-          if (!(info.isOffline() || info.isSplit()) && serverName != null
-              && serverName.getHostAndPort() != null) {
-            actualRegCount.incrementAndGet();
+          if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
+          HRegionLocation[] locations = list.getRegionLocations();
+          for (HRegionLocation location : locations) {
+            ServerName serverName = location.getServerName();
+            // Make sure that regions are assigned to server
+            if (serverName != null && serverName.getHostAndPort() != null) {
+              actualRegCount.incrementAndGet();
+            }
           }
           return true;
         }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
index abe9bf5..b0ef514 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
@@ -62,4 +62,9 @@ public class RegionReplicaUtil {
     return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID);
   }
 
+  /** @return true if this region is a default replica for the region */
+  public static boolean isDefaultReplica(HRegionInfo hri) {
+    return hri.getReplicaId() == DEFAULT_REPLICA_ID;
+  }
+
 }
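Context for the replica-aware hunks that follow: every replica of a region shares its primary's table, key range, and regionId, and differs only in its replicaId, with replica 0 (DEFAULT_REPLICA_ID) owning the hbase:meta row. A minimal sketch of that identity, assuming only the HRegionInfo and RegionReplicaUtil APIs already used in this patch:

    // Sketch (illustration only, not part of the patch): enumerate three
    // replicas of one region and map each back to its primary.
    HRegionInfo primary = new HRegionInfo(TableName.valueOf("t"),
      Bytes.toBytes("a"), Bytes.toBytes("b"));
    for (int replicaId = 0; replicaId < 3; replicaId++) {
      HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      // every replica collapses to the same default replica, which is why the
      // bookkeeping maps added below can be keyed by the primary HRegionInfo
      assert RegionReplicaUtil.getRegionInfoForDefaultReplica(replica).equals(primary);
      assert RegionReplicaUtil.isDefaultReplica(replica) == (replicaId == 0);
    }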
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
index 5b5446b..ce200fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@@ -270,7 +271,9 @@ public class MetaEditor extends MetaReader {
       throws IOException {
     List<Put> puts = new ArrayList<Put>();
     for (HRegionInfo regionInfo : regionInfos) {
-      puts.add(makePutFromRegionInfo(regionInfo));
+      if (RegionReplicaUtil.isDefaultReplica(regionInfo)) {
+        puts.add(makePutFromRegionInfo(regionInfo));
+      }
     }
     putsToMetaTable(catalogTracker, puts);
     LOG.info("Added " + puts.size());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index dc2f93a..6deb348 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventHandler;
@@ -2548,19 +2550,22 @@ public class AssignmentManager extends ZooKeeperListener {
     boolean retainAssignment = server.getConfiguration().
getBoolean("hbase.master.startup.retainassign", true); + Set regionsFromMetaScan = allRegions.keySet(); if (retainAssignment) { assign(allRegions); } else { - List regions = new ArrayList(allRegions.keySet()); + List regions = new ArrayList(regionsFromMetaScan); assign(regions); } - for (HRegionInfo hri : allRegions.keySet()) { + for (HRegionInfo hri : regionsFromMetaScan) { TableName tableName = hri.getTable(); if (!zkTable.isEnabledTable(tableName)) { setEnabledTable(tableName); } } + // assign all the replicas that were not recorded in the meta + assign(HMaster.replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server)); } /** @@ -2612,62 +2617,68 @@ public class AssignmentManager extends ZooKeeperListener { new TreeMap>(); // Iterate regions in META for (Result result : results) { - Pair region = HRegionInfo.getHRegionInfoAndServerName(result); - if (region == null) continue; - HRegionInfo regionInfo = region.getFirst(); - ServerName regionLocation = region.getSecond(); + //Pair region = HRegionInfo.getHRegionInfoAndServerName(result); + HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result); if (regionInfo == null) continue; - regionStates.createRegionState(regionInfo); - if (regionStates.isRegionInState(regionInfo, State.SPLIT)) { - // Split is considered to be completed. If the split znode still - // exists, the region will be put back to SPLITTING state later - LOG.debug("Region " + regionInfo.getRegionNameAsString() - + " split is completed. Hence need not add to regions list"); - continue; - } - TableName tableName = regionInfo.getTable(); - if (regionLocation == null) { - // regionLocation could be null if createTable didn't finish properly. - // When createTable is in progress, HMaster restarts. - // Some regions have been added to hbase:meta, but have not been assigned. - // When this happens, the region's table must be in ENABLING state. - // It can't be in ENABLED state as that is set when all regions are - // assigned. - // It can't be in DISABLING state, because DISABLING state transitions - // from ENABLED state when application calls disableTable. - // It can't be in DISABLED state, because DISABLED states transitions - // from DISABLING state. - if (!enablingTables.contains(tableName)) { - LOG.warn("Region " + regionInfo.getEncodedName() + - " has null regionLocation." 
+ " But its table " + tableName + - " isn't in ENABLING state."); - } - } else if (!onlineServers.contains(regionLocation)) { - // Region is located on a server that isn't online - List offlineRegions = offlineServers.get(regionLocation); - if (offlineRegions == null) { - offlineRegions = new ArrayList(1); - offlineServers.put(regionLocation, offlineRegions); - } - offlineRegions.add(regionInfo); - // need to enable the table if not disabled or disabling or enabling - // this will be used in rolling restarts - if (!disabledOrDisablingOrEnabling.contains(tableName) - && !getZKTable().isEnabledTable(tableName)) { - setEnabledTable(tableName); - } - } else { - // Region is being served and on an active server - // add only if region not in disabled or enabling table - if (!disabledOrEnablingTables.contains(tableName)) { - regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation); - regionStates.regionOnline(regionInfo, regionLocation); + HRegionLocation[] locations = MetaReader.getRegionLocations(result).getRegionLocations(); + if (locations == null) continue; + int replicaId = 0; + // Do the operations for all the replicas + for (HRegionLocation hrl : locations) { + ServerName regionLocation = hrl.getServerName(); + regionInfo = RegionReplicaUtil.getRegionInfoForReplica(regionInfo, replicaId++); + regionStates.createRegionState(regionInfo); + if (regionStates.isRegionInState(regionInfo, State.SPLIT)) { + // Split is considered to be completed. If the split znode still + // exists, the region will be put back to SPLITTING state later + LOG.debug("Region " + regionInfo.getRegionNameAsString() + + " split is completed. Hence need not add to regions list"); + continue; } - // need to enable the table if not disabled or disabling or enabling - // this will be used in rolling restarts - if (!disabledOrDisablingOrEnabling.contains(tableName) - && !getZKTable().isEnabledTable(tableName)) { - setEnabledTable(tableName); + TableName tableName = regionInfo.getTable(); + if (regionLocation == null) { + // regionLocation could be null if createTable didn't finish properly. + // When createTable is in progress, HMaster restarts. + // Some regions have been added to hbase:meta, but have not been assigned. + // When this happens, the region's table must be in ENABLING state. + // It can't be in ENABLED state as that is set when all regions are + // assigned. + // It can't be in DISABLING state, because DISABLING state transitions + // from ENABLED state when application calls disableTable. + // It can't be in DISABLED state, because DISABLED states transitions + // from DISABLING state. + if (!enablingTables.contains(tableName)) { + LOG.warn("Region " + regionInfo.getEncodedName() + + " has null regionLocation." 
+ " But its table " + tableName + + " isn't in ENABLING state."); + } + } else if (!onlineServers.contains(regionLocation)) { + // Region is located on a server that isn't online + List offlineRegions = offlineServers.get(regionLocation); + if (offlineRegions == null) { + offlineRegions = new ArrayList(1); + offlineServers.put(regionLocation, offlineRegions); + } + offlineRegions.add(regionInfo); + // need to enable the table if not disabled or disabling or enabling + // this will be used in rolling restarts + if (!disabledOrDisablingOrEnabling.contains(tableName) + && !getZKTable().isEnabledTable(tableName)) { + setEnabledTable(tableName); + } + } else { + // Region is being served and on an active server + // add only if region not in disabled or enabling table + if (!disabledOrEnablingTables.contains(tableName)) { + regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation); + regionStates.regionOnline(regionInfo, regionLocation); + } + // need to enable the table if not disabled or disabling or enabling + // this will be used in rolling restarts + if (!disabledOrDisablingOrEnabling.contains(tableName) + && !getZKTable().isEnabledTable(tableName)) { + setEnabledTable(tableName); + } } } } @@ -3517,4 +3528,9 @@ public class AssignmentManager extends ZooKeeperListener { public LoadBalancer getBalancer() { return this.balancer; } + + public Pair>, Map>> + getSnapShotOfAssignment(List infos) { + return getRegionStates().getRegionAssignments(infos); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f3911fd..d857008 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.Result; @@ -1786,18 +1787,30 @@ MasterServices, Server { private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor, byte[][] splitKeys) { HRegionInfo[] hRegionInfos = null; + int numRegionReplicas = hTableDescriptor.getRegionReplication(); + if (numRegionReplicas <= 0) { + LOG.warn("Invalid number of replicas per region in the table descriptor. Setting it to 1."); + numRegionReplicas = 1; + } + long regionId = System.currentTimeMillis(); if (splitKeys == null || splitKeys.length == 0) { - hRegionInfos = new HRegionInfo[]{ - new HRegionInfo(hTableDescriptor.getTableName(), null, null)}; + hRegionInfos = new HRegionInfo[numRegionReplicas]; + for (int i = 0; i < numRegionReplicas; i++) { + hRegionInfos[i] = new HRegionInfo(hTableDescriptor.getTableName(), null, null, + false, regionId, (short)i); + } } else { int numRegions = splitKeys.length + 1; - hRegionInfos = new HRegionInfo[numRegions]; + hRegionInfos = new HRegionInfo[numRegions * numRegionReplicas]; byte[] startKey = null; byte[] endKey = null; for (int i = 0; i < numRegions; i++) { endKey = (i == splitKeys.length) ? 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f3911fd..d857008 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.MetaScanner;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.Result;
@@ -1786,18 +1787,30 @@ MasterServices, Server {
   private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
     byte[][] splitKeys) {
     HRegionInfo[] hRegionInfos = null;
+    int numRegionReplicas = hTableDescriptor.getRegionReplication();
+    if (numRegionReplicas <= 0) {
+      LOG.warn("Invalid number of replicas per region in the table descriptor. Setting it to 1.");
+      numRegionReplicas = 1;
+    }
+    long regionId = System.currentTimeMillis();
     if (splitKeys == null || splitKeys.length == 0) {
-      hRegionInfos = new HRegionInfo[]{
-        new HRegionInfo(hTableDescriptor.getTableName(), null, null)};
+      hRegionInfos = new HRegionInfo[numRegionReplicas];
+      for (int i = 0; i < numRegionReplicas; i++) {
+        hRegionInfos[i] = new HRegionInfo(hTableDescriptor.getTableName(), null, null,
+          false, regionId, (short)i);
+      }
     } else {
       int numRegions = splitKeys.length + 1;
-      hRegionInfos = new HRegionInfo[numRegions];
+      hRegionInfos = new HRegionInfo[numRegions * numRegionReplicas];
       byte[] startKey = null;
       byte[] endKey = null;
       for (int i = 0; i < numRegions; i++) {
         endKey = (i == splitKeys.length) ? null : splitKeys[i];
-        hRegionInfos[i] =
-          new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey);
+        for (int j = 0; j < numRegionReplicas; j++) {
+          hRegionInfos[i*numRegionReplicas + j] =
+            new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey,
+              false, regionId, (short)j);
+        }
         startKey = endKey;
       }
     }
@@ -3203,4 +3216,31 @@ MasterServices, Server {
     return tableNames;
   }
 
+  /**
+   * Get a list of replica regions that are not recorded in meta yet. We might not have
+   * recorded the locations for the replicas since the replicas may not have been online yet,
+   * master restarted in the middle of assigning, ZK erased, etc.
+   * @param regionsRecordedInMeta the list of regions we know are recorded in meta
+   *   either as a primary, or, as the location of a non-primary replica
+   * @param master the master services, used to look up the table descriptors
+   * @return list of replica regions
+   * @throws IOException
+   */
+  public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
+      Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
+    List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
+    for (HRegionInfo hri : regionsRecordedInMeta) {
+      TableName table = hri.getTable();
+      HTableDescriptor htd = master.getTableDescriptors().get(table);
+      // look at the HTD for the replica count. That's the source of truth
+      int desiredRegionReplication = htd.getRegionReplication();
+      for (int i = 0; i < desiredRegionReplication; i++) {
+        HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
+        if (regionsRecordedInMeta.contains(replica)) continue;
+        regionsNotRecordedInMeta.add(replica);
+      }
+    }
+    return regionsNotRecordedInMeta;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
index 782e04e..0f6737b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -66,4 +67,20 @@ public class RackManager {
 
     return UNKNOWN_RACK;
   }
+
+  /**
+   * Same as {@link #getRack(ServerName)} except that a list is passed
+   * @param servers the list of servers to resolve
+   * @return the list of racks, one per server, in the same order
+   */
+  public List<String> getRack(List<ServerName> servers) {
+    // just a note - switchMapping caches results (at least the implementation should unless the
+    // resolution is really a lightweight process)
+    List<String> serversAsString = new ArrayList<String>(servers.size());
+    for (ServerName server : servers) {
+      serversAsString.add(server.getHostname());
+    }
+    List<String> racks = switchMapping.resolve(serversAsString);
+    return racks;
+  }
 }
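With replication folded in, getHRegionInfos() above lays regions out replica-major: index i*numRegionReplicas + j is replica j of split range i, and all replicas of a range share one regionId timestamp. A standalone sketch of the layout for splitKeys = {"b", "c"} and a replication count of 2:

    // Sketch: reproduce the index layout used by getHRegionInfos() above.
    int numRegions = 3;           // splitKeys.length + 1
    int numRegionReplicas = 2;    // hTableDescriptor.getRegionReplication()
    for (int i = 0; i < numRegions; i++) {
      for (int j = 0; j < numRegionReplicas; j++) {
        System.out.println("hRegionInfos[" + (i * numRegionReplicas + j)
          + "] = split range " + i + ", replicaId " + j);
      }
    }
    // indices 0..5: replicas of the same range are adjacent in the array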
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index eaa57fc..f5d34e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -75,6 +76,12 @@ public class RegionStates {
   private final Map<ServerName, Set<HRegionInfo>> serverHoldings;
 
   /**
+   * Maintains, for each region (primary and all replicas), the set of servers
+   * it is currently hosted on.
+   */
+  private final Map<HRegionInfo, Set<ServerName>> regionReplicaToServer;
+
+  /**
    * Region to server assignment map.
    * Contains the server a given region is currently assigned to.
    */
@@ -111,6 +118,7 @@ public class RegionStates {
 
   private final ServerManager serverManager;
   private final Server server;
+  private final RackManager rackManager;
 
   // The maximum time to keep a log split info in region states map
   static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
@@ -120,12 +128,14 @@ public class RegionStates {
     regionStates = new HashMap<String, RegionState>();
     regionsInTransition = new HashMap<String, RegionState>();
     serverHoldings = new HashMap<ServerName, Set<HRegionInfo>>();
+    regionReplicaToServer = new HashMap<HRegionInfo, Set<ServerName>>();
     regionAssignments = new TreeMap<HRegionInfo, ServerName>();
     lastAssignments = new HashMap<String, ServerName>();
    processedServers = new HashMap<ServerName, Long>();
     deadServers = new HashMap<String, Long>();
     this.serverManager = serverManager;
     this.server = master;
+    this.rackManager = new RackManager(master.getConfiguration());
   }
 
   /**
@@ -136,6 +146,54 @@ public class RegionStates {
     return (Map<HRegionInfo, ServerName>)regionAssignments.clone();
   }
 
+  /**
+   * Return the replicas for the regions grouped by ServerName and corresponding Racks
+   * @param regions the regions to look up
+   * @return a pair containing the groupings as Maps
+   */
+  synchronized Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>>
+      getRegionAssignments(List<HRegionInfo> regions) {
+    Map<ServerName, Set<HRegionInfo>> replicaAssignments =
+      new TreeMap<ServerName, Set<HRegionInfo>>();
+    Map<String, Set<HRegionInfo>> rackAssignments =
+      new TreeMap<String, Set<HRegionInfo>>();
+    List<ServerName> servers = new ArrayList<ServerName>();
+
+    // what is being computed in the method below can also be maintained inline
+    // (in addToServerHoldings/removeFromServerHoldings) and on request, cloned and
+    // returned (just like getRegionAssignments() does), but in practice this
+    // method will be called with only a few regions and shouldn't be a big deal. Clone
+    // might be more expensive.
+    for (HRegionInfo region : regions) {
+      HRegionInfo primary = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
+      Set<ServerName> serversHostingRegionReplicas = regionReplicaToServer.get(primary);
+      if (serversHostingRegionReplicas != null) {
+        for (ServerName server : serversHostingRegionReplicas) {
+          Set<HRegionInfo> regionsOnServer = replicaAssignments.get(server);
+          if (regionsOnServer == null) {
+            regionsOnServer = new HashSet<HRegionInfo>(2);
+            replicaAssignments.put(server, regionsOnServer);
+          }
+          regionsOnServer.add(primary);
+          servers.add(server);
+        }
+      }
+    }
+
+    List<String> racks = rackManager.getRack(servers);
+    for (int i = 0; i < servers.size(); i++) {
+      Set<HRegionInfo> r = replicaAssignments.get(servers.get(i));
+      Set<HRegionInfo> regionsOnRack = rackAssignments.get(racks.get(i));
+      if (regionsOnRack == null) {
+        regionsOnRack = new HashSet<HRegionInfo>();
+        rackAssignments.put(racks.get(i), regionsOnRack);
+      }
+      regionsOnRack.addAll(r);
+    }
+    return new Pair<Map<ServerName, Set<HRegionInfo>>,
+      Map<String, Set<HRegionInfo>>>(replicaAssignments, rackAssignments);
+  }
+
   public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
     return regionAssignments.get(hri);
   }
@@ -375,23 +433,46 @@ public class RegionStates {
     ServerName oldServerName = regionAssignments.put(hri, serverName);
     if (!serverName.equals(oldServerName)) {
       LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
-      Set<HRegionInfo> regions = serverHoldings.get(serverName);
-      if (regions == null) {
-        regions = new HashSet<HRegionInfo>();
-        serverHoldings.put(serverName, regions);
-      }
-      regions.add(hri);
+      addToServerHoldings(serverName, hri);
       if (oldServerName != null) {
         LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
-        Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
-        oldRegions.remove(hri);
-        if (oldRegions.isEmpty()) {
-          serverHoldings.remove(oldServerName);
-        }
+        removeFromServerHoldings(oldServerName, hri);
       }
     }
   }
 
+  private void addToServerHoldings(ServerName serverName, HRegionInfo hri) {
+    Set<HRegionInfo> regions = serverHoldings.get(serverName);
+    if (regions == null) {
+      regions = new HashSet<HRegionInfo>();
+      serverHoldings.put(serverName, regions);
+    }
+    regions.add(hri);
+
+    HRegionInfo primary = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
+    Set<ServerName> serversHostingReplicaOfRegion =
+      regionReplicaToServer.get(primary);
+    if (serversHostingReplicaOfRegion == null) {
+      serversHostingReplicaOfRegion = new HashSet<ServerName>();
+      regionReplicaToServer.put(primary, serversHostingReplicaOfRegion);
+    }
+    serversHostingReplicaOfRegion.add(serverName);
+  }
+
+  private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) {
+    Set<HRegionInfo> oldRegions = serverHoldings.get(serverName);
+    oldRegions.remove(hri);
+    if (oldRegions.isEmpty()) {
+      serverHoldings.remove(serverName);
+    }
+    HRegionInfo primary = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
+    Set<ServerName> servers = regionReplicaToServer.get(primary);
+    servers.remove(serverName);
+    if (servers.isEmpty()) {
+      regionReplicaToServer.remove(primary);
+    }
+  }
+
   /**
    * A dead server's hlogs have been split so that all the regions
    * used to be open on it can be safely assigned now. Mark them assignable.
@@ -459,11 +540,7 @@ public class RegionStates {
     ServerName oldServerName = regionAssignments.remove(hri);
     if (oldServerName != null) {
       LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
-      Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
-      oldRegions.remove(hri);
-      if (oldRegions.isEmpty()) {
-        serverHoldings.remove(oldServerName);
-      }
+      removeFromServerHoldings(oldServerName, hri);
     }
   }
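regionReplicaToServer, added above, is the reverse index that getRegionAssignments(List) walks: default-replica HRegionInfo to every server hosting any replica of it. The rack grouping it feeds works off the parallel list returned by RackManager.getRack(List), where index i of the rack list corresponds to index i of the server list. A toy illustration with String stand-ins for the server and rack types:

    // Sketch: group servers by rack from two parallel lists, mirroring the
    // second loop of getRegionAssignments(List).
    List<String> servers = Arrays.asList("rs1", "rs2", "rs3");
    List<String> racks = Arrays.asList("rack1", "rack1", "rack2");
    Map<String, Set<String>> serversPerRack = new TreeMap<String, Set<String>>();
    for (int i = 0; i < servers.size(); i++) {
      Set<String> onRack = serversPerRack.get(racks.get(i));
      if (onRack == null) {
        onRack = new HashSet<String>();
        serversPerRack.put(racks.get(i), onRack);
      }
      onRack.add(servers.get(i));
    }
    // serversPerRack -> {rack1=[rs1, rs2], rack2=[rs3]}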
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java
index b98c860..2738d4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SnapshotOfRegionAssignmentFromMeta.java
@@ -33,11 +33,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
@@ -100,20 +103,31 @@ public class SnapshotOfRegionAssignmentFromMeta {
       public boolean visit(Result result) throws IOException {
         try {
           if (result == null || result.isEmpty()) return true;
-          Pair<HRegionInfo, ServerName> regionAndServer =
-            HRegionInfo.getHRegionInfoAndServerName(result);
-          HRegionInfo hri = regionAndServer.getFirst();
-          if (hri == null) return true;
+          HRegionInfo hri = HRegionInfo.getHRegionInfo(result);
+          if (hri == null) return true;
           if (hri.getTable() == null) return true;
           if (disabledTables.contains(hri.getTable())) {
             return true;
           }
           // Are we to include split parents in the list?
           if (excludeOfflinedSplitParents && hri.isSplit()) return true;
-          // Add the current assignment to the snapshot
-          addAssignment(hri, regionAndServer.getSecond());
-          addRegion(hri);
-
+          RegionLocations locations = MetaReader.getRegionLocations(result);
+          HRegionLocation[] hrl = locations.getRegionLocations();
+
+          // Add the current assignment to the snapshot for all replicas
+          if (hrl != null) {
+            for (int i = 0; i < hrl.length; i++) {
+              hri = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
+              addAssignment(hri, hrl[i].getServerName());
+              addRegion(hri);
+            }
+          } else {
+            // add a 'null' assignment. Required for the map.keySet operation on the
+            // return value from getRegionToRegionServerMap. The keySet should
+            // still contain the hri although the region is presently not assigned
+            addAssignment(hri, null);
+            addRegion(hri);
+          }
           // the code below is to handle favored nodes
           byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
             FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 710796b..12f9d77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,10 +40,14 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RackManager;
+import org.apache.hadoop.hbase.util.Pair;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Sets;
@@ -80,6 +85,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     Map<String, Integer> serversToIndex;
     Map<String, Integer> tablesToIndex;
 
+    Map<Integer, Set<HRegionInfo>> serverToReplicaMap;
+    Map<String, Set<HRegionInfo>> rackToReplicaMap;
 
     int numRegions;
     int numServers;
@@ -87,9 +94,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     int numMovedRegions = 0; //num moved regions from the initial configuration
     int numMovedMetaRegions = 0;       //num of moved regions that are META
+    protected RackManager rackManager;
+    int uniqueRacks;
 
     protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState,
         Map<String, Deque<RegionLoad>> loads,
-        RegionLocationFinder regionFinder) {
+        RegionLocationFinder regionFinder, RackManager rackManager) {
 
       serversToIndex = new HashMap<String, Integer>();
       tablesToIndex = new HashMap<String, Integer>();
 
       numRegions = 0;
 
+      this.rackManager = rackManager;
       int serverIndex = 0;
 
@@ -127,6 +137,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       regionLoads = new Deque[numRegions];
       regionLocations = new int[numRegions][];
       serverIndicesSortedByRegionCount = new Integer[numServers];
+      serverToReplicaMap = new HashMap<Integer, Set<HRegionInfo>>();
+      rackToReplicaMap = new HashMap<String, Set<HRegionInfo>>();
 
       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
 
@@ -143,8 +155,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         regionsPerServer[serverIndex] = new int[entry.getValue().size()];
         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
       }
-
+      Set<String> racks = new HashSet<String>();
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+        String rack = null;
+        if (rackManager != null) {
+          rack = rackManager.getRack(entry.getKey());
+          if (!racks.contains(rack)) {
+            uniqueRacks++;
+            racks.add(rack);
+          }
+        }
         serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
         regionPerServerIndex = 0;
 
@@ -185,6 +205,24 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
           }
         }
 
+        Set<HRegionInfo> replicaMap = serverToReplicaMap.get(serverIndex);
+        if (replicaMap == null) {
+          replicaMap = new HashSet<HRegionInfo>();
+          serverToReplicaMap.put(serverIndex, replicaMap);
+        }
+        // add the default replica for easy comparison later
+        HRegionInfo h = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
+        replicaMap.add(h);
+        if (rack != null) {
+          // maintain the rack information as well
+          Set<HRegionInfo> perRackReplicaMap = rackToReplicaMap.get(rack);
+          if (perRackReplicaMap == null) {
+            perRackReplicaMap = new HashSet<HRegionInfo>();
+            rackToReplicaMap.put(rack, perRackReplicaMap);
+          }
+          perRackReplicaMap.add(h);
+        }
+
         regionIndex++;
       }
     }
@@ -217,19 +255,79 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       if (rRegion >= 0 && lRegion >= 0) {
         regionMoved(rRegion, rServer, lServer);
         regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
+        updateReplicaMap(rServer, lServer, rRegion);
+        updateReplicaMap(lServer, rServer, lRegion);
         regionMoved(lRegion, lServer, rServer);
         regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
       } else if (rRegion >= 0) { //move rRegion
         regionMoved(rRegion, rServer, lServer);
         regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
         regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
+        updateReplicaMap(rServer, lServer, rRegion);
       } else if (lRegion >= 0) { //move lRegion
         regionMoved(lRegion, lServer, rServer);
         regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
         regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
+        updateReplicaMap(rServer, lServer, lRegion);
       }
     }
 
+    // remove region from lServer and put it in rServer. Also update the racks information
+    @VisibleForTesting
+    void updateReplicaMap(int lServer, int rServer, int region) {
+      // the maps are keyed by the default replica, so normalize before updating
+      HRegionInfo defaultReplica =
+        RegionReplicaUtil.getRegionInfoForDefaultReplica(regions[region]);
+      Set<HRegionInfo> regionSet = serverToReplicaMap.get(lServer);
+      if (regionSet != null) {
+        regionSet.remove(defaultReplica);
+      }
+      regionSet = serverToReplicaMap.get(rServer);
+      if (regionSet == null) {
+        regionSet = new HashSet<HRegionInfo>();
+        serverToReplicaMap.put(rServer, regionSet);
+      }
+      regionSet.add(defaultReplica);
+      // update the racks info
+      if (rackManager != null) {
+        String srcRack = rackManager.getRack(servers[lServer]);
+        String destRack = rackManager.getRack(servers[rServer]);
+        regionSet = rackToReplicaMap.get(srcRack);
+        if (regionSet != null) {
+          regionSet.remove(defaultReplica);
+        }
+        regionSet = rackToReplicaMap.get(destRack);
+        if (regionSet == null) {
+          regionSet = new HashSet<HRegionInfo>();
+          rackToReplicaMap.put(destRack, regionSet);
+        }
+        regionSet.add(defaultReplica);
+      }
+    }
+
+    /**
+     * Return true if the placement of region on server would lower the availability
+     * of the region in question
+     * @param server the index of the candidate server
+     * @param region the index of the region
+     * @return true or false
+     */
+    boolean wouldLowerAvailability(int server, int region) {
+      // availability would be lowered if the balancer chooses the node to move to as the node
+      // where the replica is
+      Set<HRegionInfo> set = serverToReplicaMap.get(server);
+      HRegionInfo hri = RegionReplicaUtil.getRegionInfoForDefaultReplica(regions[region]);
+      if (set != null && set.contains(hri)) return true;
+      //TODO: add same host checks (if more than 1 host, then don't place replicas on the same host)
+      // also availability would be lowered if the balancer chooses the node to move to a
+      // node in the same rack (if there were multiple racks to choose from)
+      if (uniqueRacks > 1) {
+        String destRack = rackManager.getRack(servers[server]);
+        Set<HRegionInfo> rackSet = rackToReplicaMap.get(destRack);
+        if (rackSet != null && rackSet.contains(hri)) {
+          // the destination rack contains a replica (primary or secondary)
+          return true;
+        }
+        //TODO: handle the case of more than one replica
+      }
+      return false;
+    }
 
     /** Region moved out of the server */
     void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
       regionIndexToServerIndex[regionIndex] = newServerIndex;
@@ -345,6 +443,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   // slop for regions
   protected float slop;
   private Configuration config;
+  protected RackManager rackManager;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
 
@@ -358,6 +457,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     else if (slop > 1) slop = 1;
 
     this.config = conf;
+    this.rackManager = new RackManager(getConf());
   }
 
   protected void setSlop(Configuration conf) {
@@ -430,6 +530,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       return null;
     }
     Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
+    // Get the snapshot of the current assignments for the regions in question, and then create
+    // a cluster out of it. Note that we might have replicas already assigned to some servers
+    // earlier. So we want to get the snapshot to see those assignments.
+    Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>> currentAssignments =
+      getCurrentAssignmentSnapshot(servers, regions);
+    int uniqueRacks = getUniqueRacks(servers);
     int numRegions = regions.size();
     int numServers = servers.size();
     int max = (int) Math.ceil((float) numRegions / numServers);
@@ -438,18 +544,111 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       serverIdx = RANDOM.nextInt(numServers);
     }
     int regionIdx = 0;
+    List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
     for (int j = 0; j < numServers; j++) {
       ServerName server = servers.get((j + serverIdx) % numServers);
       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
       for (int i = regionIdx; i < numRegions; i += numServers) {
-        serverRegions.add(regions.get(i % numRegions));
+        HRegionInfo region = regions.get(i % numRegions);
+        if (wouldLowerAvailability(currentAssignments.getFirst(), currentAssignments.getSecond(),
+            uniqueRacks, server, region)) {
+          unassignedRegions.add(region);
+        } else {
+          serverRegions.add(region);
+        }
+        // also update the assignments for checking availability
+        updateAvailabilityCheckerMaps(serverRegions, server, currentAssignments.getFirst(),
+          currentAssignments.getSecond());
       }
       assignments.put(server, serverRegions);
       regionIdx++;
     }
+    List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
+    lastFewRegions.addAll(unassignedRegions);
+    // assign the unassigned regions by going through the list and doing straight
+    // round robin
+    for (HRegionInfo hri : unassignedRegions) {
+      //TODO: remember the index every iteration (don't start from zero)
+      for (int j = 0; j < numServers; j++) {
+        if (!wouldLowerAvailability(currentAssignments.getFirst(), currentAssignments.getSecond(),
+            uniqueRacks, servers.get(j), hri)) {
+          List<HRegionInfo> hris = assignments.get(servers.get(j));
+          if (hris == null) {
+            hris = new ArrayList<HRegionInfo>();
+          }
+          hris.add(hri);
+          assignments.put(servers.get(j), hris);
+          // also update the assignments for checking availability
+          List<HRegionInfo> h = new ArrayList<HRegionInfo>();
+          h.add(hri);
+          updateAvailabilityCheckerMaps(h, servers.get(j), currentAssignments.getFirst(),
+            currentAssignments.getSecond());
+          lastFewRegions.remove(hri);
+          break;
+        }
+      }
+    }
+    // just sprinkle the rest of the regions on random regionservers. The balanceCluster will
+    // make it optimal later
+    for (HRegionInfo hri : lastFewRegions) {
+      int i = RANDOM.nextInt(numServers);
+      List<HRegionInfo> hris = assignments.get(servers.get(i));
+      if (hris == null) {
+        hris = new ArrayList<HRegionInfo>();
+      }
+      hris.add(hri);
+      assignments.put(servers.get(i), hris);
+    }
     return assignments;
   }
 
+  boolean wouldLowerAvailability(Map<ServerName, Set<HRegionInfo>> serverToRegionAssignments,
+      Map<String, Set<HRegionInfo>> rackToRegionAssignments,
+      int uniqueRacks,
+      ServerName server, HRegionInfo region) {
+    HRegionInfo hri = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
+    Set<HRegionInfo> regions = serverToRegionAssignments.get(server);
+    if (regions != null && regions.contains(hri)) return true;
+    //TODO: add same host checks (if more than 1 host, then don't place replicas on the same host)
+    // also availability would be lowered if the balancer chooses the node to move to a
+    // node in the same rack (if there were multiple racks to choose from)
+    if (uniqueRacks > 1) {
+      String destRack = rackManager.getRack(server);
+      regions = rackToRegionAssignments.get(destRack);
+      if (regions == null) return false;
+      if (regions.contains(hri)) {
+        // the destination rack contains a replica (primary or secondary)
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void updateAvailabilityCheckerMaps(List<HRegionInfo> serverRegions, ServerName server,
+      Map<ServerName, Set<HRegionInfo>> serverToRegions,
+      Map<String, Set<HRegionInfo>> rackToRegions) {
+    Set<HRegionInfo> hris = serverToRegions.get(server);
+    if (hris == null) {
+      hris = new HashSet<HRegionInfo>();
+      serverToRegions.put(server, hris);
+    }
+    // add the primary regions - since for the comparisons in wouldLowerAvailability
+    // we use the primary regioninfo
+    for (HRegionInfo hri : serverRegions) {
+      hris.add(RegionReplicaUtil.getRegionInfoForDefaultReplica(hri));
+    }
+    if (rackManager != null) {
+      hris = rackToRegions.get(rackManager.getRack(server));
+      if (hris == null) {
+        hris = new HashSet<HRegionInfo>();
+        rackToRegions.put(rackManager.getRack(server), hris);
+      }
+      for (HRegionInfo hri : serverRegions) {
+        hris.add(RegionReplicaUtil.getRegionInfoForDefaultReplica(hri));
+      }
+    }
+  }
 
   /**
    * Generates an immediate assignment plan to be used by a new master for
    * regions in transition that do not have an already known destination.
@@ -488,7 +687,22 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       LOG.warn("Wanted to do random assignment but no servers to assign to");
       return null;
     }
-    return servers.get(RANDOM.nextInt(servers.size()));
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    regions.add(regionInfo);
+    // Get the snapshot of the current assignments for the regions in question, and then create
+    // a cluster out of it. Note that we might have replicas already assigned to some servers
+    // earlier. So we want to get the snapshot to see those assignments.
+    Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>> currentAssignments =
+      getCurrentAssignmentSnapshot(servers, regions);
+    int uniqueRacks = getUniqueRacks(servers);
+    ServerName server = null;
+    Set<ServerName> serversConsidered = new HashSet<ServerName>();
+    do {
+      server = servers.get(RANDOM.nextInt(servers.size()));
+      serversConsidered.add(server);
+    } while (servers.size() != serversConsidered.size() &&
+      wouldLowerAvailability(currentAssignments.getFirst(), currentAssignments.getSecond(),
+        uniqueRacks, server, regionInfo));
+    return server;
   }
 
   /**
@@ -538,6 +752,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     int numRandomAssignments = 0;
     int numRetainedAssigments = 0;
+    // Get the snapshot of the current assignments for the regions in question, and then create
+    // a cluster out of it. Note that we might have replicas already assigned to some servers
+    // earlier. So we want to get the snapshot to see those assignments.
+    Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>> currentAssignments =
+      getCurrentAssignmentSnapshot(servers, new ArrayList<HRegionInfo>(regions.keySet()));
+    int uniqueRacks = getUniqueRacks(servers);
     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
       HRegionInfo region = entry.getKey();
       ServerName oldServerName = entry.getValue();
@@ -548,22 +768,33 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       if (localServers.isEmpty()) {
         // No servers on the new cluster match up with this hostname,
         // assign randomly.
-        ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
-        assignments.get(randomServer).add(region);
+        assignToRandomServerFromList(servers, assignments, uniqueRacks, currentAssignments,
+          region);
         numRandomAssignments++;
         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
       } else if (localServers.size() == 1) {
-        // the usual case - one new server on same host
-        assignments.get(localServers.get(0)).add(region);
+        ServerName target = localServers.get(0);
+        assignments.get(target).add(region);
+        // also update the assignments for checking availability
+        updateAvailabilityCheckerMaps(assignments.get(target), target,
+          currentAssignments.getFirst(), currentAssignments.getSecond());
         numRetainedAssigments++;
       } else {
         // multiple new servers in the cluster on this same host
         int size = localServers.size();
-        ServerName target =
-          localServers.contains(oldServerName) ? oldServerName : localServers.get(RANDOM
-            .nextInt(size));
-        assignments.get(target).add(region);
-        numRetainedAssigments++;
+        ServerName target = localServers.get(RANDOM.nextInt(size));
+        // if availability would be lowered by assigning to one server out of the many in the
+        // same host, we are sure that availability would be lowered if we assigned to some
+        // other server in the same host.
+        if (wouldLowerAvailability(currentAssignments.getFirst(),
+            currentAssignments.getSecond(), uniqueRacks, target, region)) {
+          assignToRandomServerFromList(servers, assignments, uniqueRacks, currentAssignments,
+            region);
+          numRandomAssignments++;
+        } else {
+          assignments.get(target).add(region);
+          // also update the assignments for checking availability
+          updateAvailabilityCheckerMaps(assignments.get(target), target,
+            currentAssignments.getFirst(), currentAssignments.getSecond());
+          numRetainedAssigments++;
+        }
       }
     }
@@ -581,6 +812,52 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     return assignments;
   }
 
+  private void assignToRandomServerFromList(List<ServerName> servers,
+      Map<ServerName, List<HRegionInfo>> assignments, int uniqueRacks,
+      Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>> currentAssignments,
+      HRegionInfo region) {
+    ServerName randomServer = null;
+    Set<ServerName> serversConsidered = new HashSet<ServerName>();
+    do {
+      randomServer = servers.get(RANDOM.nextInt(servers.size()));
+      serversConsidered.add(randomServer);
+    } while (servers.size() != serversConsidered.size() &&
+      wouldLowerAvailability(currentAssignments.getFirst(), currentAssignments.getSecond(),
+        uniqueRacks, randomServer, region));
+    assignments.get(randomServer).add(region);
+    // also update the assignments for checking availability
+    updateAvailabilityCheckerMaps(assignments.get(randomServer), randomServer,
+      currentAssignments.getFirst(), currentAssignments.getSecond());
+  }
+
+  private Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>>
+      getCurrentAssignmentSnapshot(List<ServerName> servers, List<HRegionInfo> regions) {
+    Pair<Map<ServerName, Set<HRegionInfo>>, Map<String, Set<HRegionInfo>>> currentAssignments;
+    if (this.services != null) {
+      currentAssignments = this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
+    } else { // create empty data structures
+      currentAssignments = new Pair<Map<ServerName, Set<HRegionInfo>>,
+        Map<String, Set<HRegionInfo>>>();
+      Map<ServerName, Set<HRegionInfo>> a = new HashMap<ServerName, Set<HRegionInfo>>(0);
+      Map<String, Set<HRegionInfo>> b = new HashMap<String, Set<HRegionInfo>>(0);
+      currentAssignments.setFirst(a);
+      currentAssignments.setSecond(b);
+    }
+    return currentAssignments;
+  }
+
+  private int getUniqueRacks(List<ServerName> servers) {
+    // for this round of assignment, how many racks are we talking about (TODO: need to access
+    // the rackManager via the master or something)
+    int uniqueRacks = 1;
+    if (rackManager != null) {
+      List<String> racks = rackManager.getRack(servers);
+      Set<String> rackSet = new HashSet<String>(racks);
+      uniqueRacks = rackSet.size();
+    }
+    return uniqueRacks;
+  }
+
   @Override
   public void initialize() throws HBaseIOException {
   }
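Both wouldLowerAvailability() overloads above apply the same two-level test: a placement lowers availability if the target server already hosts a replica of the same primary, or, when more than one rack is in play, if the target rack does. A distilled sketch of the predicate over plain maps (String stand-ins, not the patch's actual types):

    // Sketch: the server-then-rack collocation test.
    static boolean wouldLowerAvailability(
        Map<String, Set<String>> primariesPerServer, // server -> primaries hosted
        Map<String, Set<String>> primariesPerRack,   // rack -> primaries hosted
        int uniqueRacks, String server, String rack, String primary) {
      Set<String> onServer = primariesPerServer.get(server);
      if (onServer != null && onServer.contains(primary)) {
        return true;  // another replica is already on this server
      }
      if (uniqueRacks > 1) {  // the rack check only matters with multiple racks
        Set<String> onRack = primariesPerRack.get(rack);
        if (onRack != null && onRack.contains(primary)) {
          return true;  // another replica is already in this rack
        }
      }
      return false;
    }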
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 964d4f9..8848965 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -198,7 +198,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(clusterState, loads, regionFinder);
+    Cluster cluster = new Cluster(clusterState, loads, regionFinder, rackManager);
     double currentCost = computeCost(cluster, Double.MAX_VALUE);
 
     double initCost = currentCost;
@@ -227,6 +227,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       if (leftRegion < 0 && rightRegion < 0) {
         continue;
       }
+      // Would we lower the availability if we did this
+      // TODO: does it make sense to look at a picker for the availability
+      if (leftRegion >= 0 &&
+          cluster.wouldLowerAvailability(rightServer, leftRegion)) {
+        continue;
+      }
+      if (rightRegion >= 0 &&
+          cluster.wouldLowerAvailability(leftServer, rightRegion)) {
+        continue;
+      }
 
       cluster.moveOrSwapRegion(leftServer,
           rightServer,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index b9084b9..40420cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
 /**
@@ -111,7 +111,13 @@ public abstract class ModifyRegionUtils {
     CompletionService<HRegionInfo> completionService =
       new ExecutorCompletionService<HRegionInfo>(regionOpenAndInitThreadPool);
     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
+    int primaryRegions = 0;
     for (final HRegionInfo newRegion : newRegions) {
+      regionInfos.add(newRegion);
+      if (!RegionReplicaUtil.isDefaultReplica(newRegion)) {
+        continue;
+      }
+      primaryRegions++;
       completionService.submit(new Callable<HRegionInfo>() {
         @Override
         public HRegionInfo call() throws IOException {
@@ -121,10 +127,8 @@ public abstract class ModifyRegionUtils {
     }
     try {
       // wait for all regions to finish creation
-      for (int i = 0; i < regionNumber; i++) {
-        Future<HRegionInfo> future = completionService.take();
-        HRegionInfo regionInfo = future.get();
-        regionInfos.add(regionInfo);
+      for (int i = 0; i < primaryRegions; i++) {
+        completionService.take().get();
       }
     } catch (InterruptedException e) {
       LOG.error("Caught " + e + " during region creation");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index 310ae90..ec041f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -164,7 +164,7 @@ public class BalancerTestBase {
   }
 
   protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) {
-    return new BaseLoadBalancer.Cluster(mockClusterServers(mockCluster, -1), null, null);
+    return new BaseLoadBalancer.Cluster(mockClusterServers(mockCluster, -1), null, null, null);
   }
 
   protected Map<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster, int numTables) {
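In the ModifyRegionUtils hunk above, only default replicas are submitted to the region-creation pool (replicas add no on-disk state of their own), yet every HRegionInfo, replicas included, is returned so callers can register the full replica set. A condensed sketch of that submit-and-await pattern; createRegionOnDisk is a hypothetical stand-in for the createHRegion call in the patch:

    // Sketch: create regions for primaries only, return primaries + replicas.
    List<HRegionInfo> all = new ArrayList<HRegionInfo>();
    int primaryRegions = 0;
    for (final HRegionInfo region : newRegions) {
      all.add(region);                          // replicas stay in the result
      if (!RegionReplicaUtil.isDefaultReplica(region)) continue;
      primaryRegions++;
      completionService.submit(new Callable<HRegionInfo>() {
        @Override
        public HRegionInfo call() throws IOException {
          return createRegionOnDisk(region);    // hypothetical helper
        }
      });
    }
    for (int i = 0; i < primaryRegions; i++) {
      completionService.take().get();           // wait for primaries only
    }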
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index d0cf4fa..e14e53b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,16 +35,24 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
+import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 @Category(MediumTests.class)
 public class TestBaseLoadBalancer extends BalancerTestBase {
 
   private static LoadBalancer loadBalancer;
+  private static RackManager rackManager;
+  private static final int NUM_SERVERS = 15;
+  private static ServerName[] servers = new ServerName[NUM_SERVERS];
 
   private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
 
   int[][] regionsAndServersMocks = new int[][] {
@@ -58,6 +67,20 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     Configuration conf = HBaseConfiguration.create();
     loadBalancer = new MockBalancer();
     loadBalancer.setConf(conf);
+    // Set up the rack topologies (5 machines per rack)
+    rackManager = Mockito.mock(RackManager.class);
+    for (int i = 0; i < NUM_SERVERS; i++) {
+      servers[i] = ServerName.valueOf("foo"+i+":1234",-1);
+      if (i < 5) {
+        Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack1");
+      }
+      if (i >= 5 && i < 10) {
+        Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack2");
+      }
+      if (i >= 10) {
+        Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack3");
+      }
+    }
   }
 
   public static class MockBalancer extends BaseLoadBalancer {
@@ -174,6 +197,135 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
     assertRetainedAssignment(existing, listOfServerNames, assignment);
   }
 
+  @Test
+  public void testRegionAvailability() throws Exception {
+    // Create a cluster with a few servers, assign them to specific racks
+    // then assign some regions. The tests should check whether moving a
+    // replica from one node to a specific other node or rack lowers the
+    // availability of the region or not
+
+    List<HRegionInfo> list0 = new ArrayList<HRegionInfo>();
+    List<HRegionInfo> list1 = new ArrayList<HRegionInfo>();
+    List<HRegionInfo> list2 = new ArrayList<HRegionInfo>();
+    // create a region (region1)
+    HRegionInfo hri1 = new HRegionInfo(
+      TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(),
+      false, 100);
+    // create a replica of the region (replica_of_region1)
+    HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
+    // create a second region (region2)
+    HRegionInfo hri3 = new HRegionInfo(
+      TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(),
+      false, 101);
+    list0.add(hri1); //only region1
+    list1.add(hri2); //only replica_of_region1
+    list2.add(hri3); //only region2
+    Map<ServerName, List<HRegionInfo>> clusterState =
+      new LinkedHashMap<ServerName, List<HRegionInfo>>();
+    clusterState.put(servers[0], list0); //servers[0] hosts region1
+    clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1
+    clusterState.put(servers[2], list2); //servers[2] hosts region2
+    // create a cluster with the above clusterState. The way in which the
+    // cluster is created (constructor code) would make sure the indices of
+    // the servers are in the order in which it is inserted in the clusterState
+    // map (linkedhashmap is important). A similar thing applies to the region lists
+    Cluster cluster = new Cluster(clusterState, null, null, rackManager);
+    // check whether a move of region1 from servers[0] to servers[1] would lower
+    // the availability of region1
+    assertTrue(cluster.wouldLowerAvailability(1, 0));
+    // check whether a move of region1 from servers[0] to servers[2] would lower
+    // the availability of region1
+    assertTrue(!cluster.wouldLowerAvailability(2, 0));
+    // check whether a move of replica_of_region1 from servers[1] to servers[2] would lower
+    // the availability of replica_of_region1
+    assertTrue(!cluster.wouldLowerAvailability(2, 1));
+    // check whether a move of region2 from servers[2] to servers[1] would lower
+    // the availability of region2
+    assertTrue(!cluster.wouldLowerAvailability(1, 2));
+
+    // now lets have servers[1] host replica_of_region2
+    list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1));
+    // create a new clusterState with the above change
+    cluster = new Cluster(clusterState, null, null, rackManager);
+    // now check whether a move of region2 from servers[2] to servers[1] would lower
+    // the availability of region2
+    assertTrue(cluster.wouldLowerAvailability(1, 2));
+
+    // start over again
+    clusterState.clear();
+    clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1
+    clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1
+                                         //and replica_of_region2
+    clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
+    // create a cluster with the above clusterState
+    cluster = new Cluster(clusterState, null, null, rackManager);
+    // check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would
+    // lower the availability
+    assertTrue(cluster.wouldLowerAvailability(2, 0));
+
+    // now create a cluster without the rack manager
+    cluster = new Cluster(clusterState, null, null, null);
+    // now repeat check whether a move of region1 from servers[0] to servers[6] would
+    // lower the availability
+    assertTrue(!cluster.wouldLowerAvailability(2, 0));
+  }
+
+  @Test
+  public void testRegionAvailabilityWithRegionMoves() throws Exception {
+    List<HRegionInfo> list0 = new ArrayList<HRegionInfo>();
+    List<HRegionInfo> list1 = new ArrayList<HRegionInfo>();
+    List<HRegionInfo> list2 = new ArrayList<HRegionInfo>();
+    // create a region (region1)
+    HRegionInfo hri1 = new HRegionInfo(
+      TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(),
+      false, 100);
+    // create a replica of the region (replica_of_region1)
+    HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
+    // create a second region (region2)
+    HRegionInfo hri3 = new HRegionInfo(
+      TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(),
+      false, 101);
+    list0.add(hri1); //only region1
+    list1.add(hri2); //only replica_of_region1
+    list2.add(hri3); //only region2
+    Map<ServerName, List<HRegionInfo>> clusterState =
+      new LinkedHashMap<ServerName, List<HRegionInfo>>();
+    clusterState.put(servers[0], list0); //servers[0] hosts region1
+    clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1
+    clusterState.put(servers[2], list2); //servers[2] hosts region2
+    // create a cluster with the above clusterState. The way in which the
+    // cluster is created (constructor code) would make sure the indices of
+    // the servers are in the order in which it is inserted in the clusterState
+    // map (linkedhashmap is important).
+    Cluster cluster = new Cluster(clusterState, null, null, rackManager);
+    // check whether placing region1 on servers[2] would lower availability
+    assertTrue(!cluster.wouldLowerAvailability(2, 0));
+
+    // now move region1 from servers[0] to servers[2]
+    cluster.updateReplicaMap(0, 2, 0);
+    // now repeat the check; servers[2] hosts region1 itself now
+    assertTrue(cluster.wouldLowerAvailability(2, 0));
+
+    // start over again
+    clusterState.clear();
+    List<HRegionInfo> list3 = new ArrayList<HRegionInfo>();
+    list3.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1));
+    clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1
+    clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1
+    clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
+    clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2
+    // create a cluster with the above clusterState
+    cluster = new Cluster(clusterState, null, null, rackManager);
+    // check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1
+    // would lower the availability
+    assertTrue(!cluster.wouldLowerAvailability(0, 3));
+    // now move region2 from servers[6],rack2 to servers[0],rack1
+    cluster.updateReplicaMap(2, 0, 2);
+    // now repeat check if replica_of_region2 from servers[12],rack3 to servers[0],rack1 would
+    // lower the availability
+    assertTrue(cluster.wouldLowerAvailability(0, 3));
+  }
+
   private List<ServerName> getListOfServerNames(final List<ServerAndLoad> sals) {
     List<ServerName> list = new ArrayList<ServerName>();
     for (ServerAndLoad e : sals) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index 162a257..a66a60e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -57,6 +59,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
   public static void beforeAllTests() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
+    conf.setClass("hbase.util.ip.to.rack.determiner",
+      MyRackResolver.class, DNSToSwitchMapping.class);
     loadBalancer = new StochasticLoadBalancer();
     loadBalancer.setConf(conf);
   }
@@ -386,4 +390,22 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
     return mockClusterServers(cluster, numTables);
   }
+
+  public static class MyRackResolver implements DNSToSwitchMapping {
+
+    public MyRackResolver(Configuration conf) {}
+
+    @Override
+    public List<String> resolve(List<String> names) {
+      List<String> racks = new ArrayList<String>(names.size());
+      for (int i = 0; i < names.size(); i++) {
+        racks.add(i, NetworkTopology.DEFAULT_RACK);
+      }
+      return racks;
+    }
+
+    @Override
+    public void reloadCachedMappings() {}
+  }
 }