From 2c6dac39163d1477b6011e8665c93a5ccea646eb Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 17 Apr 2016 21:58:14 +0800 Subject: [PATCH] HBASE-15368 Add pluggable window support --- .../compactions/CompactionConfiguration.java | 53 +++--- .../regionserver/compactions/CompactionWindow.java | 60 ++++++ .../compactions/CompactionWindowFactory.java | 29 +++ .../compactions/DateTieredCompactionPolicy.java | 201 ++++++--------------- .../ExponentialCompactionWindowFactory.java | 137 ++++++++++++++ .../AbstractTestDateTieredCompactionPolicy.java | 81 +++++++++ .../TestDateTieredCompactionPolicy.java | 87 +-------- .../TestDateTieredCompactionPolicyOverflow.java | 68 +++++++ 8 files changed, 466 insertions(+), 250 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 97cc404..5d85a63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -72,10 +72,6 @@ public class CompactionConfiguration { */ public static final String MAX_AGE_MILLIS_KEY = "hbase.hstore.compaction.date.tiered.max.storefile.age.millis"; - public static final String BASE_WINDOW_MILLIS_KEY = - "hbase.hstore.compaction.date.tiered.base.window.millis"; - public static final String WINDOWS_PER_TIER_KEY = - "hbase.hstore.compaction.date.tiered.windows.per.tier"; public static final String INCOMING_WINDOW_MIN_KEY = "hbase.hstore.compaction.date.tiered.incoming.window.min"; public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY = @@ -86,6 +82,15 @@ public class CompactionConfiguration { private static final Class DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; + public static final String COMPACTION_WINDOW_FACTORY_CLASS = + "hbase.hstore.compaction.date.tiered.window.factory.class"; + + private static final Class DEFAULT_COMPACTION_WINDOW_FACTORY_CLASS = + ExponentialCompactionWindowFactory.class; + + public static final String ARCHIVE_FILES_OLDER_THAN_MAX_AGE = + "hbase.hstore.compaction.date.tiered.archive.files.older.than.max.age"; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -103,11 +108,11 @@ public class CompactionConfiguration { private final float majorCompactionJitter; private final float minLocalityToForceCompact; private final long maxStoreFileAgeMillis; - private final long baseWindowMillis; - private final int windowsPerTier; private final int incomingWindowMin; private final String compactionPolicyForTieredWindow; private final boolean singleOutputForMinorCompaction; + private final String compactionWindowFactory; + private final boolean archiveFilesOlderThanMaxAge; 
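The two keys introduced above are plain Configuration entries, so they can be set in hbase-site.xml or overridden per table or per column family. A minimal client-side sketch, for illustration only: the factory named is the default shipped by this patch, the class wrapper is made up for the example, and false mirrors the patch's default for the archive flag.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DateTieredWindowConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same string as COMPACTION_WINDOW_FACTORY_CLASS above; the value is the shipped default.
    conf.set("hbase.hstore.compaction.date.tiered.window.factory.class",
      "org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory");
    // Same string as ARCHIVE_FILES_OLDER_THAN_MAX_AGE above; false is the patch's default.
    conf.setBoolean("hbase.hstore.compaction.date.tiered.archive.files.older.than.max.age", false);
  }
}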
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -132,14 +137,14 @@ public class CompactionConfiguration { minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); maxStoreFileAgeMillis = conf.getLong(MAX_AGE_MILLIS_KEY, Long.MAX_VALUE); - baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); - windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6); compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY, DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true); - + this.compactionWindowFactory = conf.get(COMPACTION_WINDOW_FACTORY_CLASS, + DEFAULT_COMPACTION_WINDOW_FACTORY_CLASS.getName()); + this.archiveFilesOlderThanMaxAge = conf.getBoolean(ARCHIVE_FILES_OLDER_THAN_MAX_AGE, false); LOG.info(this); } @@ -148,8 +153,9 @@ public class CompactionConfiguration { return String.format( "size [%d, %d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;" + " major period %d, major jitter %f, min locality to compact %f;" - + " tiered compaction: max_age %d, base window in milliseconds %d, windows per tier %d," - + "incoming window min %d", + + " tiered compaction: max_age %d, incoming window min %d," + + " compaction policy for tiered window %s, single output for minor %b," + + " compaction window factory %s, archive files older than max age %b", minCompactSize, maxCompactSize, offPeakMaxCompactSize, @@ -162,9 +168,12 @@ public class CompactionConfiguration { majorCompactionJitter, minLocalityToForceCompact, maxStoreFileAgeMillis, - baseWindowMillis, - windowsPerTier, - incomingWindowMin); + incomingWindowMin, + compactionPolicyForTieredWindow, + singleOutputForMinorCompaction, + compactionWindowFactory, + archiveFilesOlderThanMaxAge + ); } /** @@ -265,14 +274,6 @@ public class CompactionConfiguration { return maxStoreFileAgeMillis; } - public long getBaseWindowMillis() { - return baseWindowMillis; - } - - public int getWindowsPerTier() { - return windowsPerTier; - } - public int getIncomingWindowMin() { return incomingWindowMin; } @@ -284,4 +285,12 @@ public class CompactionConfiguration { public boolean useSingleOutputForMinorCompaction() { return singleOutputForMinorCompaction; } + + public String getCompactionWindowFactory() { + return compactionWindowFactory; + } + + public boolean isArchiveFilesOlderThanMaxAge() { + return archiveFilesOlderThanMaxAge; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java new file mode 100644 index 0000000..1721f3d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Base class for compaction window implementation. + *

+ * It is better to make sure that the window before {@code oldestToCompact} is fixed and never
+ * promoted, since we will archive the files in each window into a single file.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionWindow {
+
+  /**
+   * Compares the window to a timestamp.
+   * @param timestamp the timestamp to compare.
+   * @return a negative integer, zero, or a positive integer as the window lies before, covering,
+   *         or after the timestamp.
+   */
+  public abstract int compareToTimestamp(long timestamp);
+
+  /**
+   * Move to the new window of the same tier or of the next tier, which represents an earlier time
+   * span.
+   * @return The next window
+   */
+  public abstract CompactionWindow nextWindow(long now);
+
+  /**
+   * Inclusive lower bound
+   */
+  public abstract long startMillis();
+
+  /**
+   * Exclusive upper bound
+   */
+  public abstract long endMillis();
+
+  @Override
+  public String toString() {
+    return "[" + startMillis() + ", " + endMillis() + ")";
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java
new file mode 100644
index 0000000..01b708e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * For creating compaction windows.
+ */ +@InterfaceAudience.Private +public abstract class CompactionWindowFactory { + + public abstract CompactionWindow newIncomingWindow(long now); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index d61af42..0b971b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,20 +18,19 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; import com.google.common.math.LongMath; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -50,21 +49,28 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; /** * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to * Cassandra's for the following benefits: - * 1. Improve date-range-based scan by structuring store files in date-based tiered layout. - * 2. Reduce compaction overhead. - * 3. Improve TTL efficiency. + *

+ * <ol>
+ * <li>Improve date-range-based scan by structuring store files in date-based tiered layout.</li>
+ * <li>Reduce compaction overhead.</li>
+ * <li>Improve TTL efficiency.</li>
+ * </ol>
 * Perfect fit for the use cases that:
- * 1. has mostly date-based data write and scan and a focus on the most recent data.
- * Out-of-order writes are handled gracefully. Time range overlapping among store files is
- * tolerated and the performance impact is minimized. Configuration can be set at hbase-site
- * or overridden at per-table or per-column-family level by hbase shell. Design spec is at
+ * <ol>
+ * <li>has mostly date-based data write and scan and a focus on the most recent data.</li>
+ * </ol>
+ * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated + * and the performance impact is minimized. Configuration can be set at hbase-site or overridden at + * per-table or per-column-family level by hbase shell. Design spec is at * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class DateTieredCompactionPolicy extends SortedCompactionPolicy { + private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); - private RatioBasedCompactionPolicy compactionPolicyPerWindow; + private final RatioBasedCompactionPolicy compactionPolicyPerWindow; + + private final CompactionWindowFactory windowFactory; public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) throws IOException { @@ -78,6 +84,14 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { throw new IOException("Unable to load configured compaction policy '" + comConf.getCompactionPolicyForTieredWindow() + "'", e); } + try { + windowFactory = + ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionWindowFactory(), + new Class[] { CompactionConfiguration.class }, new Object[] { comConf }); + } catch (Exception e) { + throw new IOException("Unable to load configured compaction policy '" + + comConf.getCompactionPolicyForTieredWindow() + "'", e); + } } /** @@ -112,8 +126,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { long cfTTL = this.storeConfigInfo.getStoreFileTtl(); HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); - long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); - List boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now); + List boundaries = getCompactBoundariesForMajor(filesToCompact, now); boolean[] filesInWindow = new boolean[boundaries.size()]; for (StoreFile file: filesToCompact) { @@ -174,9 +187,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { public CompactionRequest selectMajorCompaction(ArrayList candidateSelection) { long now = EnvironmentEdgeManager.currentTime(); - long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); return new DateTieredCompactionRequest(candidateSelection, - this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now)); + this.getCompactBoundariesForMajor(candidateSelection, now)); } /** @@ -192,15 +204,13 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { long now = EnvironmentEdgeManager.currentTime(); long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); - // Make sure the store files is sorted by SeqId then maxTimestamp - List storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection, - oldestToCompact)); - Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); + // Make sure the store files is sorted by SeqId then maxTimestamp; + Collections.sort(candidateSelection, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); List> storefileMaxTimestampPairs = - Lists.newArrayListWithCapacity(Iterables.size(storeFileList)); + Lists.newArrayListWithCapacity(Iterables.size(candidateSelection)); long maxTimestampSeen = Long.MIN_VALUE; - for (StoreFile storeFile : storeFileList) { + for (StoreFile storeFile : candidateSelection) { // if there is out-of-order data, // we put them in the same window 
as the last file in increasing order maxTimestampSeen = Math.max(maxTimestampSeen, @@ -209,16 +219,18 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } Collections.reverse(storefileMaxTimestampPairs); - Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + CompactionWindow window = getIncomingWindow(now); int minThreshold = comConf.getIncomingWindowMin(); PeekingIterator> it = Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); while (it.hasNext()) { + if (window.compareToTimestamp(oldestToCompact) < 0) { + break; + } int compResult = window.compareToTimestamp(it.peek().getSecond()); if (compResult > 0) { // If the file is too old for the window, switch to the next window - window = window.nextWindow(comConf.getWindowsPerTier(), - oldestToCompact); + window = window.nextWindow(now); minThreshold = comConf.getMinFilesToCompact(); } else { // The file is within the target window @@ -243,8 +255,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, - Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) - throws IOException { + CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) + throws IOException { // The files has to be in ascending order for ratio-based compaction to work right // and removeExcessFile to exclude youngest files. Collections.reverse(storeFiles); @@ -270,20 +282,20 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { * Return a list of boundaries for multiple compaction output * in ascending order. */ - private List getCompactBoundariesForMajor(Collection filesToCompact, - long oldestToCompact, long now) { + private List getCompactBoundariesForMajor(Collection filesToCompact, long now) { long minTimestamp = Long.MAX_VALUE; for (StoreFile file : filesToCompact) { - minTimestamp = Math.min(minTimestamp, - file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp()); + minTimestamp = + Math.min(minTimestamp, + file.getMinimumTimestamp() == null ? Long.MAX_VALUE : file.getMinimumTimestamp()); } List boundaries = new ArrayList(); // Add startMillis of all windows between now and min timestamp - for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); - window.compareToTimestamp(minTimestamp) > 0; - window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) { + for (CompactionWindow window = getIncomingWindow(now); + window.compareToTimestamp(minTimestamp) > 0; + window = window.nextWindow(now)) { boundaries.add(window.startMillis()); } boundaries.add(Long.MIN_VALUE); @@ -292,10 +304,10 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } /** - * @return a list of boundaries for multiple compaction output - * from minTimestamp to maxTimestamp. + * @return a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp. */ - private static List getCompactionBoundariesForMinor(Window window, boolean singleOutput) { + private static List getCompactionBoundariesForMinor(CompactionWindow window, + boolean singleOutput) { List boundaries = new ArrayList(); boundaries.add(Long.MIN_VALUE); if (!singleOutput) { @@ -304,29 +316,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return boundaries; } - /** - * Removes all store files with max timestamp older than (current - maxAge). 
- * @param storeFiles all store files to consider - * @param maxAge the age in milliseconds when a store file stops participating in compaction. - * @return a list of storeFiles with the store file older than maxAge excluded - */ - private static Iterable filterOldStoreFiles(List storeFiles, - final long cutoff) { - return Iterables.filter(storeFiles, new Predicate() { - @Override - public boolean apply(StoreFile storeFile) { - // Known findbugs issue to guava. SuppressWarning or Nonnull annotation don't work. - if (storeFile == null) { - return false; - } - Long maxTimestamp = storeFile.getMaximumTimestamp(); - return maxTimestamp == null ? true : maxTimestamp >= cutoff; - } - }); - } - - private static Window getIncomingWindow(long now, long baseWindowMillis) { - return new Window(baseWindowMillis, now / baseWindowMillis); + private CompactionWindow getIncomingWindow(long now) { + return windowFactory.newIncomingWindow(now); } private static long getOldestToCompact(long maxAgeMillis, long now) { @@ -338,88 +329,4 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return Long.MIN_VALUE; } } - - /** - * This is the class we use to partition from epoch time to now into tiers of exponential sizes of - * windows. - */ - private static final class Window { - /** - * How big a range of timestamps fit inside the window in milliseconds. - */ - private final long windowMillis; - - /** - * A timestamp t is within the window iff t / size == divPosition. - */ - private final long divPosition; - - private Window(long baseWindowMillis, long divPosition) { - windowMillis = baseWindowMillis; - this.divPosition = divPosition; - } - - /** - * Compares the window to a timestamp. - * @param timestamp the timestamp to compare. - * @return a negative integer, zero, or a positive integer as the window lies before, covering, - * or after than the timestamp. - */ - public int compareToTimestamp(long timestamp) { - if (timestamp < 0) { - try { - timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); - } catch (ArithmeticException ae) { - timestamp = Long.MIN_VALUE; - } - } - long pos = timestamp / windowMillis; - return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; - } - - /** - * Move to the new window of the same tier or of the next tier, which represents an earlier time - * span. - * @param windowsPerTier The number of contiguous windows that will have the same size. Windows - * following those will be tierBase times as big. - * @return The next window - */ - public Window nextWindow(int windowsPerTier, long oldestToCompact) { - // Don't promote to the next tier if there is not even 1 window at current tier - // or if the next window crosses the max age. 
- if (divPosition % windowsPerTier > 0 || - startMillis() - windowMillis * windowsPerTier < oldestToCompact) { - return new Window(windowMillis, divPosition - 1); - } else { - return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); - } - } - - /** - * Inclusive lower bound - */ - public long startMillis() { - try { - return LongMath.checkedMultiply(windowMillis, divPosition); - } catch (ArithmeticException ae) { - return Long.MIN_VALUE; - } - } - - /** - * Exclusive upper bound - */ - public long endMillis() { - try { - return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); - } catch (ArithmeticException ae) { - return Long.MAX_VALUE; - } - } - - @Override - public String toString() { - return "[" + startMillis() + ", " + endMillis() + ")"; - } - } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java new file mode 100644 index 0000000..abe4d83 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.math.LongMath; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Exponential compaction window implementation. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class ExponentialCompactionWindowFactory extends CompactionWindowFactory { + + private static final Log LOG = LogFactory.getLog(ExponentialCompactionWindowFactory.class); + + public static final String BASE_WINDOW_MILLIS_KEY = + "hbase.hstore.compaction.date.tiered.base.window.millis"; + public static final String WINDOWS_PER_TIER_KEY = + "hbase.hstore.compaction.date.tiered.windows.per.tier"; + public static final String MAX_TIER_AGE_MILLIS_KEY = + "hbase.hstore.compaction.date.tiered.max.tier.millis"; + + private final class Window extends CompactionWindow { + /** + * How big a range of timestamps fit inside the window in milliseconds. + */ + private final long windowMillis; + + /** + * A timestamp t is within the window iff t / size == divPosition. 
+ */ + private final long divPosition; + + public Window(long baseWindowMillis, long divPosition) { + this.windowMillis = baseWindowMillis; + this.divPosition = divPosition; + } + + @Override + public int compareToTimestamp(long timestamp) { + if (timestamp < 0) { + try { + timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); + } catch (ArithmeticException ae) { + timestamp = Long.MIN_VALUE; + } + } + long pos = timestamp / windowMillis; + return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; + } + + @Override + public Window nextWindow(long now) { + // Don't promote to the next tier if there is not even 1 window at current tier + // or if the next window crosses the max age. + long maxTierAgeCutoff = getMaxTierAgeCutoff(now); + if (divPosition % windowsPerTier > 0 + || startMillis() - windowMillis * windowsPerTier < maxTierAgeCutoff) { + return new Window(windowMillis, divPosition - 1); + } else { + return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); + } + } + + @Override + public long startMillis() { + try { + return LongMath.checkedMultiply(windowMillis, divPosition); + } catch (ArithmeticException ae) { + return Long.MIN_VALUE; + } + } + + @Override + public long endMillis() { + try { + return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); + } catch (ArithmeticException ae) { + return Long.MAX_VALUE; + } + } + } + + private final long baseWindowMillis; + private final int windowsPerTier; + private final long maxTierAgeMillis; + + private long getMaxTierAgeCutoff(long now) { + try { + return LongMath.checkedSubtract(now, maxTierAgeMillis); + } catch (ArithmeticException ae) { + LOG.warn("Value for " + MAX_TIER_AGE_MILLIS_KEY + ": " + maxTierAgeMillis + + ". Will always promote to next tier."); + return Long.MIN_VALUE; + } + } + + public ExponentialCompactionWindowFactory(CompactionConfiguration comConf) { + Configuration conf = comConf.conf; + baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); + windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); + maxTierAgeMillis = conf.getLong(MAX_TIER_AGE_MILLIS_KEY, comConf.getMaxStoreFileAgeMillis()); + LOG.info(this); + } + + @Override + public CompactionWindow newIncomingWindow(long now) { + return new Window(baseWindowMillis, now / baseWindowMillis); + } + + @Override + public String toString() { + return String.format("%s [base window in milliseconds %d, windows per tier %d]", + getClass().getSimpleName(), baseWindowMillis, windowsPerTier); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java new file mode 100644 index 0000000..4dce696 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import static org.junit.Assert.*; + +public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy { + + protected ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) + throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + // Has to be > 0 and < now. + timeMachine.setValue(1); + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + + ArrayList ret = Lists.newArrayList(); + for (int i = 0; i < sizes.length; i++) { + MockStoreFile msf = + new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); + msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); + ret.add(msf); + } + return ret; + } + + protected void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, + long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + timeMachine.setValue(now); + DateTieredCompactionRequest request; + DateTieredCompactionPolicy policy = + (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy(); + if (isMajor) { + for (StoreFile file : candidates) { + ((MockStoreFile) file).setIsMajor(true); + } + assertEquals(toCompact, policy.shouldPerformMajorCompaction(candidates)); + request = (DateTieredCompactionRequest) policy.selectMajorCompaction(candidates); + } else { + assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList. 
of())); + request = + (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false); + } + List actual = Lists.newArrayList(request.getFiles()); + assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); + assertEquals(Arrays.toString(expectedBoundaries), + Arrays.toString(request.getBoundaries().toArray())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java index ecccbdd..69880e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java @@ -17,47 +17,18 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) -public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { - ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) - throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - // Has to be > 0 and < now. 
- timeMachine.setValue(1); - ArrayList ageInDisk = new ArrayList(); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - - ArrayList ret = Lists.newArrayList(); - for (int i = 0; i < sizes.length; i++) { - MockStoreFile msf = - new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); - msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); - ret.add(msf); - } - return ret; - } +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompactionPolicy { @Override protected void config() { @@ -68,8 +39,8 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4); conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); // Special settings for compaction policy per window @@ -81,32 +52,6 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); } - void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, - long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - timeMachine.setValue(now); - DateTieredCompactionRequest request; - if (isMajor) { - for (StoreFile file : candidates) { - ((MockStoreFile)file).setIsMajor(true); - } - Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .shouldPerformMajorCompaction(candidates)); - request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine - .getCompactionPolicy()).selectMajorCompaction(candidates); - } else { - Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .needsCompaction(candidates, ImmutableList. 
of())); - request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine - .getCompactionPolicy()).selectMinorCompaction(candidates, false, false); - } - List actual = Lists.newArrayList(request.getFiles()); - Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); - Assert.assertEquals(Arrays.toString(expectedBoundaries), - Arrays.toString(request.getBoundaries().toArray())); - } - /** * Test for incoming window * @throws IOException with error @@ -302,24 +247,4 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { 41, 42, 33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true); } - - /** - * Major compaction with maximum values - * @throws IOException with error - */ - @Test - public void maxValuesForMajor() throws IOException { - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2); - store.storeEngine.getCompactionPolicy().setConf(conf); - long[] minTimestamps = - new long[] { Long.MIN_VALUE, -100 }; - long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE }; - long[] sizes = new long[] { 0, 1 }; - - compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), - new long[] { 0, 1 }, - new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L, - 9223372036854775806L }, true, true); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java new file mode 100644 index 0000000..aba40ca --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicyOverflow extends AbstractTestDateTieredCompactionPolicy { + @Override + protected void config() { + super.config(); + + // Set up policy + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 2); + conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); + + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); + } + + /** + * Major compaction with maximum values + * @throws IOException with error + */ + @Test + public void maxValuesForMajor() throws IOException { + long[] minTimestamps = new long[] { Long.MIN_VALUE, -100 }; + long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE }; + long[] sizes = new long[] { 0, 1 }; + + compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), + new long[] { 0, 1 }, new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, + 4611686018427387903L, 9223372036854775806L }, true, true); + } +} -- 1.9.1
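With the factory in place, window logic is fully pluggable: DateTieredCompactionPolicy reflectively instantiates whatever CompactionWindowFactory subclass is named by hbase.hstore.compaction.date.tiered.window.factory.class, invoking its (CompactionConfiguration) constructor. As a sketch of the contract, hypothetical and not part of this patch (the class and key names below are invented; note also that both base classes are @InterfaceAudience.Private in this patch), a factory that emits fixed-size windows and never promotes to a larger tier could look like:

package org.apache.hadoop.hbase.regionserver.compactions;

import com.google.common.math.LongMath;

// Hypothetical plugin: fixed-size windows, no tiering. Only the constructor
// signature and the four CompactionWindow methods are dictated by the patch.
public class FixedCompactionWindowFactory extends CompactionWindowFactory {

  // Made-up key for this example; pick any name not already used by HBase.
  public static final String WINDOW_MILLIS_KEY =
      "hbase.hstore.compaction.date.tiered.fixed.window.millis";

  private final long windowMillis;

  // DateTieredCompactionPolicy reflectively invokes this exact constructor.
  public FixedCompactionWindowFactory(CompactionConfiguration comConf) {
    this.windowMillis = comConf.conf.getLong(WINDOW_MILLIS_KEY, 3600000L * 6);
  }

  @Override
  public CompactionWindow newIncomingWindow(long now) {
    return new FixedWindow(now / windowMillis);
  }

  private final class FixedWindow extends CompactionWindow {

    // A timestamp t falls in this window iff t / windowMillis == divPosition.
    private final long divPosition;

    FixedWindow(long divPosition) {
      this.divPosition = divPosition;
    }

    @Override
    public int compareToTimestamp(long timestamp) {
      // Same flooring trick as the exponential factory: Java division truncates
      // toward zero, so shift negative timestamps down by one window first.
      if (timestamp < 0) {
        try {
          timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
        } catch (ArithmeticException ae) {
          timestamp = Long.MIN_VALUE;
        }
      }
      long pos = timestamp / windowMillis;
      return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
    }

    @Override
    public CompactionWindow nextWindow(long now) {
      // No promotion: every window is the same size, so just step back in time.
      return new FixedWindow(divPosition - 1);
    }

    @Override
    public long startMillis() {
      try {
        return LongMath.checkedMultiply(windowMillis, divPosition);
      } catch (ArithmeticException ae) {
        return Long.MIN_VALUE;
      }
    }

    @Override
    public long endMillis() {
      try {
        return LongMath.checkedMultiply(windowMillis, divPosition + 1);
      } catch (ArithmeticException ae) {
        return Long.MAX_VALUE;
      }
    }
  }
}

Enabling it would be a single setting, e.g. conf.set("hbase.hstore.compaction.date.tiered.window.factory.class", FixedCompactionWindowFactory.class.getName()). Because nextWindow never promotes, old data keeps producing many small windows; that is exactly the trade-off the exponential default avoids by growing windows by windowsPerTier at each tier boundary.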