From d24eb025165358d7c98e8ebb7271420232479679 Mon Sep 17 00:00:00 2001 From: "terence.yoo" Date: Thu, 31 Dec 2015 10:41:24 +0900 Subject: [PATCH] Do not skip large files when the sum of the size of TTL expired store files is greater than threshold --- .../compactions/CompactionConfiguration.java | 13 ++ .../compactions/RatioBasedCompactionPolicy.java | 42 +++++-- .../hbase/regionserver/TestMajorCompaction.java | 137 +++++++++++++++++++-- 3 files changed, 175 insertions(+), 17 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 62e7c7c..82a8c39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -63,6 +63,9 @@ public class CompactionConfiguration { public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour"; public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = "hbase.hstore.min.locality.to.skip.major.compact"; + public static final String HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD = + "hbase.hstore.expired.size.ratio"; + public static final float HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD_DEFAULT = 0.5f; Configuration conf; StoreConfigInformation storeConfigInfo; @@ -79,6 +82,7 @@ public class CompactionConfiguration { private final long majorCompactionPeriod; private final float majorCompactionJitter; private final float minLocalityToForceCompact; + private final float expiredSizeRatioThreshold; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -101,6 +105,8 @@ public class CompactionConfiguration { // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run 
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F); minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); + expiredSizeRatioThreshold = conf.getFloat(HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD, + HBASE_HSTORE_EXPIRED_SIZE_RATIO_THRESHOLD_DEFAULT); LOG.info(this); } @@ -207,4 +213,11 @@ public class CompactionConfiguration { return getMaxCompactSize(); } } + + /** + * @return TTL Expired store file size ratio threshold not to skip large file selection + */ + public float getExpiredSizeRatioThreshold() { + return expiredSizeRatioThreshold; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index a823d7c..8682321 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.StoreUtils; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * The default algorithm for selecting files for compaction. 
@@ -130,11 +131,34 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { */ private ArrayList skipLargeFiles(ArrayList candidates, boolean mayUseOffpeak) { - int pos = 0; - while (pos < candidates.size() && !candidates.get(pos).isReference() - && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) { - ++pos; + int pos; + long largeAndExpiredFileSizeSum = 0; + long allFileSizeSum = 0; + + for(pos = 0; pos < candidates.size(); pos++) { + long fileSize = candidates.get(pos).getReader().length(); + if (candidates.get(pos).isReference() || fileSize <= comConf.getMaxCompactSize(mayUseOffpeak)) + break; + + long oldest = getOldest(candidates.get(pos), EnvironmentEdgeManager.currentTime()); + long cfTtl = this.storeConfigInfo.getStoreFileTtl(); + allFileSizeSum += fileSize; + if (cfTtl != HConstants.FOREVER && oldest > cfTtl) + largeAndExpiredFileSizeSum += fileSize; } + + // Do not skip large files when the sum of the size of TTL expired store files is greater + // than threshold. Major compaction should be triggered to delete TTL expired records. + if (largeAndExpiredFileSizeSum > 0 && allFileSizeSum > 0) { + float expiredRatio = (float) largeAndExpiredFileSizeSum / allFileSizeSum; + float threshold = comConf.getExpiredSizeRatioThreshold(); + if (expiredRatio > threshold) { + LOG.debug("Some files have TTL expired records. Expired ratio is " + expiredRatio + + ". Threshold is " + threshold + ". Keeping all candidates."); + return candidates; + } + } + if (pos > 0) { LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates"); @@ -143,6 +167,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { return candidates; } + private long getOldest(StoreFile storeFile, long now) { + Long minTimestamp = storeFile.getMinimumTimestamp(); + return (minTimestamp == null) ? 
Long.MIN_VALUE : now - minTimestamp; + } + /** * @param candidates pre-filtrate * @return filtered subset @@ -295,10 +324,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { if (filesToCompact.size() == 1) { // Single file StoreFile sf = filesToCompact.iterator().next(); - Long minTimestamp = sf.getMinimumTimestamp(); - long oldest = (minTimestamp == null) - ? Long.MIN_VALUE - : now - minTimestamp.longValue(); + long oldest = getOldest(sf, now); if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index 3ef89ad..399e64f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -18,9 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; -import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.*; +import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -37,12 +37,7 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestCase; -import 
org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; @@ -58,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPoli import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; @@ -85,6 +81,8 @@ public class TestMajorCompaction { private int compactionThreshold; private byte[] secondRowBytes, thirdRowBytes; private static final long MAX_FILES_TO_COMPACT = 10; + private static int TTL = 10; // second + private static final int MAX_VERSIONS = 3; /** constructor */ public TestMajorCompaction() { @@ -478,4 +476,125 @@ public class TestMajorCompaction { s.close(); assertEquals(0, counter); } + + /** + * Test for HBASE-15055 + * Test major compaction with both of TTL and hbase.hstore.compaction.max.size + * 1 store file + */ + @Test + public void testMajorCompactionWithTtlAndCompactionMaxSize1() throws Exception { + testMajorCompactionWithTtlAndCompactionMaxSize(1); + } + + /** + * Test for HBASE-15055 + * Test major compaction with both of TTL and hbase.hstore.compaction.max.size + * 3 store files + */ + @Test + public void testMajorCompactionWithTtlAndCompactionMaxSize3() throws Exception { + testMajorCompactionWithTtlAndCompactionMaxSize(3); + } + + /** + * Test for HBASE-15055 + * Test major compaction with both of TTL and hbase.hstore.compaction.max.size + */ + private void testMajorCompactionWithTtlAndCompactionMaxSize(int storeFiles) throws Exception { + int delay = (int) (TTL / 3.0 * 1000); + float jitterPct = 0.0f; + 
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay); + conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct); + long verySmallThreshold = 1; + conf.setLong("hbase.hstore.compaction.max.size", verySmallThreshold); + + long notExpiredRecordCount = 0; + + recreateRegionWithTTL(); + + HStore s = ((HStore) r.getStore(COLUMN_FAMILY)); + s.storeEngine.getCompactionPolicy().setConf(conf); + try { + for (int i = 0; i < storeFiles; i++) + notExpiredRecordCount = createStoreFileWithSomeExpiration(r); + + assertEquals(storeFiles, s.getStorefilesCount()); + long storefilesSizeBeforeMC = s.getStorefilesSize(); + + // Should not be major compacted yet. No TTL expiration. + Thread.sleep(delay); + r.compact(false); + assertEquals(storeFiles, s.getStorefilesCount()); + assertTrue(storefilesSizeBeforeMC == s.getStorefilesSize()); + + // Should not be major compacted yet. No TTL expiration. + Thread.sleep(delay); + r.compact(false); + assertEquals(storeFiles, s.getStorefilesCount()); + assertTrue(storefilesSizeBeforeMC == s.getStorefilesSize()); + + // Should be major compacted. Some TTL expiration. 
+ Thread.sleep(delay); + r.compact(false); + assertEquals(1, s.getStorefilesCount()); + assertTrue(storefilesSizeBeforeMC > s.getStorefilesSize()); + + assertEquals(notExpiredRecordCount, countRecords()); + } finally { + // reset the timed compaction settings + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000 * 60 * 60 * 24); + conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F); + // run a major to reset the cache + createStoreFile(r); + r.compact(true); + assertEquals(1, s.getStorefilesCount()); + } + } + + private void recreateRegionWithTTL() throws Exception { + tearDown(); + + HTableDescriptor htd = UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()), + HColumnDescriptor.DEFAULT_MIN_VERSIONS, MAX_VERSIONS, TTL, + HColumnDescriptor.DEFAULT_KEEP_DELETED); + this.r = UTIL.createLocalHRegion(htd, null, null); + } + + private long createStoreFileWithSomeExpiration(final Region region) throws IOException { + final byte [] START_KEY_BYTES_1 = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; + final byte [] START_KEY_BYTES_2 = {LAST_CHAR, FIRST_CHAR, FIRST_CHAR}; + Table loader = new RegionAsTable(region); + + long now = EnvironmentEdgeManager.currentTime(); + long notExpiredTimestamp = now + TTL * 2 * 1000; + + // will not be expired data + long notExpiredRecordCount = HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), + null, START_KEY_BYTES_1, START_KEY_BYTES_2, notExpiredTimestamp); + + // will be expired data + HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), + null, START_KEY_BYTES_2, null, now); + + region.flush(true); + + return notExpiredRecordCount; + } + + private long countRecords() throws IOException { + long counter = 0; + RegionScanner scanner = r.getScanner(new Scan()); + do { + List cells = new ArrayList<>(); + boolean existNext = scanner.next(cells); + if (existNext || cells.size() > 0) { + counter++; + } + if (!existNext) { + break; + } + } while (true); + return counter; + } } -- 2.5.4 (Apple Git-61)