diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java index 498b95c..c68b589 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java @@ -20,13 +20,20 @@ package org.apache.hadoop.hbase.master; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.ScriptBasedMapping; /** @@ -40,15 +47,24 @@ public class RackManager { public static final String UNKNOWN_RACK = "Unknown Rack"; private DNSToSwitchMapping switchMapping; + private final int numThreads = 3; + // the amount of time, in milliseconds, we give IP to rack determiner to resolve rack + private int timeout; + private final ExecutorService executorService = + Executors.newFixedThreadPool(numThreads, new DaemonThreadFactory("rackmanager")); + public final static String RACK_DETERMINER_CLASS = "hbase.util.ip.to.rack.determiner"; + public final static String IP_TO_RACK_DETERMINER_TIMEOUT = "hbase.ip.to.rack.determiner.timeout"; + private final static int DEFAULT_IP_TO_RACK_DETERMINER_TIMEOUT = 60000; public RackManager() { } public RackManager(Configuration conf) { switchMapping = ReflectionUtils.instantiateWithCustomCtor( - conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class, + conf.getClass(RACK_DETERMINER_CLASS, ScriptBasedMapping.class, DNSToSwitchMapping.class).getName(), new Class[]{Configuration.class}, new Object[]{conf}); + timeout = conf.getInt(IP_TO_RACK_DETERMINER_TIMEOUT, DEFAULT_IP_TO_RACK_DETERMINER_TIMEOUT); } /** @@ -57,15 +73,31 @@ public class RackManager { * @param server the server for which to get the rack name * @return the rack name of the server */ - public String getRack(ServerName server) { + public String getRack(final ServerName server) { if (server == null) { return UNKNOWN_RACK; } - // just a note - switchMapping caches results (at least the implementation should unless the - // resolution is really a lightweight process) - List racks = switchMapping.resolve(Arrays.asList(server.getHostname())); - if (racks != null && !racks.isEmpty()) { - return racks.get(0); + final List racks = new ArrayList(); + final String hostname = server.getHostname(); + Future f = (Future)executorService.submit(new Runnable() { + + @Override + public void run() { + // just a note - switchMapping caches results (at least the implementation should unless the + // resolution is really a lightweight process) + racks.addAll(switchMapping.resolve(Arrays.asList(hostname))); + } + }); + try { + f.get(timeout, TimeUnit.MILLISECONDS); + if (racks != null && !racks.isEmpty()) { + return racks.get(0); + } + } catch (TimeoutException e) { + f.cancel(true); + LOG.warn("Timeout retrieving rack for " + hostname); + } catch (Exception e) { + LOG.warn("Got exception retrieving rack for " + hostname, e); } return UNKNOWN_RACK; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0a2d7b0..30ecf79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -212,7 +212,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionsToIndex = new HashMap(numRegions); servers = new ServerName[numServers]; serversPerHost = new int[numHosts][]; - serversPerRack = new int[numRacks][]; regions = new HRegionInfo[numRegions]; regionIndexToServerIndex = new int[numRegions]; initialRegionIndexToServerIndex = new int[numRegions]; @@ -226,10 +225,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serverIndexToRackIndex = new int[numServers]; regionsPerServer = new int[numServers][]; regionsPerHost = new int[numHosts][]; - regionsPerRack = new int[numRacks][]; primariesOfRegionsPerServer = new int[numServers][]; primariesOfRegionsPerHost = new int[numHosts][]; - primariesOfRegionsPerRack = new int[numRacks][]; int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; @@ -270,7 +267,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); serverIndexToHostIndex[serverIndex] = hostIndex; - int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); + String rack = this.rackManager.getRack(entry.getKey()); + if (!racksToIndex.containsKey(rack)) { + racksToIndex.put(rack, numRacks++); + serversPerRackList.add(new ArrayList()); + } + int rackIndex = racksToIndex.get(rack); serverIndexToRackIndex[serverIndex] = rackIndex; for (HRegionInfo region : entry.getValue()) { @@ -295,6 +297,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + serversPerRack = new int[numRacks][]; for (int i = 0; i < serversPerRackList.size(); i++) { serversPerRack[i] = new int[serversPerRackList.get(i).size()]; for (int j = 0; j < serversPerRack[i].length; j++) { @@ -376,6 +379,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + regionsPerRack = new int[numRacks][]; + primariesOfRegionsPerRack = new int[numRacks][]; // compute regionsPerRack if (numRacks > 1) { for (int i = 0 ; i < serversPerRack.length; i++) {