From 7214d72e81ee1e636ee6e2a769d4dfa1b9cc6726 Mon Sep 17 00:00:00 2001 From: Clara Date: Wed, 27 Jan 2016 21:43:29 -0800 Subject: [PATCH] HBASE-15181 A simple implementation of date based tiered compaction --- .../apache/hadoop/hbase/regionserver/HStore.java | 5 + .../hbase/regionserver/StoreConfigInformation.java | 7 + .../hadoop/hbase/regionserver/StoreFile.java | 7 + .../compactions/CompactionConfiguration.java | 37 ++- .../compactions/RatioBasedCompactionPolicy.java | 6 +- .../compactions/TieredCompactionPolicy.java | 254 +++++++++++++++++++++ .../hadoop/hbase/regionserver/MockStoreFile.java | 12 + .../hbase/regionserver/TestCompactionPolicy.java | 199 ++++++++++++++++ .../regionserver/TestDefaultCompactSelection.java | 5 +- .../hbase/regionserver/TestDefaultStoreEngine.java | 2 + .../hbase/regionserver/TestStripeStoreEngine.java | 3 + .../hbase/regionserver/TestTieredCompaction.java | 153 +++++++++++++ .../compactions/TestStripeCompactionPolicy.java | 3 + 13 files changed, 685 insertions(+), 8 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/TieredCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTieredCompaction.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 9ebdaee..eb73f28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2368,4 +2368,9 @@ public class HStore implements Store { lock.writeLock().unlock(); } } + + @Override + public HColumnDescriptor getHColumnDescriptor() { + return family; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java index d07bded..801b113 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -53,4 +54,10 @@ public interface StoreConfigInformation { * The number of files required before flushes for this store will be blocked. */ long getBlockingFileCount(); + + /** + * Get the HColumnDescriptor for the store to override configuration dynamically + * @return + */ + HColumnDescriptor getHColumnDescriptor(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index f3830ee..4e54f15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -760,6 +760,13 @@ public class StoreFile { null : getReader().timeRangeTracker.getMinimumTimestamp(); } + + public Long getMaximumTimestamp() { + return (getReader().timeRangeTracker == null) ? 
+ null : + getReader().timeRangeTracker.getMaximumTimestamp(); + } + /** * Gets the approximate mid-point of this file that is optimal for use in splitting it. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 633477e..e5a09fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; @@ -66,7 +67,21 @@ public class CompactionConfiguration { public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT = "hbase.hfile.compaction.discharger.thread.count"; - + + /* + * The epoch time length for the windows we no longer compact + */ + private static final String CONFIG_PREFIX = "hbase.hstore.compaction."; + public static final String MAX_AGE = CONFIG_PREFIX + "tiered.max.storefile.age"; + public static final String TIME_UNIT = CONFIG_PREFIX + "tiered.time.unit"; + public static final String TIER_BASE = CONFIG_PREFIX + "tiered.tier.base"; + public static final String MIN_THRESHOLD = CONFIG_PREFIX + "tiered.min.threshold"; + public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS = + CONFIG_PREFIX + "tiered.window.compaction.policy.class"; + + private static final Class + DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -82,9 +97,15 @@ public class CompactionConfiguration { private final long majorCompactionPeriod; private final float majorCompactionJitter; private final float minLocalityToForceCompact; + final long maxStoreFileAge; + final long timeUnit; + final int tierBase; + final int minThreshold; + final String compactionPolicyForTieredWindow; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { - this.conf = conf; + this.conf = new CompoundConfiguration().add(conf) + .addBytesMap(storeConfigInfo.getHColumnDescriptor().getValues());; this.storeConfigInfo = storeConfigInfo; maxCompactSize = conf.getLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, Long.MAX_VALUE); @@ -104,6 +125,13 @@ public class CompactionConfiguration { // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F); minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); + + maxStoreFileAge = conf.getLong(MAX_AGE, Long.MAX_VALUE); + timeUnit = conf.getLong(TIME_UNIT, 3600000 * 6); + tierBase = conf.getInt(TIER_BASE, 4); + minThreshold = conf.getInt(MIN_THRESHOLD, 6); + compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS, + DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); LOG.info(this); } @@ -122,6 +150,11 @@ public class CompactionConfiguration { throttlePoint, majorCompactionPeriod, majorCompactionJitter, + minLocalityToForceCompact, + maxStoreFileAge, + timeUnit, + tierBase, + 
minThreshold, minLocalityToForceCompact); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index a823d7c..a7b517d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -128,7 +128,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { * exclude all files above maxCompactSize * Also save all references. We MUST compact them */ - private ArrayList skipLargeFiles(ArrayList candidates, + protected ArrayList skipLargeFiles(ArrayList candidates, boolean mayUseOffpeak) { int pos = 0; while (pos < candidates.size() && !candidates.get(pos).isReference() @@ -148,7 +148,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { * @return filtered subset * exclude all bulk load files if configured */ - private ArrayList filterBulk(ArrayList candidates) { + protected ArrayList filterBulk(ArrayList candidates) { candidates.removeAll(Collections2.filter(candidates, new Predicate() { @Override @@ -184,7 +184,7 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { * @return filtered subset * forget the compactionSelection if we don't have enough files */ - private ArrayList checkMinFilesCriteria(ArrayList candidates) { + protected ArrayList checkMinFilesCriteria(ArrayList candidates) { int minFiles = comConf.getMinFilesToCompact(); if (candidates.size() < minFiles) { if(LOG.isDebugEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/TieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/TieredCompactionPolicy.java new file mode 100644 index 0000000..84589eb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/TieredCompactionPolicy.java @@ -0,0 +1,254 @@ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.PeekingIterator; + +public class TieredCompactionPolicy extends RatioBasedCompactionPolicy { + private static final Log LOG = LogFactory.getLog(TieredCompactionPolicy.class); + + private RatioBasedCompactionPolicy compactionPolicyPerWindow; + + public TieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) + throws IOException { + super(conf, storeConfigInfo); + try { + compactionPolicyPerWindow = + ReflectionUtils.instantiateWithCustomCtor(comConf.compactionPolicyForTieredWindow, + new Class[] { Configuration.class, 
StoreConfigInformation.class }, new Object[] { conf,
+              storeConfigInfo });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured compaction policy '"
+          + comConf.compactionPolicyForTieredWindow + "'", e);
+    }
+  }
+
+  @Override
+  public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+    // Never do major compaction unless forced
+    return false;
+  }
+
+  @Override
+  /**
+   * Heuristics for guessing whether we need compaction.
+   */
+  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting) {
+    return needsCompaction(storeFiles, filesCompacting, System.currentTimeMillis());
+  }
+
+  @VisibleForTesting
+  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting, long now) {
+    if (!super.needsCompaction(storeFiles, filesCompacting)) {
+      return false;
+    }
+
+    ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
+    candidates = filterBulk(candidates);
+    candidates = skipLargeFiles(candidates, true);
+    try {
+      candidates = applyCompactionPolicy(candidates, true, false, now);
+    } catch (Exception e) {
+      LOG.error("Cannot check for compaction: ", e);
+      return false;
+    }
+
+    return candidates != null && candidates.size() >= comConf.getMinFilesToCompact();
+  }
+
+  @Override
+  /**
+   * Could return null if no candidates are found.
+   */
+  public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
+      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
+    return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck, System.currentTimeMillis());
+  }
+
+  @VisibleForTesting
+  public ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
+      boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException {
+    Iterable<StoreFile> candidatesInWindow =
+        filterOldStoreFiles(Lists.newArrayList(candidates), comConf.maxStoreFileAge, now);
+
+    List<ArrayList<StoreFile>> buckets =
+        getBuckets(createStoreFileAndMaxTimestampPairs(candidatesInWindow), comConf.timeUnit,
+          comConf.tierBase, now);
+    LOG.debug("Compaction buckets are: " + buckets);
+
+    return newestBucket(buckets, comConf.minThreshold, comConf.getMaxFilesToCompact(), now,
+      comConf.timeUnit, mayUseOffPeak);
+  }
+
+  /**
+   * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest
+   *          bucket within thresholds.
+   * @param minThreshold minimum number of store files in a bucket to qualify.
+   * @param maxThreshold maximum number of store files to compact at once (the returned bucket will
+   *          be trimmed down to this).
+   * @return a bucket (list) of store files to compact.
+   * @throws IOException
+   */
+  ArrayList<StoreFile> newestBucket(List<ArrayList<StoreFile>> buckets, int minThreshold,
+      int maxThreshold, long now, long timeUnit, boolean mayUseOffPeak) throws IOException {
+    // If the "incoming window" has at least minThreshold store files, choose that one.
+    // For any other bucket, at least 2 store files is enough.
+    // In any case, limit to maxThreshold store files.
+    Target incomingWindow = getInitialTarget(now, timeUnit);
+    for (ArrayList<StoreFile> bucket : buckets) {
+      boolean isIncomingWindow = incomingWindow.onTarget(bucket.get(0).getMaximumTimestamp());
+      if (bucket.size() >= minThreshold && isIncomingWindow) {
+        return bucket;
+      } else if (!isIncomingWindow && compactionPolicyPerWindow != null) {
+        ArrayList<StoreFile> candidates =
+            compactionPolicyPerWindow.applyCompactionPolicy(bucket, mayUseOffPeak, false);
+        if (!candidates.isEmpty()) {
+          return candidates;
+        }
+      }
+    }
+    return null;
+  }
+
+  static List<ArrayList<StoreFile>> getBuckets(Collection<Pair<StoreFile, Long>> files, long timeUnit,
+      int tierBase, long now) {
+    // Sort files by age. Newest first.
+    final List<Pair<StoreFile, Long>> sortedFiles = Lists.newArrayList(files);
+    Collections.sort(sortedFiles, Collections.reverseOrder(new Comparator<Pair<StoreFile, Long>>() {
+      public int compare(Pair<StoreFile, Long> p1, Pair<StoreFile, Long> p2) {
+        return p1.getSecond().compareTo(p2.getSecond());
+      }
+    }));
+
+    List<ArrayList<StoreFile>> buckets = Lists.newArrayList();
+    Target target = getInitialTarget(now, timeUnit);
+    PeekingIterator<Pair<StoreFile, Long>> it = Iterators.peekingIterator(sortedFiles.iterator());
+
+    outerLoop: while (it.hasNext()) {
+      while (!target.onTarget(it.peek().getSecond())) {
+        // If the file is too new for the target, skip it.
+        if (target.compareToTimestamp(it.peek().getSecond()) < 0) {
+          it.next();
+          if (!it.hasNext()) break outerLoop;
+        } else
+          // If the file is too old for the target, switch to a higher tier.
+          target = target.nextTarget(tierBase);
+      }
+
+      ArrayList<StoreFile> bucket = Lists.newArrayList();
+      // Add all files of the same tier to the current bucket
+      while (target.onTarget(it.peek().getSecond())) {
+        bucket.add(it.next().getFirst());
+
+        if (!it.hasNext()) break;
+      }
+      buckets.add(bucket);
+    }
+
+    return buckets;
+  }
+
+  /**
+   * Pairs each store file with its maximum timestamp.
+   * @param storeFiles the store files to pair
+   * @return a list of (store file, maximum timestamp) pairs
+   */
+  public static List<Pair<StoreFile, Long>> createStoreFileAndMaxTimestampPairs(
+      Iterable<StoreFile> storeFiles) {
+    List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
+        Lists.newArrayListWithCapacity(Iterables.size(storeFiles));
+    for (StoreFile storeFile : storeFiles)
+      storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile,
+          storeFile.getMaximumTimestamp()));
+    return storefileMaxTimestampPairs;
+  }
+
+  /**
+   * Removes all store files with max timestamp older than maxAge.
+   * @param storeFiles all store files to consider
+   * @param maxAge the age in milliseconds when a store file stops participating in compaction
+   * @param now current time. store files with max timestamp less than (now - maxAge) are filtered.
+   * @return the store files with the files older than maxAge excluded
+   */
+  @VisibleForTesting static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
+      long maxAge, long now) {
+    if (maxAge == 0) return storeFiles;
+    final long cutoff = now - maxAge;
+    return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
+      @Override
+      public boolean apply(StoreFile storeFile) {
+        return storeFile.getMaximumTimestamp() >= cutoff;
+      }
+    });
+  }
+
+  static Target getInitialTarget(long now, long timeUnit) {
+    return new Target(timeUnit, now / timeUnit);
+  }
+
+  private static class Target {
+    // How big a range of timestamps fits inside the target.
+    public final long timeUnit;
+    // A timestamp t hits the target iff t / timeUnit == divPosition.
+    public final long divPosition;
+
+    public Target(long timeUnit, long divPosition) {
+      this.timeUnit = timeUnit;
+      this.divPosition = divPosition;
+    }
+
+    /**
+     * Compares the target to a timestamp.
+     * @param timestamp the timestamp to compare.
+     * @return a negative integer, zero, or a positive integer as the target lies before, covers,
+     *         or lies after the timestamp.
+ */ + public int compareToTimestamp(long timestamp) { + long pos = timestamp / timeUnit; + return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; + } + + /** + * Tells if the timestamp hits the target. + * @param timestamp the timestamp to test. + * @return true iff timestamp / size == divPosition. + */ + public boolean onTarget(long timestamp) { + return compareToTimestamp(timestamp) == 0; + } + + /** + * Move to the new target of the same tier or Gets the next tier, which represents an earlier + * time span. + * @param tierBase The number of contiguous targets that will have the same size. Targets + * following those will be tierBase times as big. + * @return + */ + public Target nextTarget(int tierBase) { + if (divPosition % tierBase > 0) return new Target(timeUnit, divPosition - 1); + else return new Target(timeUnit * tierBase, divPosition / tierBase - 1); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index 3614846..2dc7ca6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -100,6 +100,18 @@ public class MockStoreFile extends StoreFile { this.entryCount = entryCount; } + public Long getMinimumTimestamp() { + return (timeRangeTracker == null) ? + null : + timeRangeTracker.getMinimumTimestamp(); + } + + public Long getMaximumTimestamp() { + return (timeRangeTracker == null) ? + null : + timeRangeTracker.getMaximumTimestamp(); + } + @Override public StoreFile.Reader getReader() { final long len = this.length; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java new file mode 100644 index 0000000..8a42b0f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java @@ -0,0 +1,199 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + + +@Category(SmallTests.class) +public class TestCompactionPolicy extends TestCase +{ + private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class); + protected final static 
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + protected Configuration conf; + protected HStore store; + private static final String DIR= + TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString(); + protected static Path TEST_FILE; + + protected static final int minFiles = 3; + protected static final int maxFiles = 5; + + protected static final long minSize = 10; + protected static final long maxSize = 2100; + + private FSHLog hlog; + private HRegion region; + + @Override + public void setUp() throws Exception + { + config(); + initialize(); + } + + /** + * setup config values necessary for store + */ + protected void config() + { + this.conf = TEST_UTIL.getConfiguration(); + this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, minFiles); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, maxFiles); + this.conf.setLong(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY, minSize); + this.conf.setLong(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, maxSize); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F); + } + + /** + * Setting up a Store + * @throws IOException + */ + protected void initialize() throws IOException + { + Path basedir = new Path(DIR); + String logName = "logs"; + Path logdir = new Path(DIR, logName); + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family")); + FileSystem fs = FileSystem.get(conf); + + fs.delete(logdir, true); + + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("table"))); + htd.addFamily(hcd); + HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); + + + hlog = new FSHLog(fs, basedir, logName, conf); + region = HRegion.createHRegion(info, basedir, conf, htd, hlog); + region.close(); + Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); + region = new HRegion(tableDir, hlog, fs, conf, info, htd, null); + + store = new HStore(region, hcd, conf); + + TEST_FILE = region.getRegionFileSystem().createTempName(); + fs.createNewFile(TEST_FILE); + } + + @After + public void tearDown() throws IOException { + IOException ex = null; + try { + region.close(); + } catch (IOException e) { + LOG.warn("Caught Exception", e); + ex = e; + } + try { + hlog.close(); + } catch (IOException e) { + LOG.warn("Caught Exception", e); + ex = e; + } + if (ex != null) { + throw ex; + } + } + + ArrayList toArrayList(long... numbers) { + ArrayList result = new ArrayList(); + for (long i : numbers) { + result.add(i); + } + return result; + } + + List sfCreate(long... sizes) throws IOException { + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + return sfCreate(toArrayList(sizes), ageInDisk); + } + + List sfCreate(ArrayList sizes, ArrayList ageInDisk) + throws IOException { + return sfCreate(false, sizes, ageInDisk); + } + + List sfCreate(boolean isReference, long... 
sizes) throws IOException { + ArrayList ageInDisk = new ArrayList(sizes.length); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + return sfCreate(isReference, toArrayList(sizes), ageInDisk); + } + + List sfCreate(boolean isReference, ArrayList sizes, ArrayList ageInDisk) + throws IOException { + List ret = Lists.newArrayList(); + for (int i = 0; i < sizes.size(); i++) { + ret.add(new MockStoreFile(TEST_UTIL, TEST_FILE, + sizes.get(i), ageInDisk.get(i), isReference, i)); + } + return ret; + } + + long[] getSizes(List sfList) { + long[] aNums = new long[sfList.size()]; + for (int i = 0; i < sfList.size(); ++i) { + aNums[i] = sfList.get(i).getReader().length(); + } + return aNums; + } + + void compactEquals(List candidates, long... expected) + throws IOException { + compactEquals(candidates, false, false, expected); + } + + void compactEquals(List candidates, boolean forcemajor, long... expected) + throws IOException { + compactEquals(candidates, forcemajor, false, expected); + } + + void compactEquals(List candidates, boolean forcemajor, boolean isOffPeak, + long ... expected) + throws IOException { + store.forceMajor = forcemajor; + //Test Default compactions + CompactionRequest result = + ((RatioBasedCompactionPolicy)store.storeEngine.getCompactionPolicy()) + .selectCompaction(candidates, new ArrayList(), false, isOffPeak, forcemajor); + List actual = new ArrayList(result.getFiles()); + if (isOffPeak && !forcemajor) { + assertTrue(result.isOffPeak()); + } + assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); + store.forceMajor = false; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index d68c6b1..1489518 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import junit.framework.TestCase; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -44,12 +42,13 @@ import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.junit.After; +import org.junit.Test; import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; @Category({RegionServerTests.class, SmallTests.class}) -public class TestDefaultCompactSelection extends TestCase { +public class TestDefaultCompactSelection extends TestCompactionPolicy { private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java index b9982aa..7644db5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; import 
org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; @@ -60,6 +61,7 @@ public class TestDefaultStoreEngine { conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, DummyStoreFlusher.class.getName()); Store mockStore = Mockito.mock(Store.class); + Mockito.when (mockStore.getHColumnDescriptor()).thenReturn(new HColumnDescriptor("vallance")); StoreEngine se = StoreEngine.create(mockStore, conf, CellComparator.COMPARATOR); Assert.assertTrue(se instanceof DefaultStoreEngine); Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 1454aa8..1717fec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; @Category({RegionServerTests.class, SmallTests.class}) public class TestStripeStoreEngine { @@ -113,6 +115,7 @@ public class TestStripeStoreEngine { private static TestStoreEngine createEngine(Configuration conf) throws Exception { Store store = mock(Store.class); + Mockito.when (store.getHColumnDescriptor()).thenReturn(new HColumnDescriptor("vallance")); CellComparator kvComparator = mock(CellComparator.class); return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTieredCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTieredCompaction.java new file mode 100644 index 0000000..f274c4a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTieredCompaction.java @@ -0,0 +1,153 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.TieredCompactionPolicy; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestTieredCompaction extends TestCompactionPolicy { + ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) + throws 
IOException { + ArrayList ageInDisk = new ArrayList(); + for (long size : sizes) { + ageInDisk.add(0L); + } + + ArrayList ret = Lists.newArrayList(); + for (int i = 0; i < sizes.length; i++) { + MockStoreFile msf = + new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); + msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); + ret.add(msf); + } + return ret; + } + + @Override + protected void config() { + super.config(); + + // Set up policy + conf.setInt(CompactionConfiguration.MIN_THRESHOLD, 3); + conf.setLong(CompactionConfiguration.MAX_AGE, 100); + conf.setLong(CompactionConfiguration.TIME_UNIT, 6); + conf.setInt(CompactionConfiguration.TIER_BASE, 4); + conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, + TieredCompactionPolicy.class.getName()); + + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_OFFPEAK_KEY, 1.2F); + + } + + void compactEquals(long now, ArrayList candidates, long... expected) + throws IOException { + assertTrue(((TieredCompactionPolicy) store.storeEngine.getCompactionPolicy()).needsCompaction( + candidates, ImmutableList. of(), now)); + + List actual = + ((TieredCompactionPolicy) store.storeEngine.getCompactionPolicy()).applyCompactionPolicy( + candidates, false, false, now); + + assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); + } + + public void testTieredCompactionT0() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); + } + + /** + * Not enough files in incoming window + * @throws IOException + */ + public void testTieredCompactionNoT0() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 25, 24, 23, 22, 21, 20); + } + + /** + * If there is no T1 window, we don't build 2 + * @throws IOException + */ + public void testTieredCompactioNoT2() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 92, 95, 100 }; + long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 }; + + compactEquals(100, sfCreate(minTimestamps, maxTimestamps, sizes), 23, 22); + } + + public void testTieredCompactionT1() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 32, 31, 30); + } + + /** + * Don't apply ratio-based logic on incoming 
window + * @throws IOException + */ + public void testIncomingWindowRatioT0() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; + long[] sizes = new long[] { 30, 3, 3, 3, 3, 2, 2, 2, 2, 2, 2, 180, 10, 11, 12 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 12, 11, 10, 180); + } + + public void testTieredCompactionRatioT0() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 22, 21, 20); + } + + public void testTieredCompactionRatioT2() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30); + } + + /* + * The next compaction call after testTieredCompactionRatioT0 is compacted + */ + public void testTieredCompactionRatioT0Next() throws IOException { + // Set up candidates + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 24, 23); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 56e71e8..99c0c25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -144,6 +145,7 @@ public class TestStripeCompactionPolicy { conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4); conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits StoreConfigInformation sci = mock(StoreConfigInformation.class); + Mockito.when (sci.getHColumnDescriptor()).thenReturn(new HColumnDescriptor("vallance")); StripeStoreConfig ssc = new StripeStoreConfig(conf, sci); StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) { @Override @@ -451,6 +453,7 @@ public class TestStripeCompactionPolicy { conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount); conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount); StoreConfigInformation sci = mock(StoreConfigInformation.class); + Mockito.when (sci.getHColumnDescriptor()).thenReturn(new HColumnDescriptor("vallance")); when(sci.getStoreFileTtl()).thenReturn(hasTtl ? 
defaultTtl : Long.MAX_VALUE);
     StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
     return new StripeCompactionPolicy(conf, sci, ssc);
-- 
1.9.3 (Apple Git-50)
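
Reviewer note: a minimal sketch of how the new policy might be enabled and tuned once this patch is applied. The keys come straight from CompactionConfiguration above; the 12-hour window, tier base 4, minimum of 3 files and 30-day cutoff are illustrative values, not the patch defaults (the defaults are a 6-hour time unit, tier base 4, minimum threshold 6, and no age cutoff).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.TieredCompactionPolicy;

public class TieredCompactionConfigSketch {
  public static Configuration tieredCompactionConf() {
    Configuration conf = HBaseConfiguration.create();
    // Select the date-based tiered policy for the default store engine,
    // the same way TestTieredCompaction.config() does.
    conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
        TieredCompactionPolicy.class.getName());
    // Illustrative tuning: 12-hour windows, 4 windows per tier, at least
    // 3 files in the incoming window, ignore data older than 30 days.
    conf.setLong(CompactionConfiguration.TIME_UNIT, 12L * 60 * 60 * 1000);
    conf.setInt(CompactionConfiguration.TIER_BASE, 4);
    conf.setInt(CompactionConfiguration.MIN_THRESHOLD, 3);
    conf.setLong(CompactionConfiguration.MAX_AGE, 30L * 24 * 60 * 60 * 1000);
    return conf;
  }
}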
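
Because CompactionConfiguration now layers the column family's values over the site configuration through CompoundConfiguration (via the new StoreConfigInformation.getHColumnDescriptor()), the same keys should also be settable per family. A hedged sketch under that assumption; the family name "d" and the one-day window are made up for illustration:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;

public class PerFamilyTieredOverrideSketch {
  public static HColumnDescriptor dailyWindows() {
    HColumnDescriptor hcd = new HColumnDescriptor("d"); // hypothetical family name
    // Any value stored on the descriptor with a matching key is layered over
    // the site configuration when CompactionConfiguration is built for the store.
    hcd.setValue(CompactionConfiguration.TIME_UNIT, String.valueOf(24L * 60 * 60 * 1000));
    hcd.setValue(CompactionConfiguration.MIN_THRESHOLD, String.valueOf(4));
    return hcd;
  }
}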
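
To make the Target arithmetic concrete, here is a small standalone model (not part of the patch) of the window boundaries, using the values from TestTieredCompaction: time unit 6, tier base 4, now = 16. The incoming window is [12, 18), followed by [6, 12) and [0, 6), which is why testTieredCompactionT0 selects the four files whose maximum timestamps are 12 through 15.

public class TieredWindowMathDemo {
  // Mirrors TieredCompactionPolicy.Target: a window covers every timestamp t
  // with t / timeUnit == divPosition.
  static final class Window {
    final long timeUnit;
    final long divPosition;

    Window(long timeUnit, long divPosition) {
      this.timeUnit = timeUnit;
      this.divPosition = divPosition;
    }

    // Same rule as Target.nextTarget(int): step back within the current tier
    // until divPosition reaches a multiple of tierBase, then widen the window.
    Window next(int tierBase) {
      if (divPosition % tierBase > 0) {
        return new Window(timeUnit, divPosition - 1);
      }
      return new Window(timeUnit * tierBase, divPosition / tierBase - 1);
    }

    @Override
    public String toString() {
      return "[" + divPosition * timeUnit + ", " + (divPosition + 1) * timeUnit + ")";
    }
  }

  public static void main(String[] args) {
    long timeUnit = 6;  // values from TestTieredCompaction.config()
    int tierBase = 4;
    long now = 16;
    Window window = new Window(timeUnit, now / timeUnit);  // the incoming window
    for (int i = 0; i < 3; i++) {
      System.out.println("window " + i + ": " + window);
      window = window.next(tierBase);
    }
    // Prints [12, 18), [6, 12), [0, 6).
  }
}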