From 645cb49607aa48ab490a88b13032eb65006a71a2 Mon Sep 17 00:00:00 2001 From: Clara Date: Thu, 7 Apr 2016 21:31:31 -0700 Subject: [PATCH] HBASE-15400 Use DateTieredCompactor for Date Tiered Compaction --- .../hbase/regionserver/DateTieredStoreEngine.java | 108 ++++++ .../apache/hadoop/hbase/regionserver/HStore.java | 2 +- .../hadoop/hbase/regionserver/StoreFile.java | 32 +- .../compactions/CompactionConfiguration.java | 19 +- .../regionserver/compactions/CompactionPolicy.java | 2 +- .../compactions/CompactionRequest.java | 4 + .../compactions/DateTieredCompactionPolicy.java | 388 +++++++++++++------- .../compactions/DateTieredCompactionRequest.java | 42 +++ .../compactions/ExploringCompactionPolicy.java | 2 +- .../compactions/FIFOCompactionPolicy.java | 4 +- .../compactions/RatioBasedCompactionPolicy.java | 394 ++++++--------------- .../compactions/SortedCompactionPolicy.java | 232 ++++++++++++ .../compactions/StripeCompactionPolicy.java | 2 +- .../hadoop/hbase/regionserver/MockStoreFile.java | 43 ++- .../regionserver/TestDateTieredCompaction.java | 198 ++++++++--- .../regionserver/TestDefaultCompactSelection.java | 6 + .../regionserver/compactions/EverythingPolicy.java | 2 +- 17 files changed, 995 insertions(+), 485 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java new file mode 100644 index 0000000..1bd0f1c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; +import org.apache.hadoop.hbase.security.User; + +/** + * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential sizing + * so that the more recent data has more granularity. Time-range scan will perform the best with most recent data. + * When data reach maxAge, they are compacted in fixed-size time windows for TTL and archiving. + * Please refer to design spec for more details. + * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/edit#heading=h.uk6y5pd3oqgx + */ +@InterfaceAudience.Private +public class DateTieredStoreEngine extends + StoreEngine { + + @Override + public boolean needsCompaction(List filesCompacting) { + return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), + filesCompacting); + } + + @Override + public CompactionContext createCompaction() throws IOException { + return new DateTieredCompactionContext(); + } + + @Override + protected void createComponents(Configuration conf, Store store, KVComparator kvComparator) + throws IOException { + this.compactionPolicy = new DateTieredCompactionPolicy(conf, store); + this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf, + compactionPolicy.getConf()); + this.storeFlusher = new DefaultStoreFlusher(conf, store); + this.compactor = new DateTieredCompactor(conf, store); + } + + private final class DateTieredCompactionContext extends CompactionContext { + + @Override + public List preSelect(List filesCompacting) { + return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(), + filesCompacting); + } + + @Override + public boolean select(List filesCompacting, boolean isUserCompaction, + boolean mayUseOffPeak, boolean forceMajor) throws IOException { + request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, + isUserCompaction, mayUseOffPeak, forceMajor); + return request != null; + } + + @Override + public void forceSelect(CompactionRequest request) { + if (!(request instanceof DateTieredCompactionRequest)) { + throw new IllegalArgumentException("DateTieredCompactionRequest is expected. 
Actual: " + + request.getClass().getCanonicalName()); + } + super.forceSelect(request); + } + + @Override + public List compact(CompactionThroughputController throughputController) throws IOException { + return compact(throughputController, null); + } + + public List compact(CompactionThroughputController throughputController, User user) + throws IOException { + if (request instanceof DateTieredCompactionRequest) { + return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(), + throughputController, user); + } else { + throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: " + + request.getClass().getCanonicalName()); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index ee0e708..17682ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1408,7 +1408,7 @@ public class HStore implements Store { return false; } } - return storeEngine.getCompactionPolicy().isMajorCompaction( + return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction( this.storeEngine.getStoreFileManager().getStorefiles()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 25e2e78..b959152 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -18,6 +18,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; + import java.io.DataInput; import java.io.IOException; import java.net.InetSocketAddress; @@ -59,11 +64,6 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; - /** * A Store data file. Stores usually have one or more of these files. They * are produced by flushing the memstore to disk. To @@ -353,7 +353,7 @@ public class StoreFile { * is turned off, fall back to BULKLOAD_TIME_KEY. * @return true if this storefile was created by bulk load. */ - boolean isBulkLoadResult() { + public boolean isBulkLoadResult() { boolean bulkLoadedHFile = false; String fileName = this.getPath().getName(); int startPos = fileName.indexOf("SeqId_"); @@ -1611,6 +1611,19 @@ public class StoreFile { Ordering.natural().onResultOf(new GetPathName()) )); + /** + * Comparator for time-aware compaction. SeqId is still the first + * ordering criterion to maintain MVCC. 
+ */ + public static final Comparator SEQ_ID_MAX_TIMESTAMP = + Ordering.compound(ImmutableList.of( + Ordering.natural().onResultOf(new GetSeqId()), + Ordering.natural().onResultOf(new GetMaxTimestamp()), + Ordering.natural().onResultOf(new GetFileSize()).reverse(), + Ordering.natural().onResultOf(new GetBulkTime()), + Ordering.natural().onResultOf(new GetPathName()) + )); + private static class GetSeqId implements Function { @Override public Long apply(StoreFile sf) { @@ -1639,5 +1652,12 @@ public class StoreFile { return sf.getPath().getName(); } } + + private static class GetMaxTimestamp implements Function { + @Override + public Long apply(StoreFile sf) { + return sf.getMaximumTimestamp() == null? (Long)Long.MAX_VALUE : sf.getMaximumTimestamp(); + } + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 85516df..49bd017 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -52,11 +52,11 @@ public class CompactionConfiguration { public static final String RATIO_KEY = CONFIG_PREFIX + "ratio"; public static final String MIN_KEY = CONFIG_PREFIX + "min"; public static final String MAX_KEY = CONFIG_PREFIX + "max"; - + /* * The epoch time length for the windows we no longer compact */ - public static final String MAX_AGE_KEY =CONFIG_PREFIX + "date.tiered.max.storefile.age.millis"; + public static final String MAX_AGE_MILLIS_KEY =CONFIG_PREFIX + "date.tiered.max.storefile.age.millis"; public static final String BASE_WINDOW_MILLIS_KEY = CONFIG_PREFIX + "date.tiered.base.window.millis"; public static final String WINDOWS_PER_TIER_KEY = CONFIG_PREFIX + "date.tiered.windows.per.tier"; @@ -64,6 +64,8 @@ public class CompactionConfiguration { CONFIG_PREFIX + "date.tiered.incoming.window.min"; public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY = CONFIG_PREFIX + "date.tiered.window.policy.class"; + public static final String SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY = + CONFIG_PREFIX + "date.tiered.single.output.for.minor.compaction"; private static final Class DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; @@ -86,7 +88,8 @@ public class CompactionConfiguration { private final int windowsPerTier; private final int incomingWindowMin; private final String compactionPolicyForTieredWindow; - + private final boolean singleOutputForMinorCompaction; + CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; this.storeConfigInfo = storeConfigInfo; @@ -107,12 +110,14 @@ public class CompactionConfiguration { majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F); minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); - maxStoreFileAgeMillis = conf.getLong(MAX_AGE_KEY, Long.MAX_VALUE); + maxStoreFileAgeMillis = conf.getLong(MAX_AGE_MILLIS_KEY, Long.MAX_VALUE); baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6); compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY, DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); + 
singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, + true); LOG.info(this); } @@ -167,7 +172,7 @@ public class CompactionConfiguration { public void setMinFilesToCompact(int threshold) { minFilesToCompact = threshold; } - + /** * @return upper bound on number of files to be included in minor compactions */ @@ -240,4 +245,8 @@ public class CompactionConfiguration { public String getCompactionPolicyForTieredWindow() { return compactionPolicyForTieredWindow; } + + public boolean useSingleOutputForMinorCompaction() { + return singleOutputForMinorCompaction; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java index c0d62b7..6b429c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java @@ -45,7 +45,7 @@ public abstract class CompactionPolicy { * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. */ - public abstract boolean isMajorCompaction( + public abstract boolean shouldPerformMajorCompaction( final Collection filesToCompact) throws IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index fb26761..341cfdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -215,6 +215,10 @@ public class CompactionRequest implements Comparable { ", priority=" + priority + ", time=" + timeInNanos; } + public void updateFiles(Collection files) { + this.filesToCompact = files; + } + /** * Recalculate the size of the compaction based on current files. 
* @param files files that should be included in the compaction diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index ba72658..d32afe1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,43 +18,49 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.PeekingIterator; +import com.google.common.math.LongMath; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.PeekingIterator; - /** * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to * Cassandra's for the following benefits: * 1. Improve date-range-based scan by structuring store files in date-based tiered layout. * 2. Reduce compaction overhead. * 3. Improve TTL efficiency. - * Perfect fit for the use cases that: + * Perfect fit for the use cases that: * 1. has mostly date-based data write and scan and a focus on the most recent data. - * 2. never or rarely deletes data. Out-of-order writes are handled gracefully. Time range - * overlapping among store files is tolerated and the performance impact is minimized. Configuration - * can be set at hbase-site or overriden at per-table or per-column-famly level by hbase shell. - * Design spec is at + * Out-of-order writes are handled gracefully. Time range overlapping among store files is + * tolerated and the performance impact is minimized. Configuration can be set at hbase-site + * or overridden at per-table or per-column-family level by hbase shell. 
Design spec is at * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ */ -public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class DateTieredCompactionPolicy extends SortedCompactionPolicy { private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); private RatioBasedCompactionPolicy compactionPolicyPerWindow; @@ -73,190 +79,290 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { } } - @Override - public boolean isMajorCompaction(Collection filesToCompact) throws IOException { - // TODO: major compaction with tiered output. Never do major compaction unless forced for now. - return false; - } - - @Override /** - * Heuristics for guessing whether we need compaction. + * Heuristics for guessing whether we need minor compaction. */ - public boolean needsCompaction(final Collection storeFiles, - final List filesCompacting) { - return needsCompaction(storeFiles, filesCompacting, System.currentTimeMillis()); - } - + @Override @VisibleForTesting public boolean needsCompaction(final Collection storeFiles, - final List filesCompacting, long now) { + final List filesCompacting) { ArrayList candidates = new ArrayList(storeFiles); - candidates = filterBulk(candidates); - candidates = skipLargeFiles(candidates); try { - candidates = applyCompactionPolicy(candidates, true, false, now); + return selectMinorCompaction(candidates, false, true) != null; } catch (Exception e) { LOG.error("Can not check for compaction: ", e); return false; } - return candidates != null; - } - - @Override - /** - * Input candidates are sorted from oldest to newest by seqId - * Could return null if no candidates are found - */ - public ArrayList applyCompactionPolicy(ArrayList candidates, - boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { - return applyCompactionPolicy(candidates, mayUseOffPeak, mayBeStuck, - System.currentTimeMillis()); } - @VisibleForTesting - public ArrayList applyCompactionPolicy(ArrayList candidates, - boolean mayUseOffPeak, boolean mayBeStuck, long now) throws IOException { - Iterable candidatesInWindow = - filterOldStoreFiles(Lists.newArrayList(candidates), comConf.getMaxStoreFileAgeMillis(), now); - - List> buckets = - partitionFilesToBuckets(candidatesInWindow, comConf.getBaseWindowMillis(), - comConf.getWindowsPerTier(), now); - LOG.debug("Compaction buckets are: " + buckets); - if (buckets.size() >= storeConfigInfo.getBlockingFileCount()) { - LOG.warn("Number of compaction buckets:" + buckets.size() - + ", exceeds blocking file count setting: " - + storeConfigInfo.getBlockingFileCount() - + ", either increase hbase.hstore.blockingStoreFiles or " - + "reduce the number of tiered compaction windows"); + public boolean shouldPerformMajorCompaction(final Collection filesToCompact) throws IOException { + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || mcTime == 0) { + return false; } - return newestBucket(buckets, comConf.getIncomingWindowMin(), now, comConf.getBaseWindowMillis(), - mayUseOffPeak); - } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (lowTimestamp <= 0l || lowTimestamp >= (now - mcTime)) { + return false; + } + + long cfTtl = this.storeConfigInfo.getStoreFileTtl(); + HDFSBlocksDistribution 
hdfsBlocksDistribution = new HDFSBlocksDistribution(); + long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + List boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now); + boolean[] filesInWindow = new boolean[boundaries.size()]; + + for (StoreFile file: filesToCompact) { + Long minTimestamp = file.getMinimumTimestamp(); + long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue(); + if (cfTtl != HConstants.FOREVER && oldest >= cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + "; for TTL maintenance"); + return true; + } + if (!file.isMajorCompaction() || file.isBulkLoadResult()) { + LOG.debug("Major compaction triggered on store " + this + + ", because there are new files and time since last major compaction " + + (now - lowTimestamp) + "ms"); + return true; + } + + int lowerWindowIndex = Collections.binarySearch(boundaries, + minTimestamp == null ? Long.MAX_VALUE : file.getMinimumTimestamp()); + int upperWindowIndex = Collections.binarySearch(boundaries, + file.getMaximumTimestamp() == null ? Long.MAX_VALUE : file.getMaximumTimestamp()); - /** - * @param buckets the list of buckets, sorted from newest to oldest, from which to return the - * newest bucket within thresholds. - * @param incomingWindowThreshold minimum number of storeFiles in a bucket to qualify. - * @param maxThreshold maximum number of storeFiles to compact at once (the returned bucket will - * be trimmed down to this). - * @return a bucket (a list of store files within a window to be compacted). - * @throws IOException - */ - private ArrayList newestBucket(List> buckets, - int incomingWindowThreshold, long now, long baseWindowMillis, boolean mayUseOffPeak) - throws IOException { - Window incomingWindow = getInitialWindow(now, baseWindowMillis); - for (ArrayList bucket : buckets) { - int minThreshold = incomingWindow.compareToTimestamp(bucket.get(0).getMaximumTimestamp()) - <= 0? 
comConf.getIncomingWindowMin() : comConf.minFilesToCompact; - compactionPolicyPerWindow.setMinThreshold(minThreshold); - ArrayList candidates = compactionPolicyPerWindow.applyCompactionPolicy(bucket, - mayUseOffPeak, false); - if (candidates != null && !candidates.isEmpty()) { - return candidates; + if (lowerWindowIndex != upperWindowIndex) { + LOG.debug("Major compaction triggered on store " + this + "; because file " + + file.getPath() + " has data with timestamps cross window boundaries"); + return true; + } else if (filesInWindow[upperWindowIndex]) { + LOG.debug("Major compaction triggered on store " + this + + "; because there are more than one file in some windows"); + return true; + } else { + filesInWindow[upperWindowIndex] = true; } + + hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); } - return null; + + float blockLocalityIndex = hdfsBlocksDistribution + .getBlockLocalityIndex(HRegionServer.getHostname(comConf.conf)); + if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { + LOG.debug("Major compaction triggered on store " + this + + "; to make hdfs blocks local, current blockLocalityIndex is " + + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")"); + return true; + } + + LOG.debug("Skipping major compaction of " + this + ", because the files are already major compacted"); + return false; + } + + @Override + protected CompactionRequest getCompactionRequest(ArrayList candidateSelection, boolean tryingMajor, + boolean isUserCompaction, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection) + : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); + ArrayList filesToCompact = Lists.newArrayList(result.getFiles()); + removeExcessFiles(filesToCompact, isUserCompaction, tryingMajor); + result.updateFiles(filesToCompact); + result.setOffPeak(!filesToCompact.isEmpty() && !tryingMajor && mayUseOffPeak); + LOG.debug("Generated compaction request: " + result); + return result; } + public CompactionRequest selectMajorCompaction(ArrayList candidateSelection) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + return new DateTieredCompactionRequest(candidateSelection, + this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now)); + } + /** * We receive store files sorted in ascending order by seqId then scan the list of files. If the * current file has a maxTimestamp older than last known maximum, treat this file as it carries * the last known maximum. This way both seqId and timestamp are in the same order. If files carry * the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered - * by seqId and maxTimestamp in decending order and build the time windows. All the out-of-order + * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. 
*/ - private static List> partitionFilesToBuckets(Iterable storeFiles, - long baseWindowSizeMillis, int windowsPerTier, long now) { - List> buckets = Lists.newArrayList(); - Window window = getInitialWindow(now, baseWindowSizeMillis); + public CompactionRequest selectMinorCompaction(ArrayList candidateSelection, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + + // Make sure the store files is sorted by SeqId then maxTimestamp + List storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection, + oldestToCompact)); + Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); List> storefileMaxTimestampPairs = - Lists.newArrayListWithCapacity(Iterables.size(storeFiles)); + Lists.newArrayListWithCapacity(Iterables.size(storeFileList)); long maxTimestampSeen = Long.MIN_VALUE; - for (StoreFile storeFile : storeFiles) { + for (StoreFile storeFile : storeFileList) { // if there is out-of-order data, // we put them in the same window as the last file in increasing order - maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp()); + maxTimestampSeen = Math.max(maxTimestampSeen, + storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp()); storefileMaxTimestampPairs.add(new Pair(storeFile, maxTimestampSeen)); } - Collections.reverse(storefileMaxTimestampPairs); + + Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + int minThreshold = comConf.getIncomingWindowMin(); PeekingIterator> it = Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); - while (it.hasNext()) { int compResult = window.compareToTimestamp(it.peek().getSecond()); if (compResult > 0) { // If the file is too old for the window, switch to the next window - window = window.nextWindow(windowsPerTier); + window = window.nextWindow(comConf.getWindowsPerTier(), + oldestToCompact); + minThreshold = comConf.getMinFilesToCompact(); } else { // The file is within the target window - ArrayList bucket = Lists.newArrayList(); - // Add all files in the same window to current bucket. For incoming window + ArrayList fileList = Lists.newArrayList(); + // Add all files in the same window. For incoming window // we tolerate files with future data although it is sub-optimal while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) { - bucket.add(it.next().getFirst()); + fileList.add(it.next().getFirst()); } - if (!bucket.isEmpty()) { - buckets.add(bucket); + if (!fileList.isEmpty()) { + if (fileList.size() >= minThreshold) { + DateTieredCompactionRequest request = generateCompactionRequest(fileList, window, + mayUseOffPeak, mayBeStuck, minThreshold); + if (request != null) { + return request; + } + } } } } + // A non-null file list is expected by HStore + return new CompactionRequest(Collections. emptyList()); + } - return buckets; + private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, + Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) + throws IOException { + // The files has to be in ascending order for ratio-based compaction to work right + // and removeExcessFile to exclude youngest files. 
+ Collections.reverse(storeFiles); + + // Compact everything in the window if have more files than comConf.maxBlockingFiles + compactionPolicyPerWindow.setMinThreshold(minThreshold); + ArrayList storeFileSelection = mayBeStuck ? storeFiles + : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false); + if (storeFileSelection != null && !storeFileSelection.isEmpty()) { + // If there is any file in the window excluded from compaction, + // only one file will be output from compaction. + boolean singleOutput = storeFiles.size() != storeFileSelection.size() || + comConf.useSingleOutputForMinorCompaction(); + List boundaries = getCompactionBoundariesForMinor(window, singleOutput); + DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection, + boundaries); + return result; + } + return null; + } + + /** + * Return a list of boundaries for multiple compaction output + * in ascending order. + */ + private List getCompactBoundariesForMajor(Collection filesToCompact, + long oldestToCompact, long now) { + long minTimestamp = Long.MAX_VALUE; + for (StoreFile file : filesToCompact) { + minTimestamp = Math.min(minTimestamp, + file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp()); + } + + List boundaries = new ArrayList(); + + // Add startMillis of all windows between now and min timestamp + for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + window.compareToTimestamp(minTimestamp) > 0; + window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) { + boundaries.add(window.startMillis()); + } + boundaries.add(Long.MIN_VALUE); + Collections.reverse(boundaries); + return boundaries; + } + + /** + * @return a list of boundaries for multiple compaction output + * from minTimestamp to maxTimestamp. + */ + private static List getCompactionBoundariesForMinor(Window window, boolean singleOutput) { + List boundaries = new ArrayList(); + boundaries.add(Long.MIN_VALUE); + if (!singleOutput) { + boundaries.add(window.startMillis()); + } + return boundaries; } /** * Removes all store files with max timestamp older than (current - maxAge). * @param storeFiles all store files to consider * @param maxAge the age in milliseconds when a store file stops participating in compaction. - * @param now current time. store files with max timestamp less than (now - maxAge) are filtered. * @return a list of storeFiles with the store file older than maxAge excluded */ - private static Iterable filterOldStoreFiles(List storeFiles, long maxAge, - long now) { - if (maxAge == 0) return ImmutableList.of(); - final long cutoff = now - maxAge; + private static Iterable filterOldStoreFiles(List storeFiles, + final long cutoff) { return Iterables.filter(storeFiles, new Predicate() { @Override public boolean apply(StoreFile storeFile) { - // This is for findbugs' issue with Guava. We know this won't happen. + // Known findbugs issue to guava. SuppressWarning or Nonnull annotation don't work. if (storeFile == null) { return false; } - return storeFile.getMaximumTimestamp() >= cutoff; + Long maxTimestamp = storeFile.getMaximumTimestamp(); + return maxTimestamp == null ? 
true : maxTimestamp >= cutoff; } }); } + + private static Window getIncomingWindow(long now, long baseWindowMillis) { + return new Window(baseWindowMillis, now / baseWindowMillis); + } + private static long getOldestToCompact(long maxAgeMillis, long now) { + try { + return LongMath.checkedSubtract(now, maxAgeMillis); + } catch (ArithmeticException ae) { + LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis + + ". All the files will be eligible for minor compaction."); + return Long.MIN_VALUE; + } + } + /** * This is the class we use to partition from epoch time to now into tiers of exponential sizes of * windows. */ - private static Window getInitialWindow(long now, long timeUnit) { - return new Window(timeUnit, now / timeUnit); - } - - private static class Window { + private static final class Window { /** * How big a range of timestamps fit inside the window in milliseconds. */ private final long windowMillis; + /** * A timestamp t is within the window iff t / size == divPosition. */ private final long divPosition; - public Window(long baseWindowMillis, long divPosition) { - this.windowMillis = baseWindowMillis; + private Window(long baseWindowMillis, long divPosition) { + windowMillis = baseWindowMillis; this.divPosition = divPosition; } - + /** * Compares the window to a timestamp. * @param timestamp the timestamp to compare. @@ -264,6 +370,13 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { * or after than the timestamp. */ public int compareToTimestamp(long timestamp) { + if (timestamp < 0) { + try { + timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); + } catch (ArithmeticException ae) { + timestamp = Long.MIN_VALUE; + } + } long pos = timestamp / windowMillis; return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; } @@ -275,9 +388,42 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { * following those will be tierBase times as big. * @return The next window */ - public Window nextWindow(int windowsPerTier) { - if (divPosition % windowsPerTier > 0) return new Window(windowMillis, divPosition - 1); - else return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); + public Window nextWindow(int windowsPerTier, long oldestToCompact) { + // Don't promote to the next tier if there is not even 1 window at current tier + // or if the next window crosses the max age. 
+ if (divPosition % windowsPerTier > 0 || + startMillis() - windowMillis * windowsPerTier < oldestToCompact) { + return new Window(windowMillis, divPosition - 1); + } else { + return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); + } + } + + /** + * Inclusive lower bound + */ + public long startMillis() { + try { + return LongMath.checkedMultiply(windowMillis, divPosition); + } catch (ArithmeticException ae) { + return Long.MIN_VALUE; + } + } + + /** + * Exclusive upper bound + */ + public long endMillis() { + try { + return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); + } catch (ArithmeticException ae) { + return Long.MAX_VALUE; + } + } + + @Override + public String toString() { + return "[" + startMillis() + ", " + endMillis() + ")"; } } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java new file mode 100644 index 0000000..c4b54a5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.hbase.regionserver.StoreFile; + +public class DateTieredCompactionRequest extends CompactionRequest { + private List boundaries; + + public DateTieredCompactionRequest(Collection files, List boundaryList) { + super(files); + boundaries = boundaryList; + } + + public List getBoundaries() { + return boundaries; + } + + @Override + public String toString() { + return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java index dbd4adb..06185fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java @@ -51,7 +51,7 @@ public class ExploringCompactionPolicy extends RatioBasedCompactionPolicy { } @Override - final ArrayList applyCompactionPolicy(final ArrayList candidates, + protected final ArrayList applyCompactionPolicy(final ArrayList candidates, final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException { return new ArrayList(applyCompactionPolicy(candidates, mightBeStuck, mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java index 8948782..477b08e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java @@ -76,11 +76,11 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy { } @Override - public boolean isMajorCompaction(Collection filesToCompact) throws IOException { + public boolean shouldPerformMajorCompaction(Collection filesToCompact) throws IOException { boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact); if(isAfterSplit){ LOG.info("Split detected, delegate to the parent policy."); - return super.isMajorCompaction(filesToCompact); + return super.shouldPerformMajorCompaction(filesToCompact); } return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index e632a79..de550eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -1,230 +1,135 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. */ - package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; /** - * The default algorithm for selecting files for compaction. - * Combines the compaction configuration and the provisional file selection that - * it's given to produce the list of suitable candidates for compaction. + * The default algorithm for selecting files for compaction. Combines the compaction configuration + * and the provisional file selection that it's given to produce the list of suitable candidates for + * compaction. */ @InterfaceAudience.Private -public class RatioBasedCompactionPolicy extends CompactionPolicy { +public class RatioBasedCompactionPolicy extends SortedCompactionPolicy { private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class); - public RatioBasedCompactionPolicy(Configuration conf, - StoreConfigInformation storeConfigInfo) { + public RatioBasedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) { super(conf, storeConfigInfo); } - private ArrayList getCurrentEligibleFiles( - ArrayList candidateFiles, final List filesCompacting) { - // candidates = all storefiles not already in compaction queue - if (!filesCompacting.isEmpty()) { - // exclude all files older than the newest file we're currently - // compacting. 
this allows us to preserve contiguity (HBASE-2856) - StoreFile last = filesCompacting.get(filesCompacting.size() - 1); - int idx = candidateFiles.indexOf(last); - Preconditions.checkArgument(idx != -1); - candidateFiles.subList(0, idx + 1).clear(); - } - return candidateFiles; - } - - public List preSelectCompactionForCoprocessor( - final Collection candidates, final List filesCompacting) { - return getCurrentEligibleFiles(new ArrayList(candidates), filesCompacting); - } - - /** - * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on - * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based - * on seqId for data consistency. - * @return subset copy of candidate list that meets compaction criteria - * @throws java.io.IOException + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. */ - public CompactionRequest selectCompaction(Collection candidateFiles, - final List filesCompacting, final boolean isUserCompaction, - final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { - // Preliminary compaction subject to filters - ArrayList candidateSelection = new ArrayList(candidateFiles); - // Stuck and not compacting enough (estimate). It is not guaranteed that we will be - // able to compact more if stuck and compacting, because ratio policy excludes some - // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). - int futureFiles = filesCompacting.isEmpty() ? 0 : 1; - boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) - >= storeConfigInfo.getBlockingFileCount(); - candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); - LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + - filesCompacting.size() + " compacting, " + candidateSelection.size() + - " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); - - if (!forceMajor) { - candidateSelection = skipLargeFiles(candidateSelection); + @Override + public boolean shouldPerformMajorCompaction(final Collection filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.storeConfigInfo.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.iterator().next(); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) ? 
Long.MIN_VALUE : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + float blockLocalityIndex = + sf.getHDFSBlockDistribution().getBlockLocalityIndex( + HRegionServer.getHostname(comConf.conf)); + if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on only store " + this + + "; to make hdfs blocks local, current blockLocalityIndex is " + + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")"); + } + result = true; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only, oldestTime " + oldest + + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + blockLocalityIndex + + " (min " + comConf.getMinLocalityToForceCompact() + ")"); + } + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } - // Force a major compaction if this is a user-requested major compaction, - // or if we do not have too many files to compact and this was requested - // as a major compaction. - // Or, if there are any references among the candidates. - boolean majorCompaction = ( - (forceMajor && isUserCompaction) - || ((forceMajor || isMajorCompaction(candidateSelection)) - && (candidateSelection.size() < comConf.getMaxFilesToCompact())) - || StoreUtils.hasReferences(candidateSelection) - ); - - if (!majorCompaction) { - // we're doing a minor compaction, let's see what files are applicable + @Override + protected CompactionRequest getCompactionRequest(ArrayList candidateSelection, + boolean tryingMajor, boolean isUserCompaction, boolean mayUseOffPeak, boolean mayBeStuck) + throws IOException { + if (!tryingMajor) { candidateSelection = filterBulk(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); - candidateSelection = checkMinFilesCriteria(candidateSelection); + candidateSelection = + checkMinFilesCriteria(candidateSelection, comConf.getMinFilesToCompact()); } - candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + removeExcessFiles(candidateSelection, isUserCompaction, tryingMajor); CompactionRequest result = new CompactionRequest(candidateSelection); - result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak); + result.setOffPeak(!candidateSelection.isEmpty() && !tryingMajor && mayUseOffPeak); return result; } /** * @param candidates pre-filtrate - * @return filtered subset - * exclude all files above maxCompactSize - * Also save all references. We MUST compact them - */ - protected ArrayList skipLargeFiles(ArrayList candidates) { - int pos = 0; - while (pos < candidates.size() && !candidates.get(pos).isReference() - && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) { - ++pos; - } - if (pos > 0) { - LOG.debug("Some files are too large. 
Excluding " + pos - + " files from compaction candidates"); - candidates.subList(0, pos).clear(); - } - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * exclude all bulk load files if configured - */ - protected ArrayList filterBulk(ArrayList candidates) { - candidates.removeAll(Collections2.filter(candidates, - new Predicate() { - @Override - public boolean apply(StoreFile input) { - return input.excludeFromMinorCompaction(); - } - })); - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * take upto maxFilesToCompact from the start - */ - private ArrayList removeExcessFiles(ArrayList candidates, - boolean isUserCompaction, boolean isMajorCompaction) { - int excess = candidates.size() - comConf.getMaxFilesToCompact(); - if (excess > 0) { - if (isMajorCompaction && isUserCompaction) { - LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + - " files because of a user-requested major compaction"); - } else { - LOG.debug("Too many admissible files. Excluding " + excess - + " files from compaction candidates"); - candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear(); - } - } - return candidates; - } - /** - * @param candidates pre-filtrate - * @return filtered subset - * forget the compactionSelection if we don't have enough files + * @return filtered subset -- Default minor compaction selection algorithm: choose + * CompactSelection from candidates -- First exclude bulk-load files if indicated in + * configuration. Start at the oldest file and stop when you find the first file that + * meets compaction criteria: (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR (2) within the compactRatio of sum(newer_files) Given normal skew, any newer files + * will also meet this criteria + *
+ * Additional Note: If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a situation where we always compact + * [end-threshold,end). Then, the last file becomes an aggregate of the previous + * compactions. normal skew: older ----> newer (increasing seqID) _ | | _ | | | | _ --|-|- + * |-|- |-|---_-------_------- minCompactSize | | | | | | | | _ | | | | | | | | | | | | | + * | | | | | | | | | | | | | */ - protected ArrayList checkMinFilesCriteria(ArrayList candidates) { - int minFiles = comConf.getMinFilesToCompact(); - if (candidates.size() < minFiles) { - if(LOG.isDebugEnabled()) { - LOG.debug("Not compacting files because we only have " + candidates.size() + - " files ready for compaction. Need " + minFiles + " to initiate."); - } - candidates.clear(); - } - return candidates; - } - - /** - * @param candidates pre-filtrate - * @return filtered subset - * -- Default minor compaction selection algorithm: - * choose CompactSelection from candidates -- - * First exclude bulk-load files if indicated in configuration. - * Start at the oldest file and stop when you find the first file that - * meets compaction criteria: - * (1) a recently-flushed, small file (i.e. <= minCompactSize) - * OR - * (2) within the compactRatio of sum(newer_files) - * Given normal skew, any newer files will also meet this criteria - *
- * Additional Note: - * If fileSizes.size() >> maxFilesToCompact, we will recurse on - * compact(). Consider the oldest files first to avoid a - * situation where we always compact [end-threshold,end). Then, the - * last file becomes an aggregate of the previous compactions. - * - * normal skew: - * - * older ----> newer (increasing seqID) - * _ - * | | _ - * | | | | _ - * --|-|- |-|- |-|---_-------_------- minCompactSize - * | | | | | | | | _ | | - * | | | | | | | | | | | | - * | | | | | | | | | | | | - */ - ArrayList applyCompactionPolicy(ArrayList candidates, + protected ArrayList applyCompactionPolicy(ArrayList candidates, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { if (candidates.isEmpty()) { return candidates; @@ -247,20 +152,19 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { fileSizes[i] = file.getReader().length(); // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo int tooFar = i + comConf.getMaxFilesToCompact() - 1; - sumSize[i] = fileSizes[i] - + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) - - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + sumSize[i] = + fileSizes[i] + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); } - - while (countOfFiles - start >= comConf.getMinFilesToCompact() && - fileSizes[start] > Math.max(comConf.getMinCompactSize(), + while (countOfFiles - start >= comConf.getMinFilesToCompact() + && fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * ratio))) { ++start; } if (start < countOfFiles) { LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) - + " files from " + countOfFiles + " candidates"); + + " files from " + countOfFiles + " candidates"); } else if (mayBeStuck) { // We may be stuck. Compact the latest files if we can. int filesToLeave = candidates.size() - comConf.getMinFilesToCompact(); @@ -272,109 +176,11 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { return candidates; } - /* - * @param filesToCompact Files to compact. Can be null. - * @return True if we should run a major compaction. - */ - public boolean isMajorCompaction(final Collection filesToCompact) - throws IOException { - boolean result = false; - long mcTime = getNextMajorCompactTime(filesToCompact); - if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { - return result; - } - // TODO: Use better method for determining stamp of last major (HBASE-2990) - long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); - long now = System.currentTimeMillis(); - if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { - // Major compaction time has elapsed. - long cfTtl = this.storeConfigInfo.getStoreFileTtl(); - if (filesToCompact.size() == 1) { - // Single file - StoreFile sf = filesToCompact.iterator().next(); - Long minTimestamp = sf.getMinimumTimestamp(); - long oldest = (minTimestamp == null) - ? 
Long.MIN_VALUE - : now - minTimestamp.longValue(); - if (sf.isMajorCompaction() && - (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { - float blockLocalityIndex = sf.getHDFSBlockDistribution() - .getBlockLocalityIndex(HRegionServer.getHostname(comConf.conf)); - if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on only store " + this + - "; to make hdfs blocks local, current blockLocalityIndex is " + - blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + - ")"); - } - result = true; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this + - " because one (major) compacted file only, oldestTime " + - oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + - blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + - ")"); - } - } - } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { - LOG.debug("Major compaction triggered on store " + this + - ", because keyvalues outdated; time since last major compaction " + - (now - lowTimestamp) + "ms"); - result = true; - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store " + this + - "; time since last major compaction " + (now - lowTimestamp) + "ms"); - } - result = true; - } - } - return result; - } - - public long getNextMajorCompactTime(final Collection filesToCompact) { - // default = 24hrs - long ret = comConf.getMajorCompactionPeriod(); - if (ret > 0) { - // default = 20% = +/- 4.8 hrs - double jitterPct = comConf.getMajorCompactionJitter(); - if (jitterPct > 0) { - long jitter = Math.round(ret * jitterPct); - // deterministic jitter avoids a major compaction storm on restart - Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); - if (seed != null) { - double rnd = (new Random(seed)).nextDouble(); - ret += jitter - Math.round(2L * jitter * rnd); - } else { - ret = 0; // no storefiles == no major compaction - } - } - } - return ret; - } - - /** - * @param compactionSize Total size of some compaction - * @return whether this should be a large or small compaction - */ - public boolean throttleCompaction(long compactionSize) { - return compactionSize > comConf.getThrottlePoint(); - } - - public boolean needsCompaction(final Collection storeFiles, - final List filesCompacting) { - int numCandidates = storeFiles.size() - filesCompacting.size(); - return numCandidates >= comConf.getMinFilesToCompact(); - } - /** * Overwrite min threshold for compaction * @param minThreshold */ - public void setMinThreshold(int minThreshold) - { + public void setMinThreshold(int minThreshold) { comConf.setMinFilesToCompact(minThreshold); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java new file mode 100644 index 0000000..1c60d7a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -0,0 +1,232 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; + +/** + * An abstract compaction policy that select files on seq id order. + */ +@InterfaceAudience.Private +public abstract class SortedCompactionPolicy extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(SortedCompactionPolicy.class); + + public SortedCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) { + super(conf, storeConfigInfo); + } + + public List preSelectCompactionForCoprocessor(final Collection candidates, + final List filesCompacting) { + return getCurrentEligibleFiles(new ArrayList(candidates), filesCompacting); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on + * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based + * on seqId for data consistency. + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactionRequest selectCompaction(Collection candidateFiles, + final List filesCompacting, final boolean isUserCompaction, + final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { + // Preliminary compaction subject to filters + ArrayList candidateSelection = new ArrayList(candidateFiles); + // Stuck and not compacting enough (estimate). It is not guaranteed that we will be + // able to compact more if stuck and compacting, because ratio policy excludes some + // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). + int futureFiles = filesCompacting.isEmpty() ? 0 : 1; + boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) + >= storeConfigInfo.getBlockingFileCount(); + + candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); + LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + + filesCompacting.size() + " compacting, " + candidateSelection.size() + + " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); + + if (!forceMajor) { + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean tryingMajor = (forceMajor && isUserCompaction) + || ((forceMajor || shouldPerformMajorCompaction(candidateSelection)) + && (candidateSelection.size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection); + + if (tryingMajor) { + LOG.debug("Trying to select files for major compaction with forceMajor:" + + forceMajor + ", userCompaction:" + isUserCompaction); + } + + return getCompactionRequest(candidateSelection, tryingMajor, isUserCompaction, + mayUseOffPeak, mayBeStuck); + } + + protected abstract CompactionRequest getCompactionRequest(ArrayList candidateSelection, + boolean tryingMajor, boolean isUserCompaction, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException; + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public abstract boolean shouldPerformMajorCompaction(final Collection filesToCompact) throws IOException; + + public long getNextMajorCompactTime(final Collection filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default jitter = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * A heuristic method to decide whether to schedule a compaction request + * @param storeFiles files in the store. + * @param filesCompacting files being scheduled to compact. + * @return true to schedule a request. + */ + public boolean needsCompaction(final Collection storeFiles, + final List filesCompacting) { + int numCandidates = storeFiles.size() - filesCompacting.size(); + return numCandidates >= comConf.getMinFilesToCompact(); + } + + + protected ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, + final List filesCompacting) { + // candidates = all storefiles not already in compaction queue + if (!filesCompacting.isEmpty()) { + // exclude all files older than the newest file we're currently + // compacting. this allows us to preserve contiguity (HBASE-2856) + StoreFile last = filesCompacting.get(filesCompacting.size() - 1); + int idx = candidateFiles.indexOf(last); + Preconditions.checkArgument(idx != -1); + candidateFiles.subList(0, idx + 1).clear(); + } + return candidateFiles; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset exclude all files above maxCompactSize Also save all references. We + * MUST compact them + */ + protected ArrayList skipLargeFiles(ArrayList candidates) { + int pos = 0; + while (pos < candidates.size() && !candidates.get(pos).isReference() + && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + " files from compaction candidates"); + candidates.subList(0, pos).clear(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset exclude all bulk load files if configured + */ + protected ArrayList filterBulk(ArrayList candidates) { + candidates.removeAll(Collections2.filter(candidates, new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset take up to maxFilesToCompact from the start + */ + protected void removeExcessFiles(ArrayList candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear(); + } + } + } + + /** + * @param candidates pre-filtrate + * @return filtered subset forget the compactionSelection if we don't have enough files + */ + protected ArrayList checkMinFilesCriteria(ArrayList candidates, + int minFiles) { + if (candidates.size() < minFiles) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + candidates.size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.clear(); + } + return candidates; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 54a08ac..e13e6b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -165,7 +165,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { } @Override - public boolean isMajorCompaction(Collection filesToCompact) throws IOException { + public boolean shouldPerformMajorCompaction(Collection filesToCompact) throws IOException { return false; // there's never a major compaction! 
} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index db25edb..3636c48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -24,9 +24,11 @@ import java.util.TreeMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** A mock used so our tests don't deal with actual StoreFiles */ public class MockStoreFile extends StoreFile { @@ -38,7 +40,10 @@ public class MockStoreFile extends StoreFile { byte[] splitPoint = null; TimeRangeTracker timeRangeTracker; long entryCount; - + boolean isMajor; + HDFSBlocksDistribution hdfsBlocksDistribution; + long modificationTime; + MockStoreFile(HBaseTestingUtility testUtil, Path testPath, long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException { super(testUtil.getTestFileSystem(), testPath, testUtil.getConfiguration(), @@ -47,6 +52,11 @@ public class MockStoreFile extends StoreFile { this.isRef = isRef; this.ageInDisk = ageInDisk; this.sequenceid = sequenceid; + this.isMajor = false; + hdfsBlocksDistribution = new HDFSBlocksDistribution(); + hdfsBlocksDistribution.addHostsAndBlockWeight( + new String[] { HRegionServer.getHostname(testUtil.getConfiguration()) }, 1); + modificationTime = EnvironmentEdgeManager.currentTimeMillis(); } void setLength(long newLen) { @@ -65,16 +75,19 @@ public class MockStoreFile extends StoreFile { @Override public boolean isMajorCompaction() { - return false; + return isMajor; } + public void setIsMajor(boolean isMajor) { + this.isMajor = isMajor; + } @Override public boolean isReference() { return this.isRef; } @Override - boolean isBulkLoadResult() { + public boolean isBulkLoadResult() { return false; } @@ -96,18 +109,28 @@ public class MockStoreFile extends StoreFile { } public Long getMinimumTimestamp() { - return (timeRangeTracker == null) ? - null : - timeRangeTracker.getMinimumTimestamp(); + return (timeRangeTracker == null) ? + null : + timeRangeTracker.getMinimumTimestamp(); } - + public Long getMaximumTimestamp() { - return (timeRangeTracker == null) ? - null : - timeRangeTracker.getMaximumTimestamp(); + return (timeRangeTracker == null) ? 
+ null : + timeRangeTracker.getMaximumTimestamp(); } @Override + public HDFSBlocksDistribution getHDFSBlockDistribution() { + return hdfsBlocksDistribution; + } + + @Override + public long getModificationTimeStamp() { + return modificationTime; + } + + @Override public StoreFile.Reader getReader() { final long len = this.length; final TimeRangeTracker timeRange = this.timeRangeTracker; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java index 8afe621..21480d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,9 +25,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -37,6 +40,10 @@ import org.junit.experimental.categories.Category; public class TestDateTieredCompaction extends TestCompactionPolicy { ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + // Has to be > 0 and < now. + timeMachine.setValue(1); ArrayList ageInDisk = new ArrayList(); for (int i = 0; i < sizes.length; i++) { ageInDisk.add(0L); @@ -57,29 +64,47 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { super.config(); // Set up policy - conf.setLong(CompactionConfiguration.MAX_AGE_KEY, 100); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); - conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, - DateTieredCompactionPolicy.class.getName()); + conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); // Special settings for compaction policy per window - this.conf.setInt(CompactionConfiguration.MIN_KEY, 2); - this.conf.setInt(CompactionConfiguration.MAX_KEY, 12); - this.conf.setFloat(CompactionConfiguration.RATIO_KEY, 1.2F); + conf.setInt(CompactionConfiguration.MIN_KEY, 2); + conf.setInt(CompactionConfiguration.MAX_KEY, 12); + conf.setFloat(CompactionConfiguration.RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); } - void compactEquals(long now, ArrayList candidates, long... 
expected) - throws IOException { - Assert.assertTrue(((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .needsCompaction(candidates, ImmutableList. of(), now)); - - List actual = - ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .applyCompactionPolicy(candidates, false, false, now); - - Assert.assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); + void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, + long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + timeMachine.setValue(now); + DateTieredCompactionRequest request; + if (isMajor) { + for (StoreFile file : candidates) { + ((MockStoreFile)file).setIsMajor(true); + } + Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) + .shouldPerformMajorCompaction(candidates)); + request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine + .getCompactionPolicy()).selectMajorCompaction(candidates); + } else { + Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) + .needsCompaction(candidates, ImmutableList. of())); + request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine + .getCompactionPolicy()).selectMinorCompaction(candidates, false, false); + } + List actual = Lists.newArrayList(request.getFiles()); + Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); + Assert.assertEquals(Arrays.toString(expectedBoundaries), + Arrays.toString(request.getBoundaries().toArray())); } /** @@ -92,7 +117,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, + new long[] { Long.MIN_VALUE, 12 }, false, true); } /** @@ -105,7 +131,22 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 25, 24, 23, 22, 21, 20); + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22, 23, + 24, 25 }, new long[] { Long.MIN_VALUE, 6}, false, true); + } + + /** + * Test for file on the upper bound of incoming window + * @throws IOException with error + */ + @Test + public void OnUpperBoundOfIncomingWindow() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, + new long[] { Long.MIN_VALUE, 12 }, false, true); } /** @@ -115,23 +156,25 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { @Test public void NewerThanIncomingWindow() throws IOException { long[] 
minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 19 }; long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, + new long[] { Long.MIN_VALUE, 12}, false, true); } /** - * If there is no T1 window, we don't build 2 + * If there is no T1 window, we don't build T2 * @throws IOException with error */ @Test public void NoT2() throws IOException { long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 44, 60, 61, 92, 95, 100 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 97, 100, 193 }; long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 }; - compactEquals(100, sfCreate(minTimestamps, maxTimestamps, sizes), 23, 22); + compactEquals(194, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 22, 23 }, + new long[] { Long.MIN_VALUE, 96}, false, true); } @Test @@ -140,7 +183,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 }; - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 32, 31, 30); + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31, 32 }, + new long[] { Long.MIN_VALUE, 120 }, false, true); } /** @@ -153,7 +197,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 22, 21, 20); + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22 }, + new long[] { Long.MIN_VALUE }, false, true); } /** @@ -166,7 +211,8 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 }; - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30); + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31 }, + new long[] { Long.MIN_VALUE }, false, true); } /** @@ -179,33 +225,101 @@ public class TestDateTieredCompaction extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 }; long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 }; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 24, 23); + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 23, 24 }, + new long[] { Long.MIN_VALUE }, false, true); } /** - * Older than now(161) - maxAge(100) - * @throws IOException with error - */ - @Test - public void olderThanMaxAge() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; - long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; - - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, 
sizes), 31, 30, 33, 42, 41, 40); - } + * Older than now(161) - maxAge(100) + * @throws IOException with error + */ + @Test + public void olderThanMaxAge() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 40, 41, 42, 33, + 30, 31 }, new long[] { Long.MIN_VALUE, 96 }, false, true); + } /** * Out-of-order data * @throws IOException with error */ @Test - public void OutOfOrder() throws IOException { + public void outOfOrder() throws IOException { long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 }; long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 }; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 1, 24, 23, 28, 22, 34, - 33, 32, 31); + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34, + 22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12 }, false, true); + } + + /** + * Negative epoch time + * @throws IOException with error + */ + @Test + public void negativeEpochtime() throws IOException { + long[] minTimestamps = + new long[] { -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000 }; + long[] maxTimestamps = new long[] { -28, -11, -10, -9, -8, -7, -6, -5, -4, -3 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 25, 23, 24, 1 }; + + compactEquals(1, sfCreate(minTimestamps, maxTimestamps, sizes), + new long[] { 31, 32, 33, 34, 22, 25, 23, 24, 1 }, + new long[] { Long.MIN_VALUE, -24 }, false, true); + } + + /** + * Major compaction + * @throws IOException with error + */ + @Test + public void majorCompation() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,41, 42, + 33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true); + } + + /** + * Major compaction with negative numbers + * @throws IOException with error + */ + @Test + public void negativeForMajor() throws IOException { + long[] minTimestamps = + new long[] { -155, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100 }; + long[] maxTimestamps = new long[] { -8, -7, -6, -5, -4, -3, -2, -1, 0, 6, 13 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40, + 41, 42, 33, 30, 31, 2, 1 }, + new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true); + } + + /** + * Major compaction with maximum values + * @throws IOException with error + */ + @Test + public void maxValuesForMajor() throws IOException { + conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2); + conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2); + store.storeEngine.getCompactionPolicy().setConf(conf); + long[] minTimestamps = + new long[] { Long.MIN_VALUE, -100 }; + long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE }; + long[] sizes = new long[] { 0, 1 }; + + 
compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), + new long[] { 0, 1 }, + new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L, + 9223372036854775806L }, true, true); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index 11a63b4..4448572 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -89,6 +91,10 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy { conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1); conf.setFloat("hbase.hregion.majorcompaction.jitter", 0); store.storeEngine.getCompactionPolicy().setConf(conf); + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + // Has to be > 0 and < now. + timeMachine.setValue(1); try { // trigger an aged major compaction compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/EverythingPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/EverythingPolicy.java index 412e5a7..9a4bb8e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/EverythingPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/EverythingPolicy.java @@ -42,7 +42,7 @@ public class EverythingPolicy extends RatioBasedCompactionPolicy { } @Override - final ArrayList applyCompactionPolicy(final ArrayList candidates, + protected final ArrayList applyCompactionPolicy(final ArrayList candidates, final boolean mayUseOffPeak, final boolean mayBeStuck) throws IOException { if (candidates.size() < comConf.getMinFilesToCompact()) { -- 1.9.3 (Apple Git-50)
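
The reformatted sumSize loop in RatioBasedCompactionPolicy.applyCompactionPolicy() is a right-to-left sliding-window sum: sumSize[i] covers fileSizes[i, i + maxFilesToCompact - 1), so the ratio check can compare each candidate against the total size of the files that would be compacted together with it, in O(n) overall. A minimal standalone sketch of just that recurrence, with maxFilesToCompact passed as a plain parameter rather than read from CompactionConfiguration:

    import java.util.Arrays;

    public class WindowSumSketch {
      // Computes sumSize[i] = sum of fileSizes over [i, i + maxFilesToCompact - 1),
      // reusing sumSize[i + 1] and subtracting the element that falls out of the window.
      static long[] windowSums(long[] fileSizes, int maxFilesToCompact) {
        int countOfFiles = fileSizes.length;
        long[] sumSize = new long[countOfFiles];
        for (int i = countOfFiles - 1; i >= 0; i--) {
          int tooFar = i + maxFilesToCompact - 1;
          sumSize[i] = fileSizes[i]
              + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
              - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
        }
        return sumSize;
      }

      public static void main(String[] args) {
        // For sizes {50, 25, 12, 12} and maxFilesToCompact = 3 this prints [75, 37, 24, 12].
        System.out.println(Arrays.toString(windowSums(new long[] { 50, 25, 12, 12 }, 3)));
      }
    }

The policy then advances start while fileSizes[start] > max(minCompactSize, sumSize[start + 1] * ratio); each window deliberately holds maxFilesToCompact - 1 entries so that the candidate at start plus the files summed in sumSize[start + 1] never exceed maxFilesToCompact.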
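
getNextMajorCompactTime(), now hoisted into SortedCompactionPolicy, spreads the next major compaction over [period - jitter, period + jitter] using a Random seeded deterministically from the store files, so region servers do not all fire major compactions at the same moment after a restart. A self-contained sketch of that arithmetic; period, jitterPct and seed are illustrative parameters standing in for the CompactionConfiguration values and StoreUtils.getDeterministicRandomSeed():

    import java.util.Random;

    public class JitterSketch {
      // Returns a delay in [period - jitter, period + jitter]; identical seeds give identical
      // delays, so the offset survives a restart instead of being re-rolled into a storm.
      static long nextMajorCompactTime(long period, double jitterPct, int seed) {
        long jitter = Math.round(period * jitterPct);
        double rnd = new Random(seed).nextDouble();
        return period + jitter - Math.round(2L * jitter * rnd);
      }

      public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        // Default 24h period with 20% jitter: result stays within +/- 4.8h of 24h.
        System.out.println(nextMajorCompactTime(dayMs, 0.2, 42));
      }
    }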
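
The new tests pin "now" through ManualEnvironmentEdge instead of System.currentTimeMillis(), which makes the tier boundaries and the age checks in shouldPerformMajorCompaction() reproducible. A minimal sketch of that pattern as the tests use it; the value 161 is just one of the example "now" values above, and restoring the default edge afterwards is an assumption on my part, not something this patch shows:

    import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
    import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;

    public class ClockPinSketch {
      public static void main(String[] args) {
        ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
        EnvironmentEdgeManager.injectEdge(timeMachine);  // compaction policies read this clock
        timeMachine.setValue(161L);                      // EnvironmentEdgeManager.currentTimeMillis() == 161
        try {
          // ... run the selection under test here, as compactEquals() does ...
        } finally {
          EnvironmentEdgeManager.reset();                // assumed way to restore the system clock
        }
      }
    }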