From 7bf0cc50f542e78dc0d7b0e28868937bb52b5bad Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 10 Apr 2016 17:51:44 +0800 Subject: [PATCH] HBASE-15454 Archive store files older than max age --- hbase-server/pom.xml | 5 +- .../hbase/regionserver/DateTieredStoreEngine.java | 15 +- .../compactions/CompactionConfiguration.java | 55 +++- .../compactions/DateTieredCompactionPolicy.java | 285 ++++++++++++++++++--- .../AbstractTestDateTieredCompactionPolicy.java | 74 ++++++ .../TestDateTieredCompactionPolicy.java | 54 +--- .../TestDateTieredCompactionPolicyArchive.java | 113 ++++++++ pom.xml | 6 + 8 files changed, 516 insertions(+), 91 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index d5f1e30..a17a9a2 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -535,7 +535,10 @@ io.netty netty-all - + + joda-time + joda-time + org.apache.htrace diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 773baab..c8201a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -44,8 +45,9 @@ public class DateTieredStoreEngine extends StoreEngine { @Override public boolean needsCompaction(List filesCompacting) { - return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), - filesCompacting); + Collection storeFiles = storeFileManager.getStorefiles(); + return compactionPolicy.needsCompaction(storeFiles, filesCompacting) + || compactionPolicy.needsArchive(storeFiles, filesCompacting); } @Override @@ -74,8 +76,13 @@ public class DateTieredStoreEngine extends StoreEngine filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { - request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, - isUserCompaction, mayUseOffPeak, forceMajor); + Collection storeFiles = storeFileManager.getStorefiles(); + request = compactionPolicy.selectCompaction(storeFiles, filesCompacting, isUserCompaction, + mayUseOffPeak, forceMajor); + if (request != null) { + return true; + } + request = compactionPolicy.selectArchive(storeFiles, filesCompacting); return request != null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 97cc404..5139327 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -19,12 +19,19 @@ package org.apache.hadoop.hbase.regionserver.compactions; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.joda.time.DateTimeFieldType; +import org.joda.time.DateTimeZone; +import org.joda.time.DurationFieldType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableBiMap; /** *

@@ -86,6 +93,14 @@ public class CompactionConfiguration { private static final Class DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; + public static final String ARCHIVE_UNIT = "hbase.hstore.compaction.date.tiered.archive.unit"; + public static final String ARCHIVE_UNIT_TIME_ZONE = "hbase.hstore.compaction.date.tiered.archive.unit.time.zone"; + + private static final ImmutableBiMap UNIT_TO_FIELD = ImmutableBiMap + .of("Y", new ArchiveUnit(DurationFieldType.years(), DateTimeFieldType.dayOfYear()), "M", + new ArchiveUnit(DurationFieldType.months(), DateTimeFieldType.dayOfMonth()), "D", + new ArchiveUnit(DurationFieldType.days(), DateTimeFieldType.hourOfDay())); + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -109,6 +124,19 @@ public class CompactionConfiguration { private final String compactionPolicyForTieredWindow; private final boolean singleOutputForMinorCompaction; + public static class ArchiveUnit { + public final DurationFieldType durationType; + public final DateTimeFieldType fieldToSetMinimum; + + public ArchiveUnit(DurationFieldType durationType, DateTimeFieldType fieldToSetMinimum) { + this.durationType = durationType; + this.fieldToSetMinimum = fieldToSetMinimum; + } + } + + private final ArchiveUnit archiveUnit; + private final DateTimeZone archiveUnitTimeZone; + CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; this.storeConfigInfo = storeConfigInfo; @@ -139,7 +167,17 @@ public class CompactionConfiguration { DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true); - + String archiveUnitId = conf.get(ARCHIVE_UNIT); + if (!StringUtils.isBlank(archiveUnitId)) { + archiveUnit = Preconditions.checkNotNull(UNIT_TO_FIELD.get(archiveUnitId.toUpperCase()), + "Unrecognized archive unit: %s", archiveUnitId); + String timeZoneId = Preconditions.checkNotNull(conf.get(ARCHIVE_UNIT_TIME_ZONE), + "must specify time zone when using archive"); + archiveUnitTimeZone = DateTimeZone.forID(timeZoneId); + } else { + archiveUnit = null; + archiveUnitTimeZone = null; + } LOG.info(this); } @@ -149,7 +187,7 @@ public class CompactionConfiguration { "size [%d, %d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;" + " major period %d, major jitter %f, min locality to compact %f;" + " tiered compaction: max_age %d, base window in milliseconds %d, windows per tier %d," - + "incoming window min %d", + + "incoming window min %d, archive unit %s", minCompactSize, maxCompactSize, offPeakMaxCompactSize, @@ -164,7 +202,10 @@ public class CompactionConfiguration { maxStoreFileAgeMillis, baseWindowMillis, windowsPerTier, - incomingWindowMin); + incomingWindowMin, + archiveUnit == null ? 
"none" : UNIT_TO_FIELD.inverse().get(archiveUnit) + " " + + archiveUnitTimeZone.getID() + ); } /** @@ -284,4 +325,12 @@ public class CompactionConfiguration { public boolean useSingleOutputForMinorCompaction() { return singleOutputForMinorCompaction; } + + public ArchiveUnit getArchiveUnit() { + return archiveUnit; + } + + public DateTimeZone getArchiveUnitTimeZone() { + return archiveUnitTimeZone; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index d61af42..eefacff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,14 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.PeekingIterator; -import com.google.common.math.LongMath; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -46,6 +38,15 @@ import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.joda.time.DateTime; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.PeekingIterator; +import com.google.common.math.LongMath; /** * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to @@ -217,8 +218,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { int compResult = window.compareToTimestamp(it.peek().getSecond()); if (compResult > 0) { // If the file is too old for the window, switch to the next window - window = window.nextWindow(comConf.getWindowsPerTier(), - oldestToCompact); + window = window.nextWindow(oldestToCompact); minThreshold = comConf.getMinFilesToCompact(); } else { // The file is within the target window @@ -266,6 +266,62 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return null; } + public DateTieredCompactionRequest selectArchive(final Collection storeFiles, + final List filesCompacting) { + if (comConf.getArchiveUnit() == null) { + return null; + } + long now = EnvironmentEdgeManager.currentTime(); + long newestToArchive = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); + ArrayList candidates = new ArrayList(storeFiles); + Collections.sort(candidates, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); + for (ArchiveWindow window = new ArchiveWindow( + candidates.get(0).getMinimumTimestamp().longValue()); window + .endMillis() <= newestToArchive; window = new ArchiveWindow(window.endMillis())) { + int first = 0, n = candidates.size(); + for (; first < n; first++) { + StoreFile file = candidates.get(first); + if (window.compareToTimestamp(file.getMinimumTimestamp().longValue()) == 0 + || window.compareToTimestamp(file.getMaximumTimestamp().longValue()) == 0) { + 
break;
+        }
+      }
+      if (first == n) {
+        continue;
+      }
+      int last = n - 1;
+      for (; last > first; last--) {
+        StoreFile file = candidates.get(last);
+        if (window.compareToTimestamp(file.getMinimumTimestamp().longValue()) == 0
+            || window.compareToTimestamp(file.getMaximumTimestamp().longValue()) == 0) {
+          break;
+        }
+      }
+      if (last == first) {
+        continue;
+      }
+      if (!filesCompacting.isEmpty()
+          && candidates.get(last).getMaxSequenceId() >= filesCompacting.get(0).getMaxSequenceId()) {
+        continue;
+      }
+      if (last - first + 1 > comConf.getMaxFilesToCompact()) {
+        LOG.warn("Too many files (got " + (last - first + 1) + ", expected less than or equal to "
+            + comConf.getMaxFilesToCompact() + ") to compact when archiving " + window
+            + ", giving up");
+        continue;
+      }
+      List<StoreFile> filesToCompact = Lists.newArrayList(candidates.subList(first, last + 1));
+      return new DateTieredCompactionRequest(filesToCompact,
+          getCompactBoundariesForArchive(filesToCompact, newestToArchive, now));
+    }
+    return null;
+  }
+
+  public boolean needsArchive(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting) {
+    return selectArchive(storeFiles, filesCompacting) != null;
+  }
+
   /**
    * Return a list of boundaries for multiple compaction output
    * in ascending order.
@@ -273,20 +329,26 @@
   private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
     long oldestToCompact, long now) {
     long minTimestamp = Long.MAX_VALUE;
+    long maxTimestamp = Long.MIN_VALUE;
     for (StoreFile file : filesToCompact) {
-      minTimestamp = Math.min(minTimestamp,
-        file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp());
+      minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp().longValue());
+      maxTimestamp = Math.max(maxTimestamp, file.getMaximumTimestamp().longValue());
     }
-    List<Long> boundaries = new ArrayList<Long>();
+    List<Long> boundaries = Lists.newArrayList();
+    Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
+    // Find the first window that covers the max timestamp.
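+    // nextWindow() always returns an earlier time span, so the loop below steps the window
+    // back in time until it no longer lies entirely after maxTimestamp; that window's
+    // startMillis() becomes the highest output boundary.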
+    while (window.compareToTimestamp(maxTimestamp) > 0) {
+      window = window.nextWindow(oldestToCompact);
+    }
+    boundaries.add(window.startMillis());
 
-    // Add startMillis of all windows between now and min timestamp
-    for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
-      window.compareToTimestamp(minTimestamp) > 0;
-      window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
+    // Add startMillis of all windows between overall max and min timestamp
+    for (window = window.nextWindow(oldestToCompact); window
+        .compareToTimestamp(minTimestamp) > 0; window = window.nextWindow(oldestToCompact)) {
       boundaries.add(window.startMillis());
     }
-    boundaries.add(Long.MIN_VALUE);
+    boundaries.add(minTimestamp);
     Collections.reverse(boundaries);
     return boundaries;
   }
@@ -304,6 +366,25 @@
     return boundaries;
   }
 
+  private List<Long> getCompactBoundariesForArchive(Collection<StoreFile> filesToCompact,
+      long oldestToCompact, long now) {
+    long minTimestamp = Long.MAX_VALUE;
+    long maxTimestamp = Long.MIN_VALUE;
+    for (StoreFile file : filesToCompact) {
+      minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp().longValue());
+      maxTimestamp = Math.max(maxTimestamp, file.getMaximumTimestamp().longValue());
+    }
+    List<Long> boundaries = Lists.newArrayList();
+
+    for (ArchiveWindow window = new ArchiveWindow(minTimestamp);; window = new ArchiveWindow(
+        window.endMillis())) {
+      boundaries.add(window.startMillis());
+      if (window.endMillis() >= maxTimestamp) {
+        break;
+      }
+    }
+    return boundaries;
+  }
   /**
    * Removes all store files with max timestamp older than (current - maxAge).
    * @param storeFiles all store files to consider
@@ -325,25 +406,65 @@
     });
   }
 
-  private static Window getIncomingWindow(long now, long baseWindowMillis) {
-    return new Window(baseWindowMillis, now / baseWindowMillis);
+  private Window getIncomingWindow(long now, long baseWindowMillis) {
+    return new TieredWindow(baseWindowMillis, now / baseWindowMillis);
+  }
+
+  private long getNearestArchiveWindowBoundary(long millis) {
+    return new DateTime(millis, comConf.getArchiveUnitTimeZone())
+        .property(comConf.getArchiveUnit().fieldToSetMinimum).withMinimumValue()
+        .withTimeAtStartOfDay().getMillis();
   }
 
-  private static long getOldestToCompact(long maxAgeMillis, long now) {
+  private long getOldestToCompact(long maxAgeMillis, long now) {
     try {
-      return LongMath.checkedSubtract(now, maxAgeMillis);
+      long oldestToCompact = LongMath.checkedSubtract(now, maxAgeMillis);
+      return comConf.getArchiveUnit() == null ? oldestToCompact
+          : getNearestArchiveWindowBoundary(oldestToCompact);
     } catch (ArithmeticException ae) {
       LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
-        + ". All the files will be eligible for minor compaction.");
+          + ". All the files will be eligible for minor compaction.");
       return Long.MIN_VALUE;
     }
   }
 
+  private abstract class Window {
+
+    /**
+     * Compares the window to a timestamp.
+     * @param timestamp the timestamp to compare.
+     * @return a negative integer, zero, or a positive integer as the window lies before,
+     *         covers, or lies after the timestamp.
+     */
+    public abstract int compareToTimestamp(long timestamp);
+
+    /**
+     * Move to the new window of the same tier or of the next tier, which represents an earlier
+     * time span.
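+     * When archiving is enabled, a tiered window that would otherwise cross the max-age
+     * boundary is truncated at that boundary (LastTieredWindow), and every window before it is
+     * a calendar-based ArchiveWindow.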
+ * @return The next window + */ + public abstract Window nextWindow(long oldestToCompact); + + /** + * Inclusive lower bound + */ + public abstract long startMillis(); + + /** + * Exclusive upper bound + */ + public abstract long endMillis(); + + @Override + public String toString() { + return "[" + startMillis() + ", " + endMillis() + ")"; + } + } /** * This is the class we use to partition from epoch time to now into tiers of exponential sizes of * windows. */ - private static final class Window { + private final class TieredWindow extends Window { /** * How big a range of timestamps fit inside the window in milliseconds. */ @@ -354,7 +475,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { */ private final long divPosition; - private Window(long baseWindowMillis, long divPosition) { + private TieredWindow(long baseWindowMillis, long divPosition) { windowMillis = baseWindowMillis; this.divPosition = divPosition; } @@ -377,21 +498,41 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; } + private Window getWindowBeforeMaxAge(long oldestToCompact) { + if (comConf.getArchiveUnit() == null) { + // Don't promote to the next tier if the next window crosses the max age. + return new TieredWindow(windowMillis, divPosition - 1); + } else { + if (oldestToCompact == startMillis()) { + // we are just on the last archive boundary + return new ArchiveWindow(oldestToCompact - 1); + } else { + return new LastTieredWindow(oldestToCompact, startMillis()); + } + } + } + /** * Move to the new window of the same tier or of the next tier, which represents an earlier time * span. - * @param windowsPerTier The number of contiguous windows that will have the same size. Windows - * following those will be tierBase times as big. * @return The next window */ - public Window nextWindow(int windowsPerTier, long oldestToCompact) { + public Window nextWindow(long oldestToCompact) { + int windowsPerTier = comConf.getWindowsPerTier(); // Don't promote to the next tier if there is not even 1 window at current tier - // or if the next window crosses the max age. 
- if (divPosition % windowsPerTier > 0 || - startMillis() - windowMillis * windowsPerTier < oldestToCompact) { - return new Window(windowMillis, divPosition - 1); + if (divPosition % windowsPerTier > 0) { + if (startMillis() - windowMillis < oldestToCompact) { + return getWindowBeforeMaxAge(oldestToCompact); + } else { + return new TieredWindow(windowMillis, divPosition - 1); + } } else { - return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); + long nextWindowMillis = this.windowMillis * windowsPerTier; + if (startMillis() - nextWindowMillis < oldestToCompact) { + return getWindowBeforeMaxAge(oldestToCompact); + } else { + return new TieredWindow(nextWindowMillis, divPosition / windowsPerTier - 1); + } } } @@ -416,10 +557,84 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return Long.MAX_VALUE; } } + } + + private class LastTieredWindow extends Window { + + private final long startMillis; + + private final long endMillis; + + public LastTieredWindow(long startMillis, long endMillis) { + this.startMillis = startMillis; + this.endMillis = endMillis; + } @Override - public String toString() { - return "[" + startMillis() + ", " + endMillis() + ")"; + public int compareToTimestamp(long timestamp) { + if (timestamp < startMillis) { + return 1; + } + if (timestamp >= endMillis) { + return -1; + } + return 0; + } + + @Override + public Window nextWindow(long oldestToCompact) { + assert comConf.getArchiveUnit() != null; + assert startMillis == oldestToCompact; + return new ArchiveWindow(oldestToCompact - 1); + } + + @Override + public long startMillis() { + return startMillis; + } + + @Override + public long endMillis() { + return endMillis; + } + } + + private class ArchiveWindow extends Window { + + private final long startMillis; + + private final long endMillis; + + public ArchiveWindow(long millisInWindow) { + this.startMillis = getNearestArchiveWindowBoundary(millisInWindow); + this.endMillis = new DateTime(startMillis, comConf.getArchiveUnitTimeZone()) + .withFieldAdded(comConf.getArchiveUnit().durationType, 1).getMillis(); + } + + @Override + public int compareToTimestamp(long timestamp) { + if (timestamp >= endMillis) { + return -1; + } + if (timestamp < startMillis) { + return 1; + } + return 0; + } + + @Override + public ArchiveWindow nextWindow(long oldestToCompact) { + return new ArchiveWindow(startMillis - 1); + } + + @Override + public long startMillis() { + return startMillis; + } + + @Override + public long endMillis() { + return endMillis; } } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java new file mode 100644 index 0000000..db2435e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; + +public abstract class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy { + + protected ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) + throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + // Has to be > 0 and < now. + timeMachine.setValue(1); + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + + ArrayList ret = Lists.newArrayList(); + for (int i = 0; i < sizes.length; i++) { + MockStoreFile msf = + new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); + msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); + ret.add(msf); + } + return ret; + } + + @Override + protected void config() { + super.config(); + + // Set up policy + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); + conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); + conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); + + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java index ecccbdd..9b733bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java @@ -17,18 +17,18 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.hadoop.hbase.HConstants; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import 
org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; @@ -36,50 +36,8 @@ import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) -public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { - ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) - throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - // Has to be > 0 and < now. - timeMachine.setValue(1); - ArrayList ageInDisk = new ArrayList(); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - - ArrayList ret = Lists.newArrayList(); - for (int i = 0; i < sizes.length; i++) { - MockStoreFile msf = - new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); - msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); - ret.add(msf); - } - return ret; - } - - @Override - protected void config() { - super.config(); - - // Set up policy - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, - "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); - conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); - conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); - conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); - - // Special settings for compaction policy per window - this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); - this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); - this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); - - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); - conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); - } +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompactionPolicy { void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java new file mode 100644 index 0000000..4fe35a4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicyArchive extends AbstractTestDateTieredCompactionPolicy {
+  private static long BASE_WINDOW_MILLIS = 7L * 24 * 60 * 60 * 1000;
+  private static long MAX_AGE_MILLIS = 180L * 24 * 60 * 60 * 1000;
+
+  private static DateTimeZone ZONE = DateTimeZone.forID("+08:00");
+
+  @Override
+  protected void config() {
+    super.config();
+    conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, BASE_WINDOW_MILLIS);
+    conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, MAX_AGE_MILLIS);
+    conf.set(CompactionConfiguration.ARCHIVE_UNIT, "Y");
+    conf.set(CompactionConfiguration.ARCHIVE_UNIT_TIME_ZONE, ZONE.getID());
+  }
+
+  void majorCompactEquals(long now, ArrayList<StoreFile> candidates, long[] expectedFileSizes,
+      long[] expectedBoundaries) throws IOException {
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    timeMachine.setValue(now);
+    DateTieredCompactionRequest request =
+        (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
+            .getCompactionPolicy()).selectCompaction(candidates, ImmutableList.<StoreFile> of(),
+              true, false, true);
+    List<StoreFile> actual = Lists.newArrayList(request.getFiles());
+    Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
+    Assert.assertEquals(Arrays.toString(expectedBoundaries),
+      Arrays.toString(request.getBoundaries().toArray()));
+  }
+
+  void archiveEquals(long now, ArrayList<StoreFile> candidates, long[] expectedFileSizes,
+      long[] expectedBoundaries) {
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    timeMachine.setValue(now);
+    DateTieredCompactionRequest request =
+        (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine
+            .getCompactionPolicy()).selectArchive(candidates, ImmutableList.<StoreFile> of());
+    List<StoreFile> actual = Lists.newArrayList(request.getFiles());
+    Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
+    Assert.assertEquals(Arrays.toString(expectedBoundaries),
+      Arrays.toString(request.getBoundaries().toArray()));
+  }
+
+  @Test
+  public void test() throws IOException {
+    long now = System.currentTimeMillis();
+    long firstArchiveWindowEndMillis =
+        new DateTime(now - MAX_AGE_MILLIS, ZONE).dayOfYear().withMinimumValue()
+            .withTimeAtStartOfDay().getMillis();
+    long firstArchiveWindowStartMillis =
+        new DateTime(firstArchiveWindowEndMillis - 1, ZONE).dayOfYear().withMinimumValue()
+            .withTimeAtStartOfDay().getMillis();
+    long secondArchiveWindowStartMillis =
+        new DateTime(firstArchiveWindowStartMillis - 1, ZONE).dayOfYear().withMinimumValue()
+            .withTimeAtStartOfDay().getMillis();
+
+    long[] minTimestamps =
+        new long[] { secondArchiveWindowStartMillis + 1000, firstArchiveWindowStartMillis + 1000,
+            firstArchiveWindowEndMillis - 1000, firstArchiveWindowEndMillis - 100 };
+    long[] maxTimestamps =
+        new long[] { firstArchiveWindowStartMillis - 1000, firstArchiveWindowStartMillis + 2000,
+            firstArchiveWindowEndMillis - 100, firstArchiveWindowEndMillis + 1000 };
+    long[] sizes = new long[] { 100, 100, 10000, 200 };
+    archiveEquals(now, sfCreate(minTimestamps, maxTimestamps, sizes),
+      new long[] { 100, 10000, 200 },
+      new long[] { firstArchiveWindowStartMillis, firstArchiveWindowEndMillis });
+    majorCompactEquals(now, sfCreate(minTimestamps, maxTimestamps, sizes),
+      new long[] { 100, 100, 10000, 200 }, new long[] { secondArchiveWindowStartMillis + 1000,
+          firstArchiveWindowStartMillis, firstArchiveWindowEndMillis });
+  }
+}
diff --git a/pom.xml b/pom.xml
index 0324c1c..aaed6eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1233,6 +1233,7 @@
     1.0.8
     2.11.6
     1.46
+    <joda-time.version>2.9.2</joda-time.version>
     2.4
     1.8
@@ -1563,6 +1564,11 @@
         ${netty.version}
       
+      <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>${joda-time.version}</version>
+      </dependency>
       
         org.apache.thrift
         libthrift
         ${thrift.version}
-- 
1.9.1
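
Illustrative sketch (not part of the patch): the calendar arithmetic behind getNearestArchiveWindowBoundary() and ArchiveWindow, shown standalone for the "Y" unit and the "+08:00" zone used in the test above. It assumes only joda-time 2.x on the classpath; the class name ArchiveWindowMath is hypothetical.

import org.joda.time.DateTime;
import org.joda.time.DateTimeFieldType;
import org.joda.time.DateTimeZone;
import org.joda.time.DurationFieldType;

// Hypothetical demo class; mirrors getNearestArchiveWindowBoundary() and the
// ArchiveWindow constructor for unit "Y" in the "+08:00" zone.
public class ArchiveWindowMath {
  public static void main(String[] args) {
    DateTimeZone zone = DateTimeZone.forID("+08:00");
    long millis = System.currentTimeMillis();
    // Snap down to the first day of the calendar year containing 'millis',
    // at midnight in the configured zone: the window's inclusive start.
    long start = new DateTime(millis, zone).property(DateTimeFieldType.dayOfYear())
        .withMinimumValue().withTimeAtStartOfDay().getMillis();
    // Add one duration unit (a year) to get the window's exclusive end.
    long end = new DateTime(start, zone)
        .withFieldAdded(DurationFieldType.years(), 1).getMillis();
    System.out.println("[" + start + ", " + end + ") covers " + millis);
  }
}

For the "M" and "D" units the same pattern applies with dayOfMonth()/months() and hourOfDay()/days() respectively, per the UNIT_TO_FIELD map in CompactionConfiguration.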