From 23972af60c981e2ade4c9acf710af21948b42e2b Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Wed, 23 Sep 2015 14:57:51 -0400
Subject: [PATCH] HBASE-14473 Compute region locality in parallel

---
 .../master/balancer/RegionLocationFinder.java | 100 ++++++++++++++++++---
 1 file changed, 88 insertions(+), 12 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index c0c05d7..fd0b1f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -23,9 +23,21 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,15 +45,22 @@ import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * This will find where data for a region is located in HDFS. It ranks
@@ -50,32 +69,53 @@ import com.google.common.collect.Lists;
  *
  */
 class RegionLocationFinder {
-
   private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
-
   private Configuration conf;
+  private volatile long cacheTime;
   private volatile ClusterStatus status;
   private MasterServices services;
+  private final ListeningExecutorService executor;
+  private long lastFullRefresh = 0;
 
   private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
-      new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
+      new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
 
-        @Override
-        public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
-          return internalGetTopBlockLocation(key);
-        }
-      };
+        public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
+            HDFSBlocksDistribution oldValue) throws Exception {
+          return executor.submit(new Callable<HDFSBlocksDistribution>() {
+            @Override
+            public HDFSBlocksDistribution call() throws Exception {
+              return internalGetTopBlockLocation(hri);
+            }
+          });
+        }
+
+        @Override
+        public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
+          return internalGetTopBlockLocation(key);
+        }
+      };
 
   // The cache for where regions are located.
   private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
 
+  RegionLocationFinder() {
+    executor = MoreExecutors.listeningDecorator(
+        Executors.newScheduledThreadPool(
+            5,
+            new ThreadFactoryBuilder().
+                setDaemon(true)
+                .setNameFormat("region-location-%d")
+                .build()));
+  }
+
   /**
    * Create a cache for region to list of servers
-   * @param mins Number of mins to cache
+   * @param time time to cache the locations
    * @return A new Cache.
    */
-  private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache(int mins) {
-    return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader);
+  private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache(long time) {
+    return CacheBuilder.newBuilder().expireAfterWrite(time, TimeUnit.MILLISECONDS).build(loader);
   }
 
   public Configuration getConf() {
@@ -84,7 +124,8 @@ class RegionLocationFinder {
 
   public void setConf(Configuration conf) {
     this.conf = conf;
-    cache = createCache(conf.getInt("hbase.master.balancer.regionLocationCacheTime", 30));
+    cacheTime = conf.getInt("hbase.master.balancer.regionLocationCacheTime", 240 * 1000 * 60);
+    cache = createCache(cacheTime);
   }
 
   public void setServices(MasterServices services) {
@@ -92,7 +133,42 @@ class RegionLocationFinder {
   }
 
   public void setClusterStatus(ClusterStatus status) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
     this.status = status;
+    if (currentTime > lastFullRefresh + (cacheTime / 2)) {
+      // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
+      lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
+    }
+
+  }
+
+  /**
+   * Refresh all the region locations.
+   *
+   * @return true if region locations got refreshed.
+   */
+  private boolean scheduleFullRefresh() {
+    // Protect from anything being null while starting up.
+    if (services == null) {
+      return false;
+    }
+    AssignmentManager am = services.getAssignmentManager();
+
+    if (am == null) {
+      return false;
+    }
+    RegionStates regionStates = am.getRegionStates();
+    if (regionStates == null) {
+      return false;
+    }
+
+    Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
+    boolean includesUserTables = false;
+    for (final HRegionInfo hri : regions) {
+      cache.refresh(hri);
+      includesUserTables = includesUserTables || !hri.isSystemTable();
+    }
+    return includesUserTables;
   }
 
   protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
-- 
2.5.3
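
Note (not part of the patch): the change leans on Guava's asynchronous CacheLoader.reload(key, oldValue) contract. Because reload is overridden to hand the recomputation to the ListeningExecutorService and return a ListenableFuture, cache.refresh(hri) in scheduleFullRefresh() returns immediately and the balancer keeps serving the stale HDFSBlocksDistribution until the new one lands, which is what makes the locality computation parallel. Below is a minimal, self-contained sketch of that pattern under the same Guava APIs; the class and member names (AsyncReloadCacheSketch, expensiveComputation, refreshAll, the "reload-%d" thread name) are illustrative and not taken from HBase.

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class AsyncReloadCacheSketch {
  // Daemon pool that performs reloads off the caller thread, mirroring the
  // patch's listeningDecorator(newScheduledThreadPool(5, ...)) construction.
  private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
      Executors.newScheduledThreadPool(5,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("reload-%d").build()));

  private final LoadingCache<String, Long> cache = CacheBuilder.newBuilder()
      .expireAfterWrite(240, TimeUnit.MINUTES)
      .build(new CacheLoader<String, Long>() {
        @Override
        public Long load(String key) throws Exception {
          // Synchronous path: used the first time a key is looked up.
          return expensiveComputation(key);
        }

        @Override
        public ListenableFuture<Long> reload(final String key, Long oldValue) {
          // Asynchronous path: refresh(key) returns immediately and the old
          // value keeps being served until this future completes.
          return executor.submit(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
              return expensiveComputation(key);
            }
          });
        }
      });

  private Long expensiveComputation(String key) {
    return (long) key.length(); // stand-in for computing an HDFS block distribution
  }

  public void refreshAll(Iterable<String> keys) {
    for (String key : keys) {
      cache.refresh(key); // non-blocking because reload is asynchronous
    }
  }
}

Calling refreshAll over every cached key is the analogue of the patch's full refresh loop: the loop itself finishes quickly, while the five pool threads recompute entries in the background.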