From 51fe267e9763b939be46621ba71fa70b04d3445a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 27 Apr 2016 13:04:29 +0800 Subject: [PATCH] HBASE-15454 Archive store files older than max age --- .../regionserver/DateTieredMultiFileWriter.java | 52 +++++- .../hbase/regionserver/DateTieredStoreEngine.java | 12 +- .../regionserver/DateTieredStoreFileManager.java | 50 ++++++ .../hbase/regionserver/DefaultStoreEngine.java | 4 +- .../regionserver/DefaultStoreFileManager.java | 11 +- .../compactions/CompactionConfiguration.java | 20 ++- .../compactions/DateTieredCompactionPolicy.java | 195 +++++++++++++++++---- .../compactions/DateTieredCompactionRequest.java | 16 +- .../compactions/DateTieredCompactor.java | 13 +- .../compactions/RatioBasedCompactionPolicy.java | 6 +- .../compactions/SortedCompactionPolicy.java | 30 ++-- .../AbstractTestDateTieredCompactionPolicy.java | 40 +++-- .../TestDateTieredCompactionPolicy.java | 28 +-- .../TestDateTieredCompactionPolicyFreeze.java | 92 ++++++++++ .../TestDateTieredCompactionPolicyOverflow.java | 2 +- .../regionserver/compactions/TestCompactor.java | 16 ++ .../compactions/TestDateTieredCompactor.java | 16 +- 17 files changed, 486 insertions(+), 117 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyFreeze.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index 2cea92f..c0afbd0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DateTieredStoreFileManager.FREEZING_WINDOW_END_TIMESTAMP; +import static org.apache.hadoop.hbase.regionserver.DateTieredStoreFileManager.FREEZING_WINDOW_START_TIMESTAMP; + import java.io.IOException; import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -26,6 +31,8 @@ import java.util.TreeMap; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * class for cell sink that separates the provided cells into multiple files for date tiered @@ -37,16 +44,34 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { private final NavigableMap lowerBoundary2Writer = new TreeMap(); + private final IdentityHashMap> writer2FreezingWindow + = new IdentityHashMap>(); + + private final Long highestBoundary; + + private final long freezeWindowOlderThan; + private final boolean needEmptyFile; /** * @param needEmptyFile whether need to create an empty store file if we haven't written out * anything. 
*/ - public DateTieredMultiFileWriter(List lowerBoundaries, boolean needEmptyFile) { - for (Long lowerBoundary : lowerBoundaries) { - lowerBoundary2Writer.put(lowerBoundary, null); + public DateTieredMultiFileWriter(List boundaries, long freezeWindowOlderThan, + boolean needEmptyFile) { + assert boundaries.size() >= 2; + Iterator iter = boundaries.iterator(); + lowerBoundary2Writer.put(iter.next(), null); + for (;;) { + Long boundary = iter.next(); + if (iter.hasNext()) { + lowerBoundary2Writer.put(boundary, null); + } else { + highestBoundary = boundary; + break; + } } + this.freezeWindowOlderThan = freezeWindowOlderThan; this.needEmptyFile = needEmptyFile; } @@ -57,6 +82,16 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { if (writer == null) { writer = writerFactory.createWriter(); lowerBoundary2Writer.put(entry.getKey(), writer); + if (entry.getKey().longValue() < freezeWindowOlderThan) { + // maybe an freezing window + Long higherBoundary = lowerBoundary2Writer.higherKey(entry.getKey()); + if (higherBoundary == null) { + higherBoundary = highestBoundary; + } + if (higherBoundary.longValue() <= freezeWindowOlderThan) { + writer2FreezingWindow.put(writer, Pair.newPair(entry.getKey(), higherBoundary)); + } + } } writer.append(cell); } @@ -67,6 +102,17 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { } @Override + protected void preCloseWriter(StoreFileWriter writer) throws IOException { + Pair freezingWindow = writer2FreezingWindow.get(writer); + if (freezingWindow != null) { + writer.appendFileInfo(FREEZING_WINDOW_START_TIMESTAMP, + Bytes.toBytes(freezingWindow.getFirst().longValue())); + writer.appendFileInfo(FREEZING_WINDOW_END_TIMESTAMP, + Bytes.toBytes(freezingWindow.getSecond().longValue())); + } + } + + @Override protected void preCommitWriters() throws IOException { if (!needEmptyFile) { return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 2d86e39..d764af9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.security.User; */ @InterfaceAudience.Private public class DateTieredStoreEngine extends StoreEngine { + DateTieredCompactionPolicy, DateTieredCompactor, DateTieredStoreFileManager> { @Override public boolean needsCompaction(List filesCompacting) { return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), @@ -57,9 +57,8 @@ public class DateTieredStoreEngine extends StoreEngine compact(ThroughputController throughputController, User user) throws IOException { if (request instanceof DateTieredCompactionRequest) { - return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(), - throughputController, user); + DateTieredCompactionRequest dateTieredRequest = (DateTieredCompactionRequest) request; + return compactor.compact(request, dateTieredRequest.getBoundaries(), + dateTieredRequest.getFreezeWindowOlderThan(), throughputController, user); } else { throw new IllegalArgumentException("DateTieredCompactionRequest is expected. 
Actual: " + request.getClass().getCanonicalName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java new file mode 100644 index 0000000..f678dbc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Comparator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * StoreFileManager for date tiered compaction. + */ +@InterfaceAudience.Private +public class DateTieredStoreFileManager extends DefaultStoreFileManager { + /** + * The file metadata fields that contain the freezing window information. 
+ */ + public static final byte[] FREEZING_WINDOW_START_TIMESTAMP = Bytes + .toBytes("FREEZING_WINDOW_START_TIMESTAMP"); + public static final byte[] FREEZING_WINDOW_END_TIMESTAMP = Bytes + .toBytes("FREEZING_WINDOW_END_TIMESTAMP"); + + public DateTieredStoreFileManager(CellComparator kvComparator, Configuration conf, + CompactionConfiguration comConf) { + super(kvComparator, conf, comConf); + } + + @Override + public Comparator getStoreFileComparator() { + return StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 8e94e2f..5bd6122 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -68,9 +68,7 @@ public class DefaultStoreEngine extends StoreEngine< createCompactor(conf, store); createCompactionPolicy(conf, store); createStoreFlusher(conf, store); - storeFileManager = - new DefaultStoreFileManager(kvComparator, StoreFile.Comparators.SEQ_ID, conf, - compactionPolicy.getConf()); + storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf()); } protected void createCompactor(Configuration conf, Store store) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index 2217034..d12067d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -49,7 +49,6 @@ class DefaultStoreFileManager implements StoreFileManager { private final CellComparator kvComparator; private final CompactionConfiguration comConf; private final int blockingFileCount; - private final Comparator storeFileComparator; /** * List of store files inside this store. This is an immutable list that * is atomically replaced when its contents change. @@ -63,11 +62,9 @@ class DefaultStoreFileManager implements StoreFileManager { */ private volatile List compactedfiles = null; - public DefaultStoreFileManager(CellComparator kvComparator, - Comparator storeFileComparator, Configuration conf, + public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf, CompactionConfiguration comConf) { this.kvComparator = kvComparator; - this.storeFileComparator = storeFileComparator; this.comConf = comConf; this.blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); @@ -213,13 +210,13 @@ class DefaultStoreFileManager implements StoreFileManager { } private void sortAndSetStoreFiles(List storeFiles) { - Collections.sort(storeFiles, storeFileComparator); + Collections.sort(storeFiles, getStoreFileComparator()); storefiles = ImmutableList.copyOf(storeFiles); } private List sortCompactedfiles(List storefiles) { // Sorting may not be really needed here for the compacted files? 
- Collections.sort(storefiles, storeFileComparator); + Collections.sort(storefiles, getStoreFileComparator()); return new ArrayList(storefiles); } @@ -235,7 +232,7 @@ class DefaultStoreFileManager implements StoreFileManager { @Override public Comparator getStoreFileComparator() { - return storeFileComparator; + return StoreFile.Comparators.SEQ_ID; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index edf6cae..5bd83b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -88,6 +88,11 @@ public class CompactionConfiguration { private static final Class DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class; + public static final String FREEZE_DATE_TIERED_WINDOW_OLDER_THAN_MAX_AGE_KEY = + "hbase.hstore.compaction.date.tiered.freeze.window.older.than.max.age"; + + private static final boolean DEFAULT_FREEZE_DATE_TIERED_WINDOW_OLDER_THAN_MAX_AGE = false; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -109,6 +114,7 @@ public class CompactionConfiguration { private final String compactionPolicyForDateTieredWindow; private final boolean dateTieredSingleOutputForMinorCompaction; private final String dateTieredCompactionWindowFactory; + private final boolean freezeDateTieredWindowOlderThanMaxAge; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -139,9 +145,12 @@ public class CompactionConfiguration { DEFAULT_COMPACTION_POLICY_CLASS_FOR_DATE_TIERED_WINDOWS.getName()); dateTieredSingleOutputForMinorCompaction = conf .getBoolean(DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true); - this.dateTieredCompactionWindowFactory = conf.get( + dateTieredCompactionWindowFactory = conf.get( DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY, DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName()); + freezeDateTieredWindowOlderThanMaxAge = conf.getBoolean( + FREEZE_DATE_TIERED_WINDOW_OLDER_THAN_MAX_AGE_KEY, + DEFAULT_FREEZE_DATE_TIERED_WINDOW_OLDER_THAN_MAX_AGE); LOG.info(this); } @@ -152,7 +161,7 @@ public class CompactionConfiguration { + " major period %d, major jitter %f, min locality to compact %f;" + " tiered compaction: max_age %d, incoming window min %d," + " compaction policy for tiered window %s, single output for minor %b," - + " compaction window factory %s", + + " compaction window factory %s, freeze window older than max age %b", minCompactSize, maxCompactSize, offPeakMaxCompactSize, @@ -168,7 +177,8 @@ public class CompactionConfiguration { dateTieredIncomingWindowMin, compactionPolicyForDateTieredWindow, dateTieredSingleOutputForMinorCompaction, - dateTieredCompactionWindowFactory + dateTieredCompactionWindowFactory, + freezeDateTieredWindowOlderThanMaxAge ); } @@ -285,4 +295,8 @@ public class CompactionConfiguration { public String getDateTieredCompactionWindowFactory() { return dateTieredCompactionWindowFactory; } + + public boolean freezeDateTieredWindowOlderThanMaxAge() { + return freezeDateTieredWindowOlderThanMaxAge; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index 037bc80..3d4ae80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -66,6 +66,9 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); + private static DateTieredCompactionRequest EMPTY_REQUEST = new DateTieredCompactionRequest( + Collections. emptyList(), Collections. emptyList(), Long.MIN_VALUE); + private final RatioBasedCompactionPolicy compactionPolicyPerWindow; private final CompactionWindowFactory windowFactory; @@ -97,11 +100,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { */ @Override @VisibleForTesting - public boolean needsCompaction(final Collection storeFiles, + public boolean needsCompaction(final Collection allFiles, final List filesCompacting) { - ArrayList candidates = new ArrayList(storeFiles); + ArrayList candidates = new ArrayList(allFiles); try { - return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty(); + return !selectMinorCompaction(allFiles, filesCompacting, candidates, false, true).getFiles() + .isEmpty(); } catch (Exception e) { LOG.error("Can not check for compaction: ", e); return false; @@ -130,10 +134,9 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } long cfTTL = this.storeConfigInfo.getStoreFileTtl(); - HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); List boundaries = getCompactBoundariesForMajor(filesToCompact, now); boolean[] filesInWindow = new boolean[boundaries.size()]; - + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); for (StoreFile file: filesToCompact) { Long minTimestamp = file.getMinimumTimestamp(); long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue(); @@ -150,9 +153,9 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } int lowerWindowIndex = Collections.binarySearch(boundaries, - minTimestamp == null ? (Long)Long.MAX_VALUE : minTimestamp); + minTimestamp == null ? (Long) Long.MAX_VALUE : minTimestamp); int upperWindowIndex = Collections.binarySearch(boundaries, - file.getMaximumTimestamp() == null ? (Long)Long.MAX_VALUE : file.getMaximumTimestamp()); + file.getMaximumTimestamp() == null ? (Long) Long.MAX_VALUE : file.getMaximumTimestamp()); if (lowerWindowIndex != upperWindowIndex) { LOG.debug("Major compaction triggered on store " + this + "; because file " + file.getPath() + " has data with timestamps cross window boundaries"); @@ -182,10 +185,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } @Override - protected CompactionRequest createCompactionRequest(ArrayList candidateSelection, - boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + protected CompactionRequest createCompactionRequest(Collection allFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { CompactionRequest result = tryingMajor ? 
selectMajorCompaction(candidateSelection) - : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); + : selectMinorCompaction(allFiles, filesCompacting, candidateSelection, mayUseOffPeak, + mayBeStuck); if (LOG.isDebugEnabled()) { LOG.debug("Generated compaction request: " + result); } @@ -194,8 +199,122 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { public CompactionRequest selectMajorCompaction(ArrayList candidateSelection) { long now = EnvironmentEdgeManager.currentTime(); + long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now); return new DateTieredCompactionRequest(candidateSelection, - this.getCompactBoundariesForMajor(candidateSelection, now)); + this.getCompactBoundariesForMajor(candidateSelection, now), oldestToCompact); + } + + private boolean shouldIncludeForFreezing(long startMillis, long endMillis, StoreFile file) { + if (file.getMinimumTimestamp() == null) { + if (file.getMaximumTimestamp() == null) { + return true; + } else { + return file.getMaximumTimestamp().longValue() >= startMillis; + } + } else { + if (file.getMaximumTimestamp() == null) { + return file.getMinimumTimestamp().longValue() < endMillis; + } else { + return file.getMinimumTimestamp().longValue() < endMillis + && file.getMaximumTimestamp().longValue() >= startMillis; + } + } + } + + private boolean canDropWholeFile(long now, long cfTTL, StoreFile file) { + long expiredBefore = now - cfTTL; + return cfTTL != Long.MAX_VALUE && file.getMaximumTimestamp() != null + && file.getMaximumTimestamp().longValue() < expiredBefore; + } + + private boolean fitsInWindow(long startMillis, long endMillis, StoreFile file) { + return file.getMinimumTimestamp() != null && file.getMaximumTimestamp() != null + && file.getMinimumTimestamp().longValue() >= startMillis + && file.getMaximumTimestamp().longValue() < endMillis; + } + + private CompactionRequest freezeOldWindows(Collection allFiles, + List filesCompacting, CompactionWindow youngestFreezingWindow, + long oldestToCompact, long now) { + if (!comConf.freezeDateTieredWindowOlderThanMaxAge()) { + // A non-null file list is expected by HStore + return EMPTY_REQUEST; + } + long minTimestamp = Long.MAX_VALUE; + for (StoreFile file : allFiles) { + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp() == null ? 
Long.MAX_VALUE + : file.getMinimumTimestamp().longValue()); + } + List freezingWindowBoundaries = Lists.newArrayList(); + freezingWindowBoundaries.add(youngestFreezingWindow.endMillis()); + for (CompactionWindow window = youngestFreezingWindow; window + .compareToTimestamp(minTimestamp) >= 0; window = window.nextEarlierWindow()) { + freezingWindowBoundaries.add(window.startMillis()); + } + Collections.reverse(freezingWindowBoundaries); + if (freezingWindowBoundaries.size() < 2) { + return EMPTY_REQUEST; + } + List candidates = Lists.newArrayList(allFiles); + // from old to young + for (int i = 0, n = freezingWindowBoundaries.size() - 1; i < n; i++) { + long startMillis = freezingWindowBoundaries.get(i); + long endMillis = freezingWindowBoundaries.get(i + 1); + int first = 0, total = candidates.size(); + for (; first < total; first++) { + if (shouldIncludeForFreezing(startMillis, endMillis, candidates.get(first))) { + break; + } + } + if (first == total) { + continue; + } + int last = total - 1; + for (; last > first; last--) { + if (shouldIncludeForFreezing(startMillis, endMillis, candidates.get(last))) { + break; + } + } + if (last == first) { + StoreFile file = candidates.get(first); + // If we could drop the whole file due to TTL then we can create a compaction request. + // And also check if the only file fits in the window. Otherwise we still need a compaction + // to move the data that does not belong to this window to other windows. + if (!canDropWholeFile(now, storeConfigInfo.getStoreFileTtl(), file) + && fitsInWindow(startMillis, endMillis, file)) { + continue; + } + } + if (!filesCompacting.isEmpty()) { + // check if we are overlapped with filesCompacting. + int firstCompactingIdx = candidates.indexOf(filesCompacting.get(0)); + int lastCompactingIdx = candidates.indexOf(filesCompacting.get(filesCompacting.size() - 1)); + assert firstCompactingIdx >= 0; + assert lastCompactingIdx >= 0; + if (last >= firstCompactingIdx && first <= lastCompactingIdx) { + continue; + } + } + if (last - first + 1 > comConf.getMaxFilesToCompact()) { + LOG.warn("Too many files(got " + (last - first + 1) + ", expected less than or equal to " + + comConf.getMaxFilesToCompact() + ") to compact when freezing [" + startMillis + ", " + + endMillis + "), give up"); + continue; + } + for (int j = first; j <= last; j++) { + StoreFile file = candidates.get(j); + if (file.excludeFromMinorCompaction()) { + LOG.warn("Find bulk load file " + file.getPath() + + " which is configured to be excluded from minor compaction when archiving [" + + startMillis + ", " + endMillis + ") give up"); + continue; + } + } + List filesToCompact = Lists.newArrayList(candidates.subList(first, last + 1)); + return new DateTieredCompactionRequest(filesToCompact, + getCompactBoundariesForMajor(filesToCompact, now), oldestToCompact); + } + return EMPTY_REQUEST; } /** @@ -206,7 +325,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. 
*/ - public CompactionRequest selectMinorCompaction(ArrayList candidateSelection, + public CompactionRequest selectMinorCompaction(Collection allFiles, + List filesCompacting, ArrayList candidateSelection, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { long now = EnvironmentEdgeManager.currentTime(); long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now); @@ -229,6 +349,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); while (it.hasNext()) { if (window.compareToTimestamp(oldestToCompact) < 0) { + // the whole window lies before oldestToCompact break; } int compResult = window.compareToTimestamp(it.peek().getSecond()); @@ -248,21 +369,26 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { if (LOG.isDebugEnabled()) { LOG.debug("Processing files: " + fileList + " for window: " + window); } - DateTieredCompactionRequest request = generateCompactionRequest(fileList, window, - mayUseOffPeak, mayBeStuck, minThreshold); + DateTieredCompactionRequest request = generateCompactionRequestForMinorCompaction( + fileList, window, mayUseOffPeak, mayBeStuck, minThreshold); if (request != null) { return request; } } } } - // A non-null file list is expected by HStore - return new CompactionRequest(Collections. emptyList()); + if (comConf.freezeDateTieredWindowOlderThanMaxAge()) { + return freezeOldWindows(allFiles, filesCompacting, window, oldestToCompact, now); + } else { + // A non-null file list is expected by HStore + return EMPTY_REQUEST; + } + } - private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, - CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) - throws IOException { + private DateTieredCompactionRequest generateCompactionRequestForMinorCompaction( + ArrayList storeFiles, CompactionWindow window, boolean mayUseOffPeak, + boolean mayBeStuck, int minThreshold) throws IOException { // The files has to be in ascending order for ratio-based compaction to work right // and removeExcessFile to exclude youngest files. Collections.reverse(storeFiles); @@ -277,34 +403,40 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { boolean singleOutput = storeFiles.size() != storeFileSelection.size() || comConf.useDateTieredSingleOutputForMinorCompaction(); List boundaries = getCompactionBoundariesForMinor(window, singleOutput); + // minor compaction will not compact window older than max age, so pass Long.MIN_VALUE is OK. DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection, - boundaries); + boundaries, Long.MIN_VALUE); return result; } return null; } /** - * Return a list of boundaries for multiple compaction output - * in ascending order. + * Return a list of boundaries for multiple compaction output in ascending order. */ private List getCompactBoundariesForMajor(Collection filesToCompact, long now) { long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; for (StoreFile file : filesToCompact) { - minTimestamp = - Math.min(minTimestamp, - file.getMinimumTimestamp() == null ? Long.MAX_VALUE : file.getMinimumTimestamp()); + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp() == null ? Long.MAX_VALUE + : file.getMinimumTimestamp().longValue()); + maxTimestamp = Math.max(maxTimestamp, file.getMaximumTimestamp() == null ? 
Long.MIN_VALUE + : file.getMaximumTimestamp().longValue()); } - List boundaries = new ArrayList(); + List boundaries = Lists.newArrayList(); + CompactionWindow window = getIncomingWindow(now); + // find the first window that covers the max timestamp. + while (window.compareToTimestamp(maxTimestamp) > 0) { + window = window.nextEarlierWindow(); + } + boundaries.add(window.endMillis()); - // Add startMillis of all windows between now and min timestamp - for (CompactionWindow window = getIncomingWindow(now); - window.compareToTimestamp(minTimestamp) > 0; - window = window.nextEarlierWindow()) { + // Add startMillis of all windows between overall max and min timestamp + for (; window.compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) { boundaries.add(window.startMillis()); } - boundaries.add(Long.MIN_VALUE); + boundaries.add(minTimestamp); Collections.reverse(boundaries); return boundaries; } @@ -319,6 +451,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { if (!singleOutput) { boundaries.add(window.startMillis()); } + boundaries.add(Long.MAX_VALUE); return boundaries; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java index b33663f..fbcc30c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java @@ -26,19 +26,29 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS", justification="It is intended to use the same equal method as superclass") public class DateTieredCompactionRequest extends CompactionRequest { + private List boundaries; - public DateTieredCompactionRequest(Collection files, List boundaryList) { + private final long freezeWindowOlderThan; + + public DateTieredCompactionRequest(Collection files, List boundaries, + long freezeWindowOlderThan) { super(files); - boundaries = boundaryList; + this.boundaries = boundaries; + this.freezeWindowOlderThan = freezeWindowOlderThan; } public List getBoundaries() { return boundaries; } + public long getFreezeWindowOlderThan() { + return freezeWindowOlderThan; + } + @Override public String toString() { - return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()); + return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()) + + " freezeWindowOlderThan=" + freezeWindowOlderThan; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index b1203c5..807acba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -50,11 +50,12 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor compact(final CompactionRequest request, final List lowerBoundaries, - ThroughputController throughputController, User user) throws IOException { + public List compact(final CompactionRequest request, final List boundaries, + final long freezeWindowOlderThan, ThroughputController 
throughputController, User user) + throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Executing compaction with " + lowerBoundaries.size() - + "windows, lower boundaries: " + lowerBoundaries); + LOG.debug( + "Executing compaction with " + boundaries.size() + " windows, boundaries: " + boundaries); } return compact(request, defaultScannerFactory, @@ -63,8 +64,8 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor - candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) - throws IOException { + protected CompactionRequest createCompactionRequest(Collection allFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { if (!tryingMajor) { candidateSelection = filterBulk(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java index 77b0af8..3a6fa5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -47,33 +47,33 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { } /** - * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on + * @param allFiles candidate files, ordered from oldest to newest by seqId. We rely on * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based * on seqId for data consistency. * @return subset copy of candidate list that meets compaction criteria */ - public CompactionRequest selectCompaction(Collection candidateFiles, + public CompactionRequest selectCompaction(Collection allFiles, final List filesCompacting, final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { // Preliminary compaction subject to filters - ArrayList candidateSelection = new ArrayList(candidateFiles); + ArrayList candidateSelection = new ArrayList(allFiles); // Stuck and not compacting enough (estimate). It is not guaranteed that we will be // able to compact more if stuck and compacting, because ratio policy excludes some // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). int futureFiles = filesCompacting.isEmpty() ? 
0 : 1; - boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) + boolean mayBeStuck = (allFiles.size() - filesCompacting.size() + futureFiles) >= storeConfigInfo.getBlockingFileCount(); candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); - LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + + LOG.debug("Selecting compaction from " + allFiles.size() + " store files, " + filesCompacting.size() + " compacting, " + candidateSelection.size() + " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); // If we can't have all files, we cannot do major anyway - boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); + boolean isAllFiles = allFiles.size() == candidateSelection.size(); if (!(forceMajor && isAllFiles)) { candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak); - isAllFiles = candidateFiles.size() == candidateSelection.size(); + isAllFiles = allFiles.size() == candidateSelection.size(); } // Try a major compaction if this is a user-requested major compaction, @@ -84,25 +84,25 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { // Or, if there are any references among the candidates. boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); - CompactionRequest result = createCompactionRequest(candidateSelection, - isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); + CompactionRequest result = createCompactionRequest(allFiles, filesCompacting, + candidateSelection, isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); ArrayList filesToCompact = Lists.newArrayList(result.getFiles()); removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor); result.updateFiles(filesToCompact); - isAllFiles = (candidateFiles.size() == filesToCompact.size()); + isAllFiles = (allFiles.size() == filesToCompact.size()); result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak); result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles); return result; } - protected abstract CompactionRequest createCompactionRequest(ArrayList - candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) - throws IOException; + protected abstract CompactionRequest createCompactionRequest(Collection allFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException; - /* + /** * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. 
*/ @@ -152,7 +152,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { return compactionSize > comConf.getThrottlePoint(); } - public abstract boolean needsCompaction(final Collection storeFiles, + public abstract boolean needsCompaction(final Collection allFiles, final List filesCompacting); protected ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java index 4dce696..87d9f4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java @@ -17,19 +17,22 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; -import static org.junit.Assert.*; public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy { @@ -46,8 +49,8 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy ArrayList ret = Lists.newArrayList(); for (int i = 0; i < sizes.length; i++) { - MockStoreFile msf = - new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); + MockStoreFile msf = new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, + i); msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); ret.add(msf); } @@ -56,22 +59,31 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy protected void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { + compactEquals(now, candidates, Collections. 
emptyList(), expectedFileSizes, + expectedBoundaries, isMajor, toCompact); + } + + protected void compactEquals(long now, Collection candidateFiles, + List filesCompacting, long[] expectedFileSizes, long[] expectedBoundaries, + boolean isMajor, boolean toCompact) throws IOException { ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(timeMachine); timeMachine.setValue(now); DateTieredCompactionRequest request; - DateTieredCompactionPolicy policy = - (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy(); + DateTieredCompactionPolicy policy = (DateTieredCompactionPolicy) store.storeEngine + .getCompactionPolicy(); if (isMajor) { - for (StoreFile file : candidates) { + for (StoreFile file : candidateFiles) { ((MockStoreFile) file).setIsMajor(true); } - assertEquals(toCompact, policy.shouldPerformMajorCompaction(candidates)); - request = (DateTieredCompactionRequest) policy.selectMajorCompaction(candidates); + assertEquals(toCompact, policy.shouldPerformMajorCompaction(candidateFiles)); + request = (DateTieredCompactionRequest) policy + .selectMajorCompaction(Lists.newArrayList(candidateFiles)); } else { - assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList. of())); - request = - (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false); + assertEquals(toCompact, + policy.needsCompaction(candidateFiles, ImmutableList. of())); + request = (DateTieredCompactionRequest) policy.selectMinorCompaction(candidateFiles, + filesCompacting, Lists.newArrayList(candidateFiles), false, false); } List actual = Lists.newArrayList(request.getFiles()); assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java index eb52a84..73723de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java @@ -64,7 +64,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, - new long[] { Long.MIN_VALUE, 12 }, false, true); + new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE }, false, true); } /** @@ -78,7 +78,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22, 23, - 24, 25 }, new long[] { Long.MIN_VALUE, 6}, false, true); + 24, 25 }, new long[] { Long.MIN_VALUE, 6, Long.MAX_VALUE}, false, true); } /** @@ -92,7 +92,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, - new long[] { Long.MIN_VALUE, 12 }, false, true); + new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE }, false, true); } /** @@ -106,7 +106,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] 
sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, - new long[] { Long.MIN_VALUE, 12}, false, true); + new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE}, false, true); } /** @@ -120,7 +120,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 }; compactEquals(194, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 22, 23 }, - new long[] { Long.MIN_VALUE, 96}, false, true); + new long[] { Long.MIN_VALUE, 96, Long.MAX_VALUE}, false, true); } @Test @@ -130,7 +130,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31, 32 }, - new long[] { Long.MIN_VALUE, 120 }, false, true); + new long[] { Long.MIN_VALUE, 120, Long.MAX_VALUE }, false, true); } /** @@ -144,7 +144,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22 }, - new long[] { Long.MIN_VALUE }, false, true); + new long[] { Long.MIN_VALUE, Long.MAX_VALUE }, false, true); } /** @@ -158,7 +158,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31 }, - new long[] { Long.MIN_VALUE }, false, true); + new long[] { Long.MIN_VALUE, Long.MAX_VALUE }, false, true); } /** @@ -172,7 +172,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 23, 24 }, - new long[] { Long.MIN_VALUE }, false, true); + new long[] { Long.MIN_VALUE, Long.MAX_VALUE }, false, true); } /** @@ -186,7 +186,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 40, 41, 42, 33, - 30, 31 }, new long[] { Long.MIN_VALUE, 96 }, false, true); + 30, 31 }, new long[] { Long.MIN_VALUE, 96, Long.MAX_VALUE }, false, true); } /** @@ -200,7 +200,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34, - 22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12 }, false, true); + 22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE }, false, true); } /** @@ -216,7 +216,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac compactEquals(1, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34, 22, 25, 23, 24, 1 }, - new long[] { Long.MIN_VALUE, -24 }, false, true); + new long[] { Long.MIN_VALUE, -24, Long.MAX_VALUE }, false, true); } /** @@ -231,7 +231,7 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new 
long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }, - new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true); + new long[] { 0, 24, 48, 72, 96, 120, 144, 150, 156, 162 }, true, true); } /** @@ -247,6 +247,6 @@ public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompac compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }, - new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true); + new long[] { -155, -144, -120, -96, -72, -48, -24, 0, 6, 12, 18 }, true, true); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyFreeze.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyFreeze.java new file mode 100644 index 0000000..cc138bd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyFreeze.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicyFreeze extends AbstractTestDateTieredCompactionPolicy { + @Override + protected void config() { + super.config(); + + // Set up policy + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3); + conf.setClass(CompactionConfiguration.DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY, + ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4); + conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, + false); + conf.setBoolean(CompactionConfiguration.FREEZE_DATE_TIERED_WINDOW_OLDER_THAN_MAX_AGE_KEY, true); + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); + } + + @Test + public void test() throws IOException { + long[] minTimestamps = new long[] { 0, 24, 32, 150 }; + long[] maxTimestamps = new long[] { 12, 30, 56, 160 }; + long[] sizes = new long[] { 10, 20, 10, 20 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 10 }, + new long[] { 24, 48, 72 }, false, true); + } + + @Test + public void testOneFileButOverlap() throws IOException { + long[] minTimestamps = new long[] { 0, 24, 150 }; + long[] maxTimestamps = new long[] { 12, 56, 160 }; + long[] sizes = new long[] { 10, 20, 10 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20 }, + new long[] { 24, 48, 72 }, false, true); + } + + @Test + public void testCompacting() throws IOException { + long[] minTimestamps = new long[] { 0, 12, 24, 32, 150 }; + long[] maxTimestamps = new long[] { 12, 23, 30, 56, 160 }; + long[] sizes = new long[] { 10, 20, 30, 40, 10 }; + + List candidateFiles = sfCreate(minTimestamps, maxTimestamps, sizes); + + compactEquals(161, candidateFiles, Lists.newArrayList(candidateFiles.subList(0, 2)), + new long[] { 30, 40 }, new long[] { 24, 48, 72 }, false, true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java index 530d284..272ebb7 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java @@ -65,6 +65,6 @@ public class TestDateTieredCompactionPolicyOverflow extends AbstractTestDateTier compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 1 }, new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, - 4611686018427387903L, 9223372036854775806L }, true, true); + 4611686018427387903L, 9223372036854775806L, Long.MAX_VALUE }, true, true); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java index 7707116..3a59cca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import static org.apache.hadoop.hbase.regionserver.DateTieredStoreFileManager.*; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY; import static org.junit.Assert.assertArrayEquals; @@ -183,6 +184,21 @@ public class TestCompactor { } } + public void verifyBoundaries(List boundaries, long freezeWindowOlderThan) { + assertEquals(boundaries.size() - 1, writers.size()); + for (int i = 0; i < writers.size(); ++i) { + if (boundaries.get(i + 1) <= freezeWindowOlderThan) { + assertEquals("i = " + i, boundaries.get(i).longValue(), + Bytes.toLong(writers.get(i).data.get(FREEZING_WINDOW_START_TIMESTAMP))); + assertEquals("i = " + i, boundaries.get(i + 1).longValue(), + Bytes.toLong(writers.get(i).data.get(FREEZING_WINDOW_END_TIMESTAMP))); + } else { + assertNull(writers.get(i).data.get(FREEZING_WINDOW_START_TIMESTAMP)); + assertNull(writers.get(i).data.get(FREEZING_WINDOW_END_TIMESTAMP)); + } + } + } + public List getWriters() { return writers; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java index 38d9f99..06a088e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java @@ -122,14 +122,14 @@ public class TestDateTieredCompactor { }; } - private void verify(KeyValue[] input, List boundaries, KeyValue[][] output, - boolean allFiles) throws Exception { + private void verify(KeyValue[] input, List boundaries, long freezeWindowOlderThan, + KeyValue[][] output, boolean allFiles) throws Exception { StoreFileWritersCapture writers = new StoreFileWritersCapture(); StoreFile sf1 = createDummyStoreFile(1L); StoreFile sf2 = createDummyStoreFile(2L); DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2)); - List paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)), - boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null); + List paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)), boundaries, + freezeWindowOlderThan, 
NoLimitThroughputController.INSTANCE, null); writers.verifyKvs(output, allFiles, boundaries); if (allFiles) { assertEquals(output.length, paths.size()); @@ -143,11 +143,11 @@ public class TestDateTieredCompactor { @Test public void test() throws Exception { - verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L), + verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L), 300L, a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true); verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE), - a(a(KV_A), a(KV_B, KV_C, KV_D)), false); - verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), + Long.MIN_VALUE, a(a(KV_A), a(KV_B, KV_C, KV_D)), false); + verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), Long.MIN_VALUE, new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false); } @@ -158,7 +158,7 @@ public class TestDateTieredCompactor { DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0], new ArrayList(request.getFiles())); List paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), - NoLimitThroughputController.INSTANCE, null); + Long.MIN_VALUE, NoLimitThroughputController.INSTANCE, null); assertEquals(1, paths.size()); List dummyWriters = writers.getWriters(); assertEquals(1, dummyWriters.size()); -- 1.9.1
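
Note on the selection logic introduced above (a reader's aid, not part of the patch): freezing is opt-in via the new key `hbase.hstore.compaction.date.tiered.freeze.window.older.than.max.age`, and `DateTieredCompactionPolicy.freezeOldWindows()` decides which store files belong to a window that is entirely older than max age by testing each file's timestamp range against the half-open window `[startMillis, endMillis)`. The sketch below restates that overlap test as a standalone class so it can be read and run in isolation; `FreezingWindowOverlapSketch` and `FileRange` are hypothetical names for illustration only and are not HBase API, but the null-handling mirrors `shouldIncludeForFreezing()` in the patch, where a missing bound on a `StoreFile` is treated conservatively as "may overlap".

```java
// Standalone sketch of the overlap test used when picking files to freeze a window.
// Not HBase code; FileRange stands in for StoreFile's getMinimumTimestamp()/getMaximumTimestamp().
public final class FreezingWindowOverlapSketch {

  /** Minimal stand-in for a store file: only the (possibly unknown) timestamp range matters. */
  static final class FileRange {
    final Long minTs; // null means unknown, like StoreFile.getMinimumTimestamp()
    final Long maxTs; // null means unknown, like StoreFile.getMaximumTimestamp()

    FileRange(Long minTs, Long maxTs) {
      this.minTs = minTs;
      this.maxTs = maxTs;
    }
  }

  /**
   * Returns true if the file may contain cells inside the half-open freezing window
   * [startMillis, endMillis), so it must be included when that window is frozen.
   */
  static boolean shouldIncludeForFreezing(long startMillis, long endMillis, FileRange file) {
    if (file.minTs == null) {
      // Unknown lower bound: include unless the file is known to end before the window starts.
      return file.maxTs == null || file.maxTs.longValue() >= startMillis;
    }
    if (file.maxTs == null) {
      // Unknown upper bound: include unless the file is known to start after the window ends.
      return file.minTs.longValue() < endMillis;
    }
    // Both bounds known: plain interval overlap against [startMillis, endMillis).
    return file.minTs.longValue() < endMillis && file.maxTs.longValue() >= startMillis;
  }

  public static void main(String[] args) {
    // Window [24, 48): a file spanning [32, 56] overlaps it, a file spanning [0, 12] does not.
    System.out.println(shouldIncludeForFreezing(24, 48, new FileRange(32L, 56L))); // true
    System.out.println(shouldIncludeForFreezing(24, 48, new FileRange(0L, 12L)));  // false
  }
}
```

As in the patch, a window whose only overlapping file already fits entirely inside it (and cannot be dropped by TTL) is skipped, files currently being compacted or excluded bulk-load files veto the window, and the files that are rewritten get `FREEZING_WINDOW_START_TIMESTAMP`/`FREEZING_WINDOW_END_TIMESTAMP` recorded in their file info by `DateTieredMultiFileWriter.preCloseWriter()`.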