From 20504a1dda06e516e078c7f7042c8b1575fe5414 Mon Sep 17 00:00:00 2001
From: "terence.yoo"
Date: Thu, 31 Dec 2015 10:41:24 +0900
Subject: [PATCH] Do not skip large files when the sum of the size of TTL
 expired store files is greater than threshold
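
The compaction policy skips files larger than hbase.hstore.compaction.max.size
when selecting candidates. If such a large file consists mostly of TTL expired
records, it is never selected, so the expired records are never purged. With
this change, when the TTL expired portion of the large candidate files exceeds
a configurable size ratio (hbase.hstore.expired.size.ratio, default 0.5), the
large files are kept as candidates so that a major compaction can delete the
expired records.

The new setting can be tuned like any other compaction knob; the value below
is only an illustration:

    Configuration conf = HBaseConfiguration.create();
    // keep large files as candidates once 70% of the large-file bytes are expired
    conf.setFloat("hbase.hstore.expired.size.ratio", 0.7f);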
---
 .../compactions/CompactionConfiguration.java       |  15 ++-
 .../compactions/RatioBasedCompactionPolicy.java    |  53 ++++++--
 .../hbase/regionserver/TestMajorCompaction.java    | 143 +++++++++++++++++++++
 3 files changed, 198 insertions(+), 13 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 633477e..c06f91e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 
 /**
@@ -63,6 +63,9 @@ public class CompactionConfiguration {
   public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour";
   public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
       "hbase.hstore.min.locality.to.skip.major.compact";
+  public static final String HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD =
+      "hbase.hstore.expired.size.ratio";
+  public static final float HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD_DEFAULT = 0.5f;
 
   public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT =
       "hbase.hfile.compaction.discharger.thread.count";
@@ -82,6 +85,7 @@ public class CompactionConfiguration {
   private final long majorCompactionPeriod;
   private final float majorCompactionJitter;
   private final float minLocalityToForceCompact;
+  private final float expiredSizeRatioThreshold;
 
   CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
     this.conf = conf;
@@ -104,6 +108,8 @@ public class CompactionConfiguration {
     // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
     minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f);
+    expiredSizeRatioThreshold = conf.getFloat(HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD,
+        HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD_DEFAULT);
 
     LOG.info(this);
   }
@@ -210,4 +216,11 @@ public class CompactionConfiguration {
       return getMaxCompactSize();
     }
   }
+
+  /**
+   * @return the TTL expired store file size ratio threshold above which large files are kept
+   */
+  public float getExpiredSizeRatioThreshold() {
+    return expiredSizeRatioThreshold;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index a823d7c..a96123c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,10 +38,7 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * The default algorithm for selecting files for compaction.
@@ -130,11 +131,37 @@
    */
   private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
     boolean mayUseOffpeak) {
-    int pos = 0;
-    while (pos < candidates.size() && !candidates.get(pos).isReference()
-        && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
-      ++pos;
+    int pos;
+    long largeAndExpiredFileSizeSum = 0;
+    long allFileSizeSum = 0;
+
+    for (pos = 0; pos < candidates.size(); pos++) {
+      long fileSize = candidates.get(pos).getReader().length();
+      if (candidates.get(pos).isReference()
+          || fileSize <= comConf.getMaxCompactSize(mayUseOffpeak)) {
+        break;
+      }
+
+      long oldest = getOldest(candidates.get(pos), EnvironmentEdgeManager.currentTime());
+      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
+      allFileSizeSum += fileSize;
+      if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
+        largeAndExpiredFileSizeSum += fileSize;
+      }
     }
+
+    // Do not skip large files when the sum of the size of TTL expired store files exceeds
+    // the threshold ratio. A major compaction should then run to delete the TTL expired records.
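+    // Illustration (sizes are made up): with three large candidate files of 100 MB each,
+    // two of which have their oldest record past the TTL, expiredRatio = 200 / 300 ~= 0.67,
+    // which is over the default threshold of 0.5, so all three files stay in the list.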
+    if (largeAndExpiredFileSizeSum > 0 && allFileSizeSum > 0) {
+      float expiredRatio = (float) largeAndExpiredFileSizeSum / allFileSizeSum;
+      float threshold = comConf.getExpiredSizeRatioThreshold();
+      if (expiredRatio > threshold) {
+        LOG.debug("Some files have TTL expired records. Expired ratio is " + expiredRatio
+            + ". Threshold is " + threshold + ". Keeping all candidates.");
+        return candidates;
+      }
+    }
+
     if (pos > 0) {
       LOG.debug("Some files are too large. Excluding " + pos
           + " files from compaction candidates");
@@ -143,6 +170,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     return candidates;
   }
 
+  private long getOldest(StoreFile storeFile, long now) {
+    Long minTimestamp = storeFile.getMinimumTimestamp();
+    return (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp;
+  }
+
   /**
    * @param candidates pre-filtrate
    * @return filtered subset
@@ -295,10 +327,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     if (filesToCompact.size() == 1) {
       // Single file
       StoreFile sf = filesToCompact.iterator().next();
-      Long minTimestamp = sf.getMinimumTimestamp();
-      long oldest = (minTimestamp == null)
-          ? Long.MIN_VALUE
-          : now - minTimestamp.longValue();
+      long oldest = getOldest(sf, now);
       if (sf.isMajorCompaction() &&
           (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
         float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 3ef89ad..e9e03e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
@@ -41,8 +43,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
@@ -58,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPoli
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Before;
@@ -85,6 +90,8 @@ public class TestMajorCompaction {
   private int compactionThreshold;
   private byte[] secondRowBytes, thirdRowBytes;
   private static final long MAX_FILES_TO_COMPACT = 10;
+  private static final int TTL = 10; // seconds
+  private static final int MAX_VERSIONS = 3;
 
   /** constructor */
   public TestMajorCompaction() {
@@ -478,4 +485,140 @@ public class TestMajorCompaction {
     s.close();
     assertEquals(0, counter);
   }
+
+  /**
+   * Test for HBASE-15055.
+   * Tests major compaction with both TTL and hbase.hstore.compaction.max.size set, using one
+   * store file. The expiredRatio should be 1.0.
+   */
+  @Test
+  public void testMajorCompactionWithTtlAndCompactionMaxSize1() throws Exception {
+    testMajorCompactionWithTtlAndCompactionMaxSize(1);
+  }
+
+  /**
+   * Test for HBASE-15055.
+   * Tests major compaction with both TTL and hbase.hstore.compaction.max.size set, using three
+   * store files. The expiredRatio should be greater than the default threshold, 0.5.
+   */
+  @Test
+  public void testMajorCompactionWithTtlAndCompactionMaxSize3() throws Exception {
+    testMajorCompactionWithTtlAndCompactionMaxSize(3);
+  }
+
+  /**
+   * Test for HBASE-15055.
+   * Tests major compaction with both TTL and hbase.hstore.compaction.max.size set.
+   */
+  private void testMajorCompactionWithTtlAndCompactionMaxSize(int storeFiles) throws Exception {
+    int delay = (int) (TTL / 3.0 * 1000);
+    float jitterPct = 0.0f;
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
+    conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
+    long verySmallThreshold = 1;
+    conf.setLong("hbase.hstore.compaction.max.size", verySmallThreshold);
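+    // With a 1 byte max.size every store file counts as "large", so without the
+    // HBASE-15055 change every candidate would be skipped and the TTL expired
+    // records would never be purged by compaction.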
+
+    long notExpiredRecordCount = 0;
+
+    recreateRegionWithTTL();
+
+    HStore s = (HStore) r.getStore(COLUMN_FAMILY);
+    s.storeEngine.getCompactionPolicy().setConf(conf);
+    try {
+      if (storeFiles == 1) {
+        notExpiredRecordCount = createStoreFile(r, true);
+      } else {
+        for (int i = 0; i < storeFiles - 1; i++) {
+          createStoreFile(r, true);
+        }
+
+        // A store file without TTL expired records should be created when the number of
+        // store files is greater than 1.
+        notExpiredRecordCount = createStoreFile(r, false);
+      }
+
+      assertEquals(storeFiles, s.getStorefilesCount());
+      long storefilesSizeBeforeMC = s.getStorefilesSize();
+
+      // Should not be major compacted yet. No TTL expiration.
+      Thread.sleep(delay);
+      r.compact(false);
+      assertEquals(storeFiles, s.getStorefilesCount());
+      assertTrue(storefilesSizeBeforeMC == s.getStorefilesSize());
+
+      // Should not be major compacted yet. No TTL expiration.
+      Thread.sleep(delay);
+      r.compact(false);
+      assertEquals(storeFiles, s.getStorefilesCount());
+      assertTrue(storefilesSizeBeforeMC == s.getStorefilesSize());
+
+      // Should be major compacted. Some TTL expiration.
+      Thread.sleep(delay);
+      r.compact(false);
+      assertEquals(1, s.getStorefilesCount());
+      assertTrue(storefilesSizeBeforeMC > s.getStorefilesSize());
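+      // Timeline with TTL = 10s and a delay of roughly 3.3s: the compactions at ~3.3s and
+      // ~6.7s find no record older than TTL, while at ~10s the records written at "now"
+      // have expired, expiredRatio crosses the threshold, and a major compaction runs.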
+
+      assertEquals(notExpiredRecordCount, countRecords());
+    } finally {
+      // reset the timed compaction settings
+      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000 * 60 * 60 * 24);
+      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+      // run a major to reset the cache
+      createStoreFile(r);
+      r.compact(true);
+      assertEquals(1, s.getStorefilesCount());
+    }
+  }
+
+  private void recreateRegionWithTTL() throws Exception {
+    tearDown();
+
+    HTableDescriptor htd = UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()),
+        HColumnDescriptor.DEFAULT_MIN_VERSIONS, MAX_VERSIONS, TTL,
+        HColumnDescriptor.DEFAULT_KEEP_DELETED);
+    this.r = UTIL.createLocalHRegion(htd, null, null);
+  }
+
+  /**
+   * The size of a store file that has TTL expired records is slightly greater than
+   * the size of a store file that does not have TTL expired records.
+   */
+  private long createStoreFile(final Region region, boolean hasExpiredRecords)
+      throws IOException {
+    final byte [] START_KEY_BYTES_1 = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
+    final byte [] START_KEY_BYTES_2 = {LAST_CHAR, FIRST_CHAR, FIRST_CHAR};
+    Table loader = new RegionAsTable(region);
+
+    long now = EnvironmentEdgeManager.currentTime();
+    long notExpiredTimestamp = now + TTL * 2 * 1000;
+
+    // This data will not expire during the test (the timestamp is in the future)
+    long notExpiredRecordCount = HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY),
+        null, START_KEY_BYTES_1, START_KEY_BYTES_2, notExpiredTimestamp);
+
+    // This data will expire once TTL has elapsed
+    if (hasExpiredRecords) {
+      HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY),
+          null, START_KEY_BYTES_2, null, now);
+    }
+
+    region.flush(true);
+
+    return notExpiredRecordCount;
+  }
+
+  private long countRecords() throws IOException {
+    long counter = 0;
+    RegionScanner scanner = r.getScanner(new Scan());
+    do {
+      List<Cell> cells = new ArrayList<>();
+      boolean existNext = scanner.next(cells);
+      if (existNext || cells.size() > 0) {
+        counter++;
+      }
+      if (!existNext) {
+        break;
+      }
+    } while (true);
+    scanner.close();
+    return counter;
+  }
 }
-- 
2.5.4 (Apple Git-61)