From 9502e6492ecbdb2e34bbbe5295b7488003e389b1 Mon Sep 17 00:00:00 2001
From: "terence.yoo"
Date: Thu, 31 Dec 2015 00:37:13 +0900
Subject: [PATCH] skip large files only if there is no TTL expired record

---
 .../compactions/RatioBasedCompactionPolicy.java    |  15 +-
 .../regionserver/TestMajorCompactionWithTTL.java   | 184 +++++++++++++++++++++
 2 files changed, 195 insertions(+), 4 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompactionWithTTL.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index a823d7c..72f5fa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -133,6 +133,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     int pos = 0;
     while (pos < candidates.size() && !candidates.get(pos).isReference()
       && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
+      long oldest = getOldest(candidates.get(pos), System.currentTimeMillis());
+      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
+      if (cfTtl != HConstants.FOREVER && oldest > cfTtl)
+        return candidates;
+
       ++pos;
     }
     if (pos > 0) {
@@ -143,6 +148,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
     return candidates;
   }
+  private long getOldest(StoreFile storeFile, long now) {
+    Long minTimestamp = storeFile.getMinimumTimestamp();
+    return (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp;
+  }
+
   /**
    * @param candidates pre-filtrate
    * @return filtered subset
    */
@@ -295,10 +305,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
       if (filesToCompact.size() == 1) {
         // Single file
         StoreFile sf = filesToCompact.iterator().next();
-        Long minTimestamp = sf.getMinimumTimestamp();
-        long oldest = (minTimestamp == null)
-            ? Long.MIN_VALUE
-            : now - minTimestamp.longValue();
+        long oldest = getOldest(sf, now);
         if (sf.isMajorCompaction() &&
             (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
           float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompactionWithTTL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompactionWithTTL.java
new file mode 100644
index 0000000..05ac29f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompactionWithTTL.java
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.*;
+import static org.junit.Assert.*;
+
+/**
+ * Test major compactions
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMajorCompactionWithTTL {
+  @Rule public TestName name = new TestName();
+  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
+  protected Configuration conf = UTIL.getConfiguration();
+
+  private Region r = null;
+  private static final byte [] COLUMN_FAMILY = fam1;
+  private static final int MAX_VERSIONS = 3;
+  private static int TTL = 3; // seconds
+
+  /** constructor */
+  public TestMajorCompactionWithTTL() {
+    super();
+
+    // Set cache flush size to 1MB
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    HTableDescriptor htd = UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()),
+      HColumnDescriptor.DEFAULT_MIN_VERSIONS, MAX_VERSIONS, TTL,
+      HColumnDescriptor.DEFAULT_KEEP_DELETED);
+    this.r = UTIL.createLocalHRegion(htd, null, null);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    WAL wal = ((HRegion)r).getWAL();
+    ((HRegion)r).close();
+    wal.close();
+  }
+
+  private void createStoreFile(final Region region) throws IOException {
+    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
+  }
+
+  private void createStoreFile(final Region region, String family) throws IOException {
+    Table loader = new RegionAsTable(region);
+    HBaseTestCase.addContent(loader, family);
+    region.flush(true);
+  }
+
+  private long createStoreFileHasSomeExpiredData(final Region region) throws IOException {
+    final byte [] START_KEY_BYTES_1 = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
+    final byte [] START_KEY_BYTES_2 = {LAST_CHAR, FIRST_CHAR, FIRST_CHAR};
+    Table loader = new RegionAsTable(region);
+
+    long now = System.currentTimeMillis();
+    long notExpiredTimestamp = now + TTL * 2 * 1000;
+
+    // data that has not expired
+    long notExpiredRecordCount = HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY),
+      null, START_KEY_BYTES_1, START_KEY_BYTES_2, notExpiredTimestamp);
+
+    // expired data
+    HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY),
+      null, START_KEY_BYTES_2, null, now);
+
+    region.flush(true);
+
+    return notExpiredRecordCount;
+  }
+
+  /**
+   * Test major compaction with both TTL and hbase.hstore.compaction.max.size
+   * 1 store file
+   */
+  @Test
+  public void testMajorCompactionWithTtlAndCompactionMaxSize1() throws Exception {
+    testMajorCompactionWithTtlAndCompactionMaxSize(1);
+  }
+
+  /**
+   * Test major compaction with both TTL and hbase.hstore.compaction.max.size
+   * 3 store files
+   */
+  @Test
+  public void testMajorCompactionWithTtlAndCompactionMaxSize3() throws Exception {
+    testMajorCompactionWithTtlAndCompactionMaxSize(3);
+  }
+
+  private void testMajorCompactionWithTtlAndCompactionMaxSize(int storeFiles) throws IOException,
+    InterruptedException {
+    int delay = TTL / 2 * 1000;
+    float jitterPct = 0.0f;
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
+    conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
+    long verySmallThreshold = 1;
+    conf.setLong("hbase.hstore.compaction.max.size", verySmallThreshold);
+
+    long notExpiredRecordCount = 0;
+
+    HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
+    s.storeEngine.getCompactionPolicy().setConf(conf);
+    try {
+      for (int i = 0; i < storeFiles; i++)
+        notExpiredRecordCount = createStoreFileHasSomeExpiredData(r);
+
+      assertEquals(storeFiles, s.getStorefilesCount());
+      long storefilesSizeBeforeMC = s.getStorefilesSize();
+
+      // wait for some records to expire
+      Thread.sleep(TTL * 1000);
+
+      r.compact(false);
+
+      // should be major compacted
+      assertEquals(1, s.getStorefilesCount());
+      assertTrue(storefilesSizeBeforeMC > s.getStorefilesSize());
+
+      assertEquals(notExpiredRecordCount, countRecords());
+    } finally {
+      // reset the timed compaction settings
+      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000 * 60 * 60 * 24);
+      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+      // run a major to reset the cache
+      createStoreFile(r);
+      r.compact(true);
+      assertEquals(1, s.getStorefilesCount());
+    }
+  }
+
+  private long countRecords() throws IOException {
+    long counter = 0;
+    RegionScanner scanner = r.getScanner(new Scan());
+    do {
+      List cells = new ArrayList<>();
+      boolean existNext = scanner.next(cells);
+      if (existNext || cells.size() > 0) {
+        counter++;
+      }
+      if (!existNext) {
+        break;
+      }
+    } while (true);
+    return counter;
+  }
+}
-- 
2.5.4 (Apple Git-61)
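
Note (appended after the mail signature, not part of the diff itself): the first hunk changes skipLargeFiles() so that an over-sized store file is skipped only while its oldest cell is still within the column-family TTL; as soon as a too-large candidate contains TTL-expired data, the whole candidate list is returned so the expired records can be compacted away. The sketch below models that rule in self-contained plain Java. FileInfo, FOREVER, oldestAgeMs and the simplified skipLargeFiles signature here are hypothetical stand-ins for HBase's StoreFile, HConstants.FOREVER and the patched RatioBasedCompactionPolicy method, not the real API, and the isReference() check is omitted for brevity.

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of the selection rule introduced by the patch: while
 * walking candidates, an over-sized file is skipped only when its oldest cell
 * is still within the column-family TTL; a large file that already holds
 * expired data keeps the whole candidate list eligible for compaction.
 */
public class SkipLargeFilesSketch {

  /** Hypothetical stand-in for a store file: size plus oldest cell timestamp. */
  static final class FileInfo {
    final long length;            // file size in bytes
    final Long minTimestampMs;    // oldest cell timestamp, or null if unknown

    FileInfo(long length, Long minTimestampMs) {
      this.length = length;
      this.minTimestampMs = minTimestampMs;
    }
  }

  static final long FOREVER = Long.MAX_VALUE; // mirrors the "no TTL" sentinel

  /** Age of the oldest cell, or Long.MIN_VALUE when the timestamp is unknown. */
  static long oldestAgeMs(FileInfo f, long nowMs) {
    return (f.minTimestampMs == null) ? Long.MIN_VALUE : nowMs - f.minTimestampMs;
  }

  /**
   * Drops leading over-sized files from the candidate list, except that
   * skipping stops as soon as an over-sized file contains TTL-expired data.
   */
  static List<FileInfo> skipLargeFiles(List<FileInfo> candidates, long maxCompactSize,
      long cfTtlMs, long nowMs) {
    int pos = 0;
    while (pos < candidates.size() && candidates.get(pos).length > maxCompactSize) {
      if (cfTtlMs != FOREVER && oldestAgeMs(candidates.get(pos), nowMs) > cfTtlMs) {
        return candidates; // keep everything: this large file has expired records
      }
      ++pos;
    }
    return new ArrayList<>(candidates.subList(pos, candidates.size()));
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long ttlMs = 3_000L;        // 3 second TTL, as in the test
    long maxCompactSize = 1L;   // tiny threshold so every file counts as "too large"

    List<FileInfo> candidates = new ArrayList<>();
    candidates.add(new FileInfo(10_000, now - 10_000)); // large and holds expired data
    candidates.add(new FileInfo(10_000, now));          // large but still fresh

    // The first file is over-sized but already expired, so nothing is skipped.
    System.out.println(skipLargeFiles(candidates, maxCompactSize, ttlMs, now).size()); // 2
  }
}

Running main() prints 2: with a 3-second TTL and a tiny max-compact-size, the over-sized file holding expired cells keeps every candidate in the list instead of being skipped, which is the behavior the new test exercises.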