diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java index 498b95c..c68b589 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java @@ -20,13 +20,20 @@ package org.apache.hadoop.hbase.master; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.ScriptBasedMapping; /** @@ -40,15 +47,24 @@ public class RackManager { public static final String UNKNOWN_RACK = "Unknown Rack"; private DNSToSwitchMapping switchMapping; + private final int numThreads = 3; + // the amount of time, in milliseconds, we give IP to rack determiner to resolve rack + private int timeout; + private final ExecutorService executorService = + Executors.newFixedThreadPool(numThreads, new DaemonThreadFactory("rackmanager")); + public final static String RACK_DETERMINER_CLASS = "hbase.util.ip.to.rack.determiner"; + public final static String IP_TO_RACK_DETEMINDER_TIMEOUT = "hbase.ip.to.rack.determiner.timeout"; + private final static int DEFAULT_IP_TO_RACK_DETEMINDER_TIMEOUT = 60000; public RackManager() { } public RackManager(Configuration conf) { switchMapping = 
ReflectionUtils.instantiateWithCustomCtor( - conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class, + conf.getClass(RACK_DETERMINER_CLASS, ScriptBasedMapping.class, DNSToSwitchMapping.class).getName(), new Class[]{Configuration.class}, new Object[]{conf}); + timeout = conf.getInt(IP_TO_RACK_DETEMINDER_TIMEOUT, DEFAULT_IP_TO_RACK_DETEMINDER_TIMEOUT); } /** @@ -57,15 +73,39 @@ public class RackManager { * @param server the server for which to get the rack name * @return the rack name of the server */ - public String getRack(ServerName server) { + public String getRack(final ServerName server) { if (server == null) { return UNKNOWN_RACK; } - // just a note - switchMapping caches results (at least the implementation should unless the - // resolution is really a lightweight process) - List racks = switchMapping.resolve(Arrays.asList(server.getHostname())); - if (racks != null && !racks.isEmpty()) { - return racks.get(0); + final List<String> racks = new ArrayList<String>(); + final String hostname = server.getHostname(); + Future<?> f = executorService.submit(new Runnable() { + @Override + public void run() { + // just a note - switchMapping caches results (at least the implementation should unless the + // resolution is really a lightweight process) + List<String> resolved = switchMapping.resolve(Arrays.asList(hostname)); + // resolve() can hand back null; the code this replaces null-checked the result, so do the same + if (resolved != null) { + racks.addAll(resolved); + } + } + }); + try { + f.get(timeout, TimeUnit.MILLISECONDS); + if (!racks.isEmpty()) { + return racks.get(0); + } + } catch (TimeoutException e) { + f.cancel(true); + LOG.warn("Timeout retrieving rack for " + hostname); + } catch (InterruptedException e) { + f.cancel(true); + // restore the interrupt status that get() cleared so the caller can still observe it + Thread.currentThread().interrupt(); + LOG.warn("Interrupted retrieving rack for " + hostname); + } catch (Exception e) { + LOG.warn("Got exception retrieving rack for " + hostname, e); } return UNKNOWN_RACK; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0a2d7b0..30ecf79 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -212,7 +212,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionsToIndex = new HashMap(numRegions); servers = new ServerName[numServers]; serversPerHost = new int[numHosts][]; - serversPerRack = new int[numRacks][]; regions = new HRegionInfo[numRegions]; regionIndexToServerIndex = new int[numRegions]; initialRegionIndexToServerIndex = new int[numRegions]; @@ -226,10 +225,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serverIndexToRackIndex = new int[numServers]; regionsPerServer = new int[numServers][]; regionsPerHost = new int[numHosts][]; - regionsPerRack = new int[numRacks][]; primariesOfRegionsPerServer = new int[numServers][]; primariesOfRegionsPerHost = new int[numHosts][]; - primariesOfRegionsPerRack = new int[numRacks][]; int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; @@ -270,7 +267,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); serverIndexToHostIndex[serverIndex] = hostIndex; - int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); + String rack = this.rackManager.getRack(entry.getKey()); + if (!racksToIndex.containsKey(rack)) { + racksToIndex.put(rack, numRacks++); + serversPerRackList.add(new ArrayList()); + } + int rackIndex = racksToIndex.get(rack); serverIndexToRackIndex[serverIndex] = rackIndex; for (HRegionInfo region : entry.getValue()) { @@ -295,6 +297,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + serversPerRack = new int[numRacks][]; for (int i = 0; i < serversPerRackList.size(); i++) { serversPerRack[i] = new int[serversPerRackList.get(i).size()]; for (int j = 0; j < serversPerRack[i].length; j++) { @@ -376,6 +379,8 @@ public abstract class BaseLoadBalancer 
implements LoadBalancer { } } + regionsPerRack = new int[numRacks][]; + primariesOfRegionsPerRack = new int[numRacks][]; // compute regionsPerRack if (numRacks > 1) { for (int i = 0 ; i < serversPerRack.length; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index 11ca6c7..06ccff6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.ScriptBasedMapping; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -74,9 +76,25 @@ public class TestBaseLoadBalancer extends BalancerTestBase { new int[] { 1, 3 }, new int[] { 2, 3 }, new int[] { 3, 3 }, new int[] { 25, 3 }, new int[] { 2, 10 }, new int[] { 2, 100 }, new int[] { 12, 10 }, new int[] { 12, 100 }, }; + static public class MockMapping extends ScriptBasedMapping { + public MockMapping(Configuration conf) { + } + + private static String RACK = "rack"; + @Override + public List resolve(List names) { + List ret = new ArrayList(names.size()); + for (String name : names) { + ret.add(RACK); + } + return ret; + } + } + @BeforeClass public static void beforeAllTests() throws Exception { Configuration conf = HBaseConfiguration.create(); + conf.setClass(RackManager.RACK_DETERMINER_CLASS, MockMapping.class, DNSToSwitchMapping.class); loadBalancer = new MockBalancer(); 
loadBalancer.setConf(conf); MasterServices st = Mockito.mock(MasterServices.class);