From 320899b3aff0d240501e3a5e4c2cd1f6be533252 Mon Sep 17 00:00:00 2001 From: rahulgidwani Date: Thu, 15 Jan 2015 14:22:22 -0800 Subject: [PATCH] Potentially improve block locality during major compaction for old regions --- .../hadoop/hbase/regionserver/HRegionServer.java | 13 +++++++++---- .../compactions/CompactionConfiguration.java | 20 ++++++++++++++++++-- .../compactions/RatioBasedCompactionPolicy.java | 21 +++++++++++++++++---- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 80844a9..e4424a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -28,6 +28,7 @@ import java.lang.management.MemoryUsage; import java.lang.reflect.Constructor; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -591,10 +592,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); // Server to handle client requests. - String hostname = conf.get("hbase.regionserver.ipc.address", - Strings.domainNamePointerToHostName(DNS.getDefaultHost( - conf.get("hbase.regionserver.dns.interface", "default"), - conf.get("hbase.regionserver.dns.nameserver", "default")))); + String hostname = getHostname(conf); int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); // Creation of a HSA will force a resolve. @@ -655,6 +653,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.rsInfo.setInfoPort(putUpWebUI()); } + public static String getHostname(Configuration conf) throws UnknownHostException { + return conf.get("hbase.regionserver.ipc.address", + Strings.domainNamePointerToHostName(DNS.getDefaultHost( + conf.get("hbase.regionserver.dns.interface", "default"), + conf.get("hbase.regionserver.dns.nameserver", "default")))); + } + @Override public boolean registerService(Service instance) { /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index dc89512..4f3530c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -47,6 +47,8 @@ public class CompactionConfiguration { static final Log LOG = LogFactory.getLog(CompactionConfiguration.class); private static final String CONFIG_PREFIX = "hbase.hstore.compaction."; + public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = + "hbase.hstore.min.locality.to.skip.major.compact"; public static final String RATIO_KEY = CONFIG_PREFIX + "ratio"; public static final String MIN_KEY = CONFIG_PREFIX + "min"; public static final String MAX_KEY = CONFIG_PREFIX + "max"; @@ -63,6 +65,8 @@ public class CompactionConfiguration { long throttlePoint; long majorCompactionPeriod; float majorCompactionJitter; + final float minLocalityToForceCompact; + CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -82,6 +86,7 @@ public class CompactionConfiguration { majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7); // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F); + minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); LOG.info(this); } @@ -90,7 +95,7 @@ public class CompactionConfiguration { public String toString() { return String.format( "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;" - + " major period %d, major jitter %f", + + " major period %d, major jitter %f, min locality to compact %f\"", minCompactSize, maxCompactSize, minFilesToCompact, @@ -99,7 +104,9 @@ public class CompactionConfiguration { offPeekCompactionRatio, throttlePoint, majorCompactionPeriod, - majorCompactionJitter); + majorCompactionJitter, + minLocalityToForceCompact + ); } /** @@ -166,4 +173,13 @@ public class CompactionConfiguration { float getMajorCompactionJitter() { return majorCompactionJitter; } + + /** + * @return Block locality ratio, the ratio at which we will include old regions with a single + * store file for major compaction. Used to improve block locality for regions that + * haven't had writes in a while but are still being read. + */ + float getMinLocalityToForceCompact() { + return minLocalityToForceCompact; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index c70b061..0613e77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; @@ -297,10 +298,22 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { : now - minTimestamp.longValue(); if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this + - " because one (major) compacted file only and oldestTime " + - oldest + "ms is < ttl=" + cfTtl); + float blockLocalityIndex = sf.getHDFSBlockDistribution() + .getBlockLocalityIndex(HRegionServer.getHostname(comConf.conf)); + if (blockLocalityIndex > comConf.getMinLocalityToForceCompact()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on only store " + this + + "; to make hdfs blocks local, current locality: " + blockLocalityIndex + ); + } + result = true; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { LOG.debug("Major compaction triggered on store " + this + -- 2.1.0