diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java index 498b95c..e186fa3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RackManager.java @@ -20,15 +20,24 @@ package org.apache.hadoop.hbase.master; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.ScriptBasedMapping; + +import com.google.common.annotations.VisibleForTesting; /** * Wrapper over the rack resolution utility in Hadoop. The rack resolution * utility in Hadoop does resolution from hosts to the racks they belong to. 
@@ -40,15 +49,28 @@ public class RackManager { public static final String UNKNOWN_RACK = "Unknown Rack"; private DNSToSwitchMapping switchMapping; + private int numThreads; + // maximum time, in milliseconds, to wait for the IP-to-rack determiner to resolve a rack + private int timeout; + private ExecutorService executorService; + public final static String RACK_DETERMINER_CLASS = "hbase.util.ip.to.rack.determiner"; + public final static String IP_TO_RACK_DETERMINER_TIMEOUT = "hbase.ip.to.rack.determiner.timeout"; + private final static int DEFAULT_IP_TO_RACK_DETERMINER_TIMEOUT = 60000; + private final static String IP_TO_RACK_DETERMINER_THREADS = "hbase.ip.to.rack.determiner.threads"; + private final static int DEFAULT_IP_TO_RACK_DETERMINER_THREADS = 3; public RackManager() { } public RackManager(Configuration conf) { switchMapping = ReflectionUtils.instantiateWithCustomCtor( - conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class, + conf.getClass(RACK_DETERMINER_CLASS, ScriptBasedMapping.class, DNSToSwitchMapping.class).getName(), new Class[]{Configuration.class}, new Object[]{conf}); + timeout = conf.getInt(IP_TO_RACK_DETERMINER_TIMEOUT, DEFAULT_IP_TO_RACK_DETERMINER_TIMEOUT); + numThreads = conf.getInt(IP_TO_RACK_DETERMINER_THREADS, DEFAULT_IP_TO_RACK_DETERMINER_THREADS); + executorService = + Executors.newFixedThreadPool(numThreads, new DaemonThreadFactory("rackmanager")); } /** @@ -57,20 +79,52 @@ public class RackManager { * @param server the server for which to get the rack name * @return the rack name of the server */ - public String getRack(ServerName server) { + public String getRack(final ServerName server) { if (server == null) { return UNKNOWN_RACK; } - // just a note - switchMapping caches results (at least the implementation should unless the - // resolution is really a lightweight process) - List<String> racks = switchMapping.resolve(Arrays.asList(server.getHostname())); - if (racks != null && !racks.isEmpty()) { - return racks.get(0); +
final List<String> racks = new ArrayList<String>(); + final String hostname = server.getHostname(); + + Future<?> f = executorService.submit(new Runnable() { + + @Override + public void run() { + // just a note - switchMapping caches results (at least the implementation should unless the + // resolution is really a lightweight process) + List<String> resolved = switchMapping.resolve(Arrays.asList(hostname)); + // a null result means "no rack found"; the code being replaced tolerated it too + if (resolved != null) { + racks.addAll(resolved); + } + } + }); + try { + f.get(timeout, TimeUnit.MILLISECONDS); + if (!racks.isEmpty()) { + return racks.get(0); + } + } catch (TimeoutException e) { + f.cancel(true); + LOG.warn("Timeout retrieving rack for " + hostname); + } catch (InterruptedException e) { + f.cancel(true); + // restore the interrupt status so that callers can still observe it + Thread.currentThread().interrupt(); + LOG.warn("Interrupted retrieving rack for " + hostname); + } catch (Exception e) { + LOG.warn("Got exception retrieving rack for " + hostname, e); } return UNKNOWN_RACK; } + @VisibleForTesting + public int getActiveCount() { + ThreadPoolExecutor exe = (ThreadPoolExecutor)executorService; + return exe.getActiveCount(); + } + /** * Same as {@link #getRack(ServerName)} except that a list is passed * @param servers list of servers we're requesting racks information for diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0a2d7b0..30ecf79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -212,7 +212,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionsToIndex = new HashMap(numRegions); servers = new ServerName[numServers]; serversPerHost = new int[numHosts][]; - serversPerRack = new int[numRacks][]; regions = new HRegionInfo[numRegions]; regionIndexToServerIndex = new int[numRegions]; initialRegionIndexToServerIndex = new int[numRegions]; @@ -226,10 +225,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { serverIndexToRackIndex = new int[numServers]; regionsPerServer = new
int[numServers][]; regionsPerHost = new int[numHosts][]; - regionsPerRack = new int[numRacks][]; primariesOfRegionsPerServer = new int[numServers][]; primariesOfRegionsPerHost = new int[numHosts][]; - primariesOfRegionsPerRack = new int[numRacks][]; int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; @@ -270,7 +267,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); serverIndexToHostIndex[serverIndex] = hostIndex; - int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); + String rack = this.rackManager.getRack(entry.getKey()); + if (!racksToIndex.containsKey(rack)) { + racksToIndex.put(rack, numRacks++); + serversPerRackList.add(new ArrayList()); + } + int rackIndex = racksToIndex.get(rack); serverIndexToRackIndex[serverIndex] = rackIndex; for (HRegionInfo region : entry.getValue()) { @@ -295,6 +297,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + serversPerRack = new int[numRacks][]; for (int i = 0; i < serversPerRackList.size(); i++) { serversPerRack[i] = new int[serversPerRackList.get(i).size()]; for (int j = 0; j < serversPerRack[i].length; j++) { @@ -376,6 +379,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + regionsPerRack = new int[numRacks][]; + primariesOfRegionsPerRack = new int[numRacks][]; // compute regionsPerRack if (numRacks > 1) { for (int i = 0 ; i < serversPerRack.length; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index 73ae11e..a9bce91 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -50,6 +50,8 @@ import 
org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.ScriptBasedMapping; import org.junit.BeforeClass; @@ -85,6 +87,11 @@ public class TestBaseLoadBalancer extends BalancerTestBase { @Override public List<String> resolve(List<String> names) { List<String> ret = new ArrayList<String>(names.size()); + try { + Threads.sleep(30000); + } catch (Exception e) { + LOG.error("Resolver got ", e); + } for (String name : names) { ret.add(RACK); } @@ -96,6 +103,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase { public static void beforeAllTests() throws Exception { Configuration conf = HBaseConfiguration.create(); conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class); + conf.setInt(RackManager.IP_TO_RACK_DETERMINER_TIMEOUT, 4); loadBalancer = new MockBalancer(); loadBalancer.setConf(conf); MasterServices st = Mockito.mock(MasterServices.class); @@ -118,6 +126,21 @@ public class TestBaseLoadBalancer extends BalancerTestBase { } } + @Test (timeout=30000) + public void testRackResolutionInterruption() throws Exception { + RackManager rackMgr = new RackManager(loadBalancer.getConf()); + rackMgr.getRack(ServerName.valueOf("a:2", 12)); + int cnt; + long start = EnvironmentEdgeManager.currentTime(); + // getRack cancels the timed-out lookup with cancel(true); poll until its pool thread is released + do { + cnt = rackMgr.getActiveCount(); + } while (cnt > 0); + long duration = EnvironmentEdgeManager.currentTime() - start; + // check that threads are free + assertEquals(0, rackMgr.getActiveCount()); + } + public static class MockBalancer extends BaseLoadBalancer { @Override