From 6e116e8dec3e978ccf1d94402cc5035f4f48ed0d Mon Sep 17 00:00:00 2001
From: "terence.yoo"
Date: Thu, 31 Dec 2015 10:41:24 +0900
Subject: [PATCH] Skip large files only if there is no TTL-expired record

---
 .../compactions/RatioBasedCompactionPolicy.java    |  17 ++-
 .../hbase/regionserver/TestMajorCompaction.java    | 124 +++++++++++++++++++--
 2 files changed, 128 insertions(+), 13 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index a823d7c..8d027ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * The default algorithm for selecting files for compaction.
@@ -133,6 +134,12 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     int pos = 0;
     while (pos < candidates.size() && !candidates.get(pos).isReference()
       && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
+      long oldest = getOldest(candidates.get(pos), EnvironmentEdgeManager.currentTime());
+      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
+      if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
+        // Return immediately to keep all candidates if any large file holds TTL-expired records
+        return candidates;
+      }
       ++pos;
     }
     if (pos > 0) {
@@ -143,6 +150,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     return candidates;
   }
 
+  private long getOldest(StoreFile storeFile, long now) {
+    Long minTimestamp = storeFile.getMinimumTimestamp();
+    return (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp;
+  }
+
   /**
    * @param candidates pre-filtrate
    * @return filtered subset
@@ -295,10 +307,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     if (filesToCompact.size() == 1) {
       // Single file
       StoreFile sf = filesToCompact.iterator().next();
-      Long minTimestamp = sf.getMinimumTimestamp();
-      long oldest = (minTimestamp == null)
-          ? Long.MIN_VALUE
-          : now - minTimestamp.longValue();
+      long oldest = getOldest(sf, now);
       if (sf.isMajorCompaction() &&
           (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
         float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 3ef89ad..79f5504 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -18,9 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.*;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -37,12 +37,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
@@ -58,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPoli
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Before;
@@ -85,6 +81,8 @@ public class TestMajorCompaction {
   private int compactionThreshold;
   private byte[] secondRowBytes, thirdRowBytes;
   private static final long MAX_FILES_TO_COMPACT = 10;
+  private static int TTL = 3; // seconds
+  private static final int MAX_VERSIONS = 3;
 
   /** constructor */
   public TestMajorCompaction() {
@@ -478,4 +476,112 @@ public class TestMajorCompaction {
     s.close();
     assertEquals(0, counter);
   }
+
+  /**
+   * Test for HBASE-15055
+   * Test major compaction when both TTL and hbase.hstore.compaction.max.size are set
+   * 1 store file
+   */
+  @Test
+  public void testMajorCompactionWithTtlAndCompactionMaxSize1() throws Exception {
+    testMajorCompactionWithTtlAndCompactionMaxSize(1);
+  }
+
+  /**
+   * Test for HBASE-15055
+   * Test major compaction when both TTL and hbase.hstore.compaction.max.size are set
+   * 3 store files
+   */
+  @Test
+  public void testMajorCompactionWithTtlAndCompactionMaxSize3() throws Exception {
+    testMajorCompactionWithTtlAndCompactionMaxSize(3);
+  }
+
+  private void testMajorCompactionWithTtlAndCompactionMaxSize(int storeFiles) throws Exception {
+    int delay = TTL / 2 * 1000;
+    float jitterPct = 0.0f;
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct); + long verySmallThreshold = 1; + conf.setLong("hbase.hstore.compaction.max.size", verySmallThreshold); + + long notExpiredRecordCount = 0; + + recreateRegionWithTTL(); + + HStore s = ((HStore) r.getStore(COLUMN_FAMILY)); + s.storeEngine.getCompactionPolicy().setConf(conf); + try { + for (int i = 0; i < storeFiles; i++) + notExpiredRecordCount = createStoreFileWithSomeExpiration(r); + + assertEquals(storeFiles, s.getStorefilesCount()); + long storefilesSizeBeforeMC = s.getStorefilesSize(); + + // wait for expiration some records + Thread.sleep(TTL * 1000); + + r.compact(false); + + // should be major compacted + assertEquals(1, s.getStorefilesCount()); + assertTrue(storefilesSizeBeforeMC > s.getStorefilesSize()); + + assertEquals(notExpiredRecordCount, countRecords()); + } finally { + // reset the timed compaction settings + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000 * 60 * 60 * 24); + conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F); + // run a major to reset the cache + createStoreFile(r); + r.compact(true); + assertEquals(1, s.getStorefilesCount()); + } + } + + private void recreateRegionWithTTL() throws Exception { + tearDown(); + + HTableDescriptor htd = UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()), + HColumnDescriptor.DEFAULT_MIN_VERSIONS, MAX_VERSIONS, TTL, + HColumnDescriptor.DEFAULT_KEEP_DELETED); + this.r = UTIL.createLocalHRegion(htd, null, null); + } + + private long createStoreFileWithSomeExpiration(final Region region) throws IOException { + final byte [] START_KEY_BYTES_1 = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; + final byte [] START_KEY_BYTES_2 = {LAST_CHAR, FIRST_CHAR, FIRST_CHAR}; + Table loader = new RegionAsTable(region); + + long now = EnvironmentEdgeManager.currentTime(); + long notExpiredTimestamp = now + TTL * 2 * 1000; + + // will not be expired data + long notExpiredRecordCount = HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), + null, START_KEY_BYTES_1, START_KEY_BYTES_2, notExpiredTimestamp); + + // will be expired data + HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), + null, START_KEY_BYTES_2, null, now); + + region.flush(true); + + return notExpiredRecordCount; + } + + private long countRecords() throws IOException { + long counter = 0; + RegionScanner scanner = r.getScanner(new Scan()); + do { + List cells = new ArrayList<>(); + boolean existNext = scanner.next(cells); + if (existNext || cells.size() > 0) { + counter++; + } + if (!existNext) { + break; + } + } while (true); + return counter; + } } -- 2.5.4 (Apple Git-61)