From d777b645c7e51fb097579ab069b57f14247e4780 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 13 Apr 2016 20:44:57 +0800 Subject: [PATCH] HBASE-15454 Archive store files older than max age --- hbase-server/pom.xml | 5 +- .../hbase/regionserver/DateTieredStoreEngine.java | 8 +- ...endricalHotWarmColdCompactionWindowFactory.java | 184 ++++++++++++ .../compactions/CompactionConfiguration.java | 53 ++-- .../regionserver/compactions/CompactionWindow.java | 60 ++++ .../compactions/CompactionWindowFactory.java | 29 ++ .../compactions/DateTieredCompactionPolicy.java | 317 +++++++++++---------- .../ExponentialCompactionWindowFactory.java | 122 ++++++++ .../compactions/RatioBasedCompactionPolicy.java | 6 +- .../compactions/SortedCompactionPolicy.java | 26 +- .../AbstractTestDateTieredCompactionPolicy.java | 93 ++++++ .../TestDateTieredCompactionPolicy.java | 94 +----- .../TestDateTieredCompactionPolicyArchive.java | 92 ++++++ .../TestDateTieredCompactionPolicyOverflow.java | 73 +++++ pom.xml | 6 + 15 files changed, 892 insertions(+), 276 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotWarmColdCompactionWindowFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index d5f1e30..a17a9a2 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -535,7 +535,10 @@ io.netty netty-all - + + joda-time + joda-time + org.apache.htrace diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 773baab..b4a9f97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.security.User; @InterfaceAudience.Private public class DateTieredStoreEngine extends StoreEngine { + @Override public boolean needsCompaction(List filesCompacting) { - return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), - filesCompacting); + return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), filesCompacting); } @Override @@ -74,8 +74,8 @@ public class DateTieredStoreEngine extends StoreEngine filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { - request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, - isUserCompaction, mayUseOffPeak, forceMajor); + request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, isUserCompaction, + mayUseOffPeak, forceMajor); return request != null; } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotWarmColdCompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotWarmColdCompactionWindowFactory.java new file mode 100644 index 0000000..13fc8c9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotWarmColdCompactionWindowFactory.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.joda.time.DateTimeField; +import org.joda.time.DateTimeFieldType; +import org.joda.time.DateTimeZone; +import org.joda.time.chrono.ISOChronology; +
+/** + * One hot window tier, one warm window tier (only one window in this tier), and one cold window + * tier (the number of windows in this tier grows as time elapses).
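+ * <p>
+ * For example (illustrative only, using the defaults below: hot window unit "D", 4 hot windows
+ * per tier, cold window unit "M"): when now falls on April 13, the hot tier is the four one-day
+ * windows covering April 10 through April 14, the warm tier is the single window reaching from
+ * the month boundary at or before {@code oldestToCompact} up to April 10, and every earlier
+ * calendar month forms one cold window.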
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class CalendricalHotWarmColdCompactionWindowFactory extends CompactionWindowFactory { + + private static final Log LOG = + LogFactory.getLog(CalendricalHotWarmColdCompactionWindowFactory.class); + + private static final ImmutableMap<String, DateTimeFieldType> UNIT_TO_TYPE = ImmutableMap.of("Y", + DateTimeFieldType.year(), "M", DateTimeFieldType.monthOfYear(), "W", + DateTimeFieldType.weekOfWeekyear(), "D", DateTimeFieldType.dayOfMonth(), "H", + DateTimeFieldType.hourOfDay()); + + public static final String WINDOW_TIME_ZONE = "hbase.hstore.compaction.date.tiered.calendrical.time.zone"; + public static final String HOT_WINDOW_UNIT = "hbase.hstore.compaction.date.tiered.calendrical.base.hot.window.unit"; + public static final String HOT_WINDOWS_PER_TIER = "hbase.hstore.compaction.date.tiered.calendrical.hot.windows.per.tier"; + public static final String COLD_WINDOW_UNIT = "hbase.hstore.compaction.date.tiered.calendrical.cold.window.unit"; + + private static abstract class BaseWindow extends CompactionWindow { + + protected final long startMillis; + + protected final long endMillis; + + protected BaseWindow(long startMillis, long endMillis) { + this.startMillis = startMillis; + this.endMillis = endMillis; + } + + @Override + public int compareToTimestamp(long timestamp) { + if (timestamp >= endMillis) { + return -1; + } + if (timestamp < startMillis) { + return 1; + } + return 0; + } + + @Override + public long startMillis() { + return startMillis; + } + + @Override + public long endMillis() { + return endMillis; + } + } + + private final class HotWindow extends BaseWindow { + + private final int pos; + + public HotWindow(int pos, long startMillis, long endMillis) { + super(startMillis, endMillis); + this.pos = pos; + } + + @Override + public CompactionWindow nextWindow(long oldestToCompact) { + int nextPos = pos + 1; + if (nextPos == hotWindowsPerTier) { + long nearestColdWindowBoundary = coldWindowUnit.roundFloor(oldestToCompact); + if (startMillis == nearestColdWindowBoundary) { + // no warm tier at all + return new ColdWindow(coldWindowUnit.add(nearestColdWindowBoundary, -1), + nearestColdWindowBoundary); + } else { + return new WarmWindow(nearestColdWindowBoundary, startMillis); + } + } else { + return new HotWindow(nextPos, hotWindowUnit.add(startMillis, -1), startMillis); + } + } + } + + private final class WarmWindow extends BaseWindow { + + public WarmWindow(long startMillis, long endMillis) { + super(startMillis, endMillis); + } + + @Override + public CompactionWindow nextWindow(long oldestToCompact) { + return new ColdWindow(coldWindowUnit.add(startMillis, -1), startMillis); + } + } + + private final class ColdWindow extends BaseWindow { + + public ColdWindow(long startMillis, long endMillis) { + super(startMillis, endMillis); + } + + @Override + public CompactionWindow nextWindow(long oldestToCompact) { + return new ColdWindow(coldWindowUnit.add(startMillis, -1), startMillis); + } + } + + private final DateTimeZone zone; + + private final DateTimeField hotWindowUnit; + + private final int hotWindowsPerTier; + + private final DateTimeField coldWindowUnit; + + private void sanityCheckFactoryConfig(CompactionConfiguration comConf) { + Preconditions.checkArgument(hotWindowsPerTier > 0, "hot windows per tier must be positive: %s", + hotWindowsPerTier); + Preconditions.checkArgument( + hotWindowUnit.getDurationField().getMillis(hotWindowsPerTier) <= comConf + .getMaxStoreFileAgeMillis(), + "hot window tier of %s %s(s) is larger than max age %s", hotWindowsPerTier,
hotWindowUnit, + comConf.getMaxStoreFileAgeMillis()); + } + + public CalendricalHotWarmColdCompactionWindowFactory(CompactionConfiguration comConf) { + Configuration conf = comConf.conf; + String zoneId = conf.get(WINDOW_TIME_ZONE); + if (StringUtils.isBlank(zoneId)) { + this.zone = DateTimeZone.getDefault(); + } else { + this.zone = DateTimeZone.forID(zoneId); + } + ISOChronology chronology = ISOChronology.getInstance(zone); + this.hotWindowUnit = UNIT_TO_TYPE.get(conf.get(HOT_WINDOW_UNIT, "D")).getField(chronology); + this.hotWindowsPerTier = conf.getInt(HOT_WINDOWS_PER_TIER, 4); + this.coldWindowUnit = UNIT_TO_TYPE.get(conf.get(COLD_WINDOW_UNIT, "M")).getField(chronology); + sanityCheckFactoryConfig(comConf); + LOG.info(this); + } + + @Override + public CompactionWindow newIncomingWindow(long now) { + long startMillis = hotWindowUnit.roundFloor(now); + long endMillis = hotWindowUnit.add(startMillis, 1); + return new HotWindow(0, startMillis, endMillis); + } + + @Override + public String toString() { + return String.format( + "%s [time zone %s, hot window unit %s, hot windows per tier %d," + " cold window unit %s]", + getClass().getSimpleName(), zone, hotWindowUnit, hotWindowsPerTier, coldWindowUnit); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 97cc404..837b5b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -72,10 +72,6 @@ public class CompactionConfiguration { */ public static final String MAX_AGE_MILLIS_KEY = "hbase.hstore.compaction.date.tiered.max.storefile.age.millis"; - public static final String BASE_WINDOW_MILLIS_KEY = - "hbase.hstore.compaction.date.tiered.base.window.millis"; - public static final String WINDOWS_PER_TIER_KEY = - "hbase.hstore.compaction.date.tiered.windows.per.tier"; public static final String INCOMING_WINDOW_MIN_KEY = "hbase.hstore.compaction.date.tiered.incoming.window.min"; public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY = @@ -86,6 +82,15 @@ public class CompactionConfiguration { private static final Class DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; + public static final String COMPACTION_WINDOW_FACTORY_CLASS = + "hbase.hstore.compaction.date.tiered.window.factory.class"; + + private static final Class<? extends CompactionWindowFactory> DEFAULT_COMPACTION_WINDOW_FACTORY_CLASS = + ExponentialCompactionWindowFactory.class; + + public static final String ARCHIVE_FILES_OLDER_THAN_MAX_AGE = + "hbase.hstore.compaction.date.tiered.archive.files.older.than.max.age"; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -103,11 +108,11 @@ private final float majorCompactionJitter; private final float minLocalityToForceCompact; private final long maxStoreFileAgeMillis; - private final long baseWindowMillis; - private final int windowsPerTier; private final int incomingWindowMin; private final String compactionPolicyForTieredWindow; private final boolean singleOutputForMinorCompaction; + private final String compactionWindowFactory; + private final boolean archiveFilesOlderThanMaxAge; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -132,14 +137,14 @@ public class
CompactionConfiguration { minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); maxStoreFileAgeMillis = conf.getLong(MAX_AGE_MILLIS_KEY, Long.MAX_VALUE); - baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); - windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6); compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY, DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true); - + this.compactionWindowFactory = conf.get(COMPACTION_WINDOW_FACTORY_CLASS, + DEFAULT_COMPACTION_WINDOW_FACTORY_CLASS.getName()); + this.archiveFilesOlderThanMaxAge = conf.getBoolean(ARCHIVE_FILES_OLDER_THAN_MAX_AGE, false); LOG.info(this); } @@ -148,8 +153,9 @@ public class CompactionConfiguration { return String.format( "size [%d, %d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;" + " major period %d, major jitter %f, min locality to compact %f;" - + " tiered compaction: max_age %d, base window in milliseconds %d, windows per tier %d," - + "incoming window min %d", + + " tiered compaction: max_age %d, incoming window min %d," + + " compaction policy for tiered window %s, single output for minor %b," + + " compaction window factory %s, archive files older than max age %b", minCompactSize, maxCompactSize, offPeakMaxCompactSize, @@ -162,9 +168,12 @@ majorCompactionJitter, minLocalityToForceCompact, maxStoreFileAgeMillis, - baseWindowMillis, - windowsPerTier, - incomingWindowMin); + incomingWindowMin, + compactionPolicyForTieredWindow, + singleOutputForMinorCompaction, + compactionWindowFactory, + archiveFilesOlderThanMaxAge + ); } /** @@ -265,14 +274,6 @@ public class CompactionConfiguration { return maxStoreFileAgeMillis; } - public long getBaseWindowMillis() { - return baseWindowMillis; - } - - public int getWindowsPerTier() { - return windowsPerTier; - } - public int getIncomingWindowMin() { return incomingWindowMin; } @@ -284,4 +285,12 @@ public boolean useSingleOutputForMinorCompaction() { return singleOutputForMinorCompaction; } + + public String getCompactionWindowFactory() { + return compactionWindowFactory; + } + + public boolean isArchiveFilesOlderThanMaxAge() { + return archiveFilesOlderThanMaxAge; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java new file mode 100644 index 0000000..171afc1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Base class for compaction window implementation. + *
<p>
+ * It is better to make sure that the window before {@code oldestToCompact} is fixed and never promoted + * since we will archive the files in each window into a single file. + */ +@InterfaceAudience.Private +public abstract class CompactionWindow { + + /** + * Compares the window to a timestamp. + * @param timestamp the timestamp to compare. + * @return a negative integer, zero, or a positive integer as the window lies before, covering, or + * after the timestamp. + */ + public abstract int compareToTimestamp(long timestamp); + + /** + * Move to the new window of the same tier or of the next tier, which represents an earlier time + * span. + * @return The next window + */ + public abstract CompactionWindow nextWindow(long oldestToCompact); + + /** + * Inclusive lower bound + */ + public abstract long startMillis(); + + /** + * Exclusive upper bound + */ + public abstract long endMillis(); + + @Override + public String toString() { + return "[" + startMillis() + ", " + endMillis() + ")"; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java new file mode 100644 index 0000000..01b708e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * For creating compaction windows.
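+ * <p>
+ * Implementations are loaded reflectively from the
+ * {@code hbase.hstore.compaction.date.tiered.window.factory.class} setting, so a concrete
+ * factory needs a constructor taking a single {@link CompactionConfiguration} argument, e.g.
+ * {@code new ExponentialCompactionWindowFactory(comConf)} (this is how
+ * DateTieredCompactionPolicy instantiates it).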
+ */ +@InterfaceAudience.Private +public abstract class CompactionWindowFactory { + + public abstract CompactionWindow newIncomingWindow(long now); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index d61af42..0e25705 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,20 +18,19 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; import com.google.common.math.LongMath; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -50,21 +49,31 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; /** * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to * Cassandra's for the following benefits: - * 1. Improve date-range-based scan by structuring store files in date-based tiered layout. - * 2. Reduce compaction overhead. - * 3. Improve TTL efficiency. + *
<ol>
+ * <li>Improve date-range-based scan by structuring store files in date-based tiered layout.</li>
+ * <li>Reduce compaction overhead.</li>
+ * <li>Improve TTL efficiency.</li>
+ * </ol>
 * Perfect fit for the use cases that: - * 1. has mostly date-based data write and scan and a focus on the most recent data. - * Out-of-order writes are handled gracefully. Time range overlapping among store files is - * tolerated and the performance impact is minimized. Configuration can be set at hbase-site - * or overridden at per-table or per-column-family level by hbase shell. Design spec is at + * <ol>
+ * <li>has mostly date-based data write and scan and a focus on the most recent data.</li>
+ * </ol>
+ * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated + * and the performance impact is minimized. Configuration can be set at hbase-site or overridden at + * per-table or per-column-family level by hbase shell. Design spec is at + * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class DateTieredCompactionPolicy extends SortedCompactionPolicy { + private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); - private RatioBasedCompactionPolicy compactionPolicyPerWindow; + private static final DateTieredCompactionRequest EMPTY_REQUEST = new DateTieredCompactionRequest( + Collections.<StoreFile> emptyList(), Collections.<Long> emptyList()); + + private final RatioBasedCompactionPolicy compactionPolicyPerWindow; + + private final CompactionWindowFactory windowFactory; public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) throws IOException { @@ -78,6 +87,14 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { throw new IOException("Unable to load configured compaction policy '" + comConf.getCompactionPolicyForTieredWindow() + "'", e); } + try { + windowFactory = ReflectionUtils.instantiateWithCustomCtor( + comConf.getCompactionWindowFactory(), new Class[] { CompactionConfiguration.class }, + new Object[] { comConf }); + } catch (Exception e) { + throw new IOException("Unable to load configured compaction window factory '" + + comConf.getCompactionWindowFactory() + "'", e); + } } /** @@ -87,9 +104,11 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { @VisibleForTesting public boolean needsCompaction(final Collection storeFiles, final List filesCompacting) { - ArrayList candidates = new ArrayList(storeFiles); + ArrayList<StoreFile> candidates = getCurrentEligibleFiles(new ArrayList<StoreFile>(storeFiles), + filesCompacting); try { - return selectMinorCompaction(candidates, false, true) != null; + return !selectMinorCompaction(storeFiles, filesCompacting, candidates, false, true).getFiles() + .isEmpty(); } catch (Exception e) { LOG.error("Can not check for compaction: ", e); return false; @@ -164,10 +183,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } @Override - protected CompactionRequest createCompactionRequest(ArrayList candidateSelection, - boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + protected CompactionRequest createCompactionRequest(Collection<StoreFile> candidateFiles, + List<StoreFile> filesCompacting, ArrayList<StoreFile> candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { CompactionRequest result = tryingMajor ?
selectMajorCompaction(candidateSelection) - : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); + : selectMinorCompaction(candidateFiles, filesCompacting, candidateSelection, mayUseOffPeak, + mayBeStuck); LOG.debug("Generated compaction request: " + result); return result; } @@ -179,6 +200,94 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now)); } + private boolean shouldIncludeInArchive(long startMillis, long endMillis, StoreFile file) { + if (file.getMinimumTimestamp() == null) { + if (file.getMaximumTimestamp() == null) { + return true; + } else { + return file.getMaximumTimestamp().longValue() >= startMillis; + } + } else { + if (file.getMaximumTimestamp() == null) { + return file.getMinimumTimestamp().longValue() < endMillis; + } else { + return file.getMinimumTimestamp().longValue() < endMillis + && file.getMaximumTimestamp().longValue() >= startMillis; + } + } + } + + private CompactionRequest tryArchive(Collection<StoreFile> candidateFiles, + List<StoreFile> filesCompacting, CompactionWindow newestArchiveWindow, long oldestToCompact, + long now) { + if (!comConf.isArchiveFilesOlderThanMaxAge()) { + // A non-null file list is expected by HStore + return EMPTY_REQUEST; + } + long minTimestamp = Long.MAX_VALUE; + for (StoreFile file : candidateFiles) { + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp() == null ? Long.MAX_VALUE + : file.getMinimumTimestamp().longValue()); + } + List<Long> archiveBoundaries = Lists.newArrayList(); + archiveBoundaries.add(newestArchiveWindow.endMillis()); + for (CompactionWindow window = newestArchiveWindow; window + .compareToTimestamp(minTimestamp) >= 0; window = window.nextWindow(oldestToCompact)) { + archiveBoundaries.add(window.startMillis()); + } + Collections.reverse(archiveBoundaries); + if (archiveBoundaries.size() < 2) { + return EMPTY_REQUEST; + } + List<StoreFile> candidates = Lists.newArrayList(candidateFiles); + for (int i = 0, n = archiveBoundaries.size() - 1; i < n; i++) { + long startMillis = archiveBoundaries.get(i); + long endMillis = archiveBoundaries.get(i + 1); + int first = 0, total = candidates.size(); + for (; first < total; first++) { + if (shouldIncludeInArchive(startMillis, endMillis, candidates.get(first))) { + break; + } + } + if (first == total) { + continue; + } + int last = total - 1; + for (; last > first; last--) { + if (shouldIncludeInArchive(startMillis, endMillis, candidates.get(last))) { + break; + } + } + if (last == first) { + // check if the only file fits in the window. Otherwise we still need a compaction to move + // the data that does not belong to this window to other windows. + StoreFile file = candidates.get(first); + if (file.getMinimumTimestamp() != null && file.getMaximumTimestamp() != null + && file.getMinimumTimestamp().longValue() >= startMillis + && file.getMaximumTimestamp().longValue() < endMillis) { + continue; + } + } + if (!filesCompacting.isEmpty()) { + // check if we are overlapped with filesCompacting.
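+ // (filesCompacting is assumed to be a contiguous, seqId-ordered sublist of candidates,
+ // so comparing the index ranges below is enough to detect an overlap)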
+ int idx = candidates.indexOf(filesCompacting.get(0)); + assert idx >= 0; + if (last >= idx && first < idx + filesCompacting.size()) { + continue; + } + } + if (last - first + 1 > comConf.getMaxFilesToCompact()) { + LOG.warn("too many files (got " + (last - first + 1) + ", expected less than or equal to " + + comConf.getMaxFilesToCompact() + ") to compact when archiving [" + startMillis + ", " + + endMillis + "), give up"); + continue; + } + List<StoreFile> filesToCompact = Lists.newArrayList(candidates.subList(first, last + 1)); + return new DateTieredCompactionRequest(filesToCompact, + getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now)); + } + return EMPTY_REQUEST; + } /** * We receive store files sorted in ascending order by seqId then scan the list of files. If the * current file has a maxTimestamp older than last known maximum, treat this file as it carries * the last known maximum. This way both seqId and timestamp are in the same order. If files carry * the same maxTimestamps, they are ordered by seqId. We then reverse the list so they are ordered * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. */ - public CompactionRequest selectMinorCompaction(ArrayList candidateSelection, + public CompactionRequest selectMinorCompaction(Collection<StoreFile> candidateFiles, + List<StoreFile> filesCompacting, ArrayList<StoreFile> candidateSelection, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { long now = EnvironmentEdgeManager.currentTime(); long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); // Make sure the store files is sorted by SeqId then maxTimestamp - List storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection, - oldestToCompact)); + List<StoreFile> storeFileList = Lists.newArrayList(candidateSelection); Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); List> storefileMaxTimestampPairs = @@ -203,22 +312,25 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { for (StoreFile storeFile : storeFileList) { // if there is out-of-order data, // we put them in the same window as the last file in increasing order - maxTimestampSeen = Math.max(maxTimestampSeen, - storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp()); + maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp() == null + ?
Long.MIN_VALUE : storeFile.getMaximumTimestamp().longValue()); storefileMaxTimestampPairs.add(new Pair(storeFile, maxTimestampSeen)); } Collections.reverse(storefileMaxTimestampPairs); - Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + CompactionWindow window = getIncomingWindow(now); int minThreshold = comConf.getIncomingWindowMin(); PeekingIterator> it = Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); while (it.hasNext()) { + if (window.compareToTimestamp(oldestToCompact) < 0) { + // the whole window lies before oldestToCompact + break; + } int compResult = window.compareToTimestamp(it.peek().getSecond()); if (compResult > 0) { // If the file is too old for the window, switch to the next window - window = window.nextWindow(comConf.getWindowsPerTier(), - oldestToCompact); + window = window.nextWindow(oldestToCompact); minThreshold = comConf.getMinFilesToCompact(); } else { // The file is within the target window @@ -238,13 +350,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } } } - // A non-null file list is expected by HStore - return new CompactionRequest(Collections. emptyList()); + return tryArchive(candidateFiles, filesCompacting, window, oldestToCompact, now); } private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, - Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) - throws IOException { + CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) + throws IOException { // The files has to be in ascending order for ratio-based compaction to work right // and removeExcessFile to exclude youngest files. Collections.reverse(storeFiles); @@ -267,35 +378,42 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } /** - * Return a list of boundaries for multiple compaction output - * in ascending order. + * Return a list of boundaries for multiple compaction output in ascending order. */ private List getCompactBoundariesForMajor(Collection filesToCompact, long oldestToCompact, long now) { long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; for (StoreFile file : filesToCompact) { - minTimestamp = Math.min(minTimestamp, - file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp()); + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp() == null ? Long.MAX_VALUE + : file.getMinimumTimestamp().longValue()); + maxTimestamp = Math.max(maxTimestamp, file.getMaximumTimestamp() == null ? Long.MIN_VALUE + : file.getMaximumTimestamp().longValue()); } - List boundaries = new ArrayList(); + List boundaries = Lists.newArrayList(); + CompactionWindow window = getIncomingWindow(now); + // find the first window that covers the max timestamp. 
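+ // (newIncomingWindow(now) covers the current time, which may be far later than any
+ // timestamp in filesToCompact, so first walk back through earlier windows until the
+ // window no longer lies entirely after maxTimestamp)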
+ while (window.compareToTimestamp(maxTimestamp) > 0) { + window = window.nextWindow(oldestToCompact); + } + boundaries.add(window.startMillis()); - // Add startMillis of all windows between now and min timestamp - for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); - window.compareToTimestamp(minTimestamp) > 0; - window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) { + // Add startMillis of all windows between overall max and min timestamp + for (window = window.nextWindow(oldestToCompact); window + .compareToTimestamp(minTimestamp) > 0; window = window.nextWindow(oldestToCompact)) { boundaries.add(window.startMillis()); } - boundaries.add(Long.MIN_VALUE); + boundaries.add(minTimestamp); Collections.reverse(boundaries); return boundaries; } /** - * @return a list of boundaries for multiple compaction output - * from minTimestamp to maxTimestamp. + * @return a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp. */ - private static List getCompactionBoundariesForMinor(Window window, boolean singleOutput) { + private static List getCompactionBoundariesForMinor(CompactionWindow window, + boolean singleOutput) { List boundaries = new ArrayList(); boundaries.add(Long.MIN_VALUE); if (!singleOutput) { @@ -304,29 +422,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return boundaries; } - /** - * Removes all store files with max timestamp older than (current - maxAge). - * @param storeFiles all store files to consider - * @param maxAge the age in milliseconds when a store file stops participating in compaction. - * @return a list of storeFiles with the store file older than maxAge excluded - */ - private static Iterable filterOldStoreFiles(List storeFiles, - final long cutoff) { - return Iterables.filter(storeFiles, new Predicate() { - @Override - public boolean apply(StoreFile storeFile) { - // Known findbugs issue to guava. SuppressWarning or Nonnull annotation don't work. - if (storeFile == null) { - return false; - } - Long maxTimestamp = storeFile.getMaximumTimestamp(); - return maxTimestamp == null ? true : maxTimestamp >= cutoff; - } - }); - } - - private static Window getIncomingWindow(long now, long baseWindowMillis) { - return new Window(baseWindowMillis, now / baseWindowMillis); + private CompactionWindow getIncomingWindow(long now) { + return windowFactory.newIncomingWindow(now); } private static long getOldestToCompact(long maxAgeMillis, long now) { @@ -338,88 +435,4 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return Long.MIN_VALUE; } } - - /** - * This is the class we use to partition from epoch time to now into tiers of exponential sizes of - * windows. - */ - private static final class Window { - /** - * How big a range of timestamps fit inside the window in milliseconds. - */ - private final long windowMillis; - - /** - * A timestamp t is within the window iff t / size == divPosition. - */ - private final long divPosition; - - private Window(long baseWindowMillis, long divPosition) { - windowMillis = baseWindowMillis; - this.divPosition = divPosition; - } - - /** - * Compares the window to a timestamp. - * @param timestamp the timestamp to compare. - * @return a negative integer, zero, or a positive integer as the window lies before, covering, - * or after than the timestamp. 
- */ - public int compareToTimestamp(long timestamp) { - if (timestamp < 0) { - try { - timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); - } catch (ArithmeticException ae) { - timestamp = Long.MIN_VALUE; - } - } - long pos = timestamp / windowMillis; - return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; - } - - /** - * Move to the new window of the same tier or of the next tier, which represents an earlier time - * span. - * @param windowsPerTier The number of contiguous windows that will have the same size. Windows - * following those will be tierBase times as big. - * @return The next window - */ - public Window nextWindow(int windowsPerTier, long oldestToCompact) { - // Don't promote to the next tier if there is not even 1 window at current tier - // or if the next window crosses the max age. - if (divPosition % windowsPerTier > 0 || - startMillis() - windowMillis * windowsPerTier < oldestToCompact) { - return new Window(windowMillis, divPosition - 1); - } else { - return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); - } - } - - /** - * Inclusive lower bound - */ - public long startMillis() { - try { - return LongMath.checkedMultiply(windowMillis, divPosition); - } catch (ArithmeticException ae) { - return Long.MIN_VALUE; - } - } - - /** - * Exclusive upper bound - */ - public long endMillis() { - try { - return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); - } catch (ArithmeticException ae) { - return Long.MAX_VALUE; - } - } - - @Override - public String toString() { - return "[" + startMillis() + ", " + endMillis() + ")"; - } - } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java new file mode 100644 index 0000000..231df6d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.math.LongMath; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Exponential compaction window implementation. 
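+ * <p>
+ * For example, with the default base window of six hours and four windows per tier, the most
+ * recent tier tiles time into six-hour windows; each older tier then coalesces four adjacent
+ * windows of the previous tier into one (24 hours, then 96 hours, and so on), until promoting
+ * further would cross the configured max store file age.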
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class ExponentialCompactionWindowFactory extends CompactionWindowFactory { + + private static final Log LOG = LogFactory.getLog(ExponentialCompactionWindowFactory.class); + + public static final String BASE_WINDOW_MILLIS_KEY = + "hbase.hstore.compaction.date.tiered.base.window.millis"; + public static final String WINDOWS_PER_TIER_KEY = + "hbase.hstore.compaction.date.tiered.windows.per.tier"; + + private final class Window extends CompactionWindow { + /** + * How big a range of timestamps fit inside the window in milliseconds. + */ + private final long windowMillis; + + /** + * A timestamp t is within the window iff t / size == divPosition. + */ + private final long divPosition; + + public Window(long baseWindowMillis, long divPosition) { + this.windowMillis = baseWindowMillis; + this.divPosition = divPosition; + } + + @Override + public int compareToTimestamp(long timestamp) { + if (timestamp < 0) { + try { + timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); + } catch (ArithmeticException ae) { + timestamp = Long.MIN_VALUE; + } + } + long pos = timestamp / windowMillis; + return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; + } + + @Override + public Window nextWindow(long oldestToCompact) { + // Don't promote to the next tier if there is not even 1 window at current tier + // or if the next window crosses the max age. + if (divPosition % windowsPerTier > 0 + || startMillis() - windowMillis * windowsPerTier < oldestToCompact) { + return new Window(windowMillis, divPosition - 1); + } else { + return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); + } + } + + @Override + public long startMillis() { + try { + return LongMath.checkedMultiply(windowMillis, divPosition); + } catch (ArithmeticException ae) { + return Long.MIN_VALUE; + } + } + + @Override + public long endMillis() { + try { + return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); + } catch (ArithmeticException ae) { + return Long.MAX_VALUE; + } + } + } + + private final long baseWindowMillis; + private final int windowsPerTier; + + public ExponentialCompactionWindowFactory(CompactionConfiguration comConf) { + Configuration conf = comConf.conf; + baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); + windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); + LOG.info(this); + } + + @Override + public CompactionWindow newIncomingWindow(long now) { + return new Window(baseWindowMillis, now / baseWindowMillis); + } + + @Override + public String toString() { + return String.format("%s [base window in milliseconds %d, windows per tier %d]", + getClass().getSimpleName(), baseWindowMillis, windowsPerTier); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 5600a4e..e366d08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -101,9 +101,9 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy { } @Override - protected CompactionRequest createCompactionRequest(ArrayList - candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) - throws IOException { + protected 
CompactionRequest createCompactionRequest(Collection candidateFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { if (!tryingMajor) { candidateSelection = filterBulk(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java index 77b0af8..f3ad629 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -10,6 +10,7 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; @@ -65,9 +66,11 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { >= storeConfigInfo.getBlockingFileCount(); candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); - LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + - filesCompacting.size() + " compacting, " + candidateSelection.size() + - " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); + if (LOG.isDebugEnabled()) { + LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + + filesCompacting.size() + " compacting, " + candidateSelection.size() + " eligible, " + + storeConfigInfo.getBlockingFileCount() + " blocking"); + } // If we can't have all files, we cannot do major anyway boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); @@ -84,8 +87,8 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { // Or, if there are any references among the candidates. boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); - CompactionRequest result = createCompactionRequest(candidateSelection, - isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); + CompactionRequest result = createCompactionRequest(candidateFiles, filesCompacting, + candidateSelection, isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); ArrayList filesToCompact = Lists.newArrayList(result.getFiles()); removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor); @@ -98,16 +101,16 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { return result; } - protected abstract CompactionRequest createCompactionRequest(ArrayList - candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) - throws IOException; + protected abstract CompactionRequest createCompactionRequest(Collection candidateFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException; - /* + /** * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. 
*/ public abstract boolean shouldPerformMajorCompaction(final Collection filesToCompact) - throws IOException; + throws IOException; /** * Used calculation jitter @@ -155,7 +158,8 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { public abstract boolean needsCompaction(final Collection storeFiles, final List filesCompacting); - protected ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, + @VisibleForTesting + public ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, final List filesCompacting) { // candidates = all storefiles not already in compaction queue if (!filesCompacting.isEmpty()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java new file mode 100644 index 0000000..3898818 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.Assert; + +public abstract class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy { + + protected ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) + throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + // Has to be > 0 and < now. + timeMachine.setValue(1); + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + + ArrayList ret = Lists.newArrayList(); + for (int i = 0; i < sizes.length; i++) { + MockStoreFile msf = new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, + i); + msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); + ret.add(msf); + } + return ret; + } + + protected void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, + long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { + compactEquals(now, candidates, Collections. 
emptyList(), expectedFileSizes, + expectedBoundaries, isMajor, toCompact); + } + + protected void compactEquals(long now, Collection candidateFiles, + List filesCompacting, long[] expectedFileSizes, long[] expectedBoundaries, + boolean isMajor, boolean toCompact) throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + timeMachine.setValue(now); + DateTieredCompactionRequest request; + DateTieredCompactionPolicy policy = (DateTieredCompactionPolicy) store.storeEngine + .getCompactionPolicy(); + if (isMajor) { + for (StoreFile file : candidateFiles) { + ((MockStoreFile) file).setIsMajor(true); + } + Assert.assertEquals(toCompact, policy.shouldPerformMajorCompaction(candidateFiles)); + request = (DateTieredCompactionRequest) policy + .selectMajorCompaction(Lists.newArrayList(candidateFiles)); + } else { + Assert.assertEquals(toCompact, + policy.needsCompaction(candidateFiles, filesCompacting)); + request = (DateTieredCompactionRequest) policy.selectMinorCompaction(candidateFiles, + filesCompacting, + policy.getCurrentEligibleFiles(Lists.newArrayList(candidateFiles), filesCompacting), false, + false); + } + List actual = Lists.newArrayList(request.getFiles()); + Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); + Assert.assertEquals(Arrays.toString(expectedBoundaries), + Arrays.toString(request.getBoundaries().toArray())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java index ecccbdd..41c17d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java @@ -17,47 +17,19 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) -public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { - ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) - throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - // Has to be > 0 and < now. 
- timeMachine.setValue(1); - ArrayList ageInDisk = new ArrayList(); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - - ArrayList ret = Lists.newArrayList(); - for (int i = 0; i < sizes.length; i++) { - MockStoreFile msf = - new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); - msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); - ret.add(msf); - } - return ret; - } +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompactionPolicy { @Override protected void config() { @@ -68,8 +40,10 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); + conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS, + ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4); conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); // Special settings for compaction policy per window @@ -81,32 +55,6 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); } - void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, - long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - timeMachine.setValue(now); - DateTieredCompactionRequest request; - if (isMajor) { - for (StoreFile file : candidates) { - ((MockStoreFile)file).setIsMajor(true); - } - Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .shouldPerformMajorCompaction(candidates)); - request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine - .getCompactionPolicy()).selectMajorCompaction(candidates); - } else { - Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .needsCompaction(candidates, ImmutableList. 
of())); - request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine - .getCompactionPolicy()).selectMinorCompaction(candidates, false, false); - } - List actual = Lists.newArrayList(request.getFiles()); - Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); - Assert.assertEquals(Arrays.toString(expectedBoundaries), - Arrays.toString(request.getBoundaries().toArray())); - } - /** * Test for incoming window * @throws IOException with error @@ -284,7 +232,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,41, 42, - 33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true); + 33, 30, 31, 2, 1 }, new long[] { 0, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true); } /** @@ -300,26 +248,6 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }, - new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true); - } - - /** - * Major compaction with maximum values - * @throws IOException with error - */ - @Test - public void maxValuesForMajor() throws IOException { - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2); - store.storeEngine.getCompactionPolicy().setConf(conf); - long[] minTimestamps = - new long[] { Long.MIN_VALUE, -100 }; - long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE }; - long[] sizes = new long[] { 0, 1 }; - - compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), - new long[] { 0, 1 }, - new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L, - 9223372036854775806L }, true, true); + new long[] { -155, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java new file mode 100644 index 0000000..ef11e29 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java
new file mode 100644
index 0000000..ef11e29
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyArchive.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicyArchive extends AbstractTestDateTieredCompactionPolicy {
+
+  @Override
+  protected void config() {
+    super.config();
+
+    // Set up policy
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+      "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
+    conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
+    conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
+    conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS,
+      ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class);
+    conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6);
+    conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4);
+    conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false);
+    conf.setBoolean(CompactionConfiguration.ARCHIVE_FILES_OLDER_THAN_MAX_AGE, true);
+    // Special settings for compaction policy per window
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
+    this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
+
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10);
+  }
+
+  @Test
+  public void test() throws IOException {
+    long[] minTimestamps = new long[] { 0, 24, 32, 150 };
+    long[] maxTimestamps = new long[] { 12, 30, 56, 160 };
+    long[] sizes = new long[] { 10, 20, 10, 20 };
+
+    compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 10 },
+      new long[] { 24, 48 }, false, true);
+  }
+
+  @Test
+  public void testOneFileButOverlap() throws IOException {
+    long[] minTimestamps = new long[] { 0, 24, 150 };
+    long[] maxTimestamps = new long[] { 12, 56, 160 };
+    long[] sizes = new long[] { 10, 20, 10 };
+
+    compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20 },
+      new long[] { 24, 48 }, false, true);
+  }
+
+  @Test
+  public void testCompacting() throws IOException {
+    long[] minTimestamps = new long[] { 0, 12, 24, 32, 150 };
+    long[] maxTimestamps = new long[] { 12, 23, 30, 56, 160 };
+    long[] sizes = new long[] { 10, 20, 30, 40, 10 };
+
+    List<StoreFile> candidateFiles = sfCreate(minTimestamps, maxTimestamps, sizes);
+
+    compactEquals(161, candidateFiles, Lists.newArrayList(candidateFiles.subList(0, 2)),
+      new long[] { 30, 40 }, new long[] { 24, 48 }, false, true);
+  }
+}
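The arithmetic these archive tests lean on, as a short sketch (variable names are local
to the sketch; the clock value is the one injected through ManualEnvironmentEdge in the
test harness):

    long now = 161L;            // manual clock used by the tests above
    long maxAge = 100L;         // CompactionConfiguration.MAX_AGE_MILLIS_KEY
    long cutoff = now - maxAge; // = 61; windows ending at or before this point
                                // hold files "older than max age"
    // In test(), the files spanning timestamps 24..56 sit below the cutoff and are
    // selected with output boundaries { 24, 48 }; the newest file (150..160) is
    // left untouched.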
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java
new file mode 100644
index 0000000..18f0b50
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicyOverflow extends AbstractTestDateTieredCompactionPolicy {
+
+  @Override
+  protected void config() {
+    super.config();
+
+    // Set up policy
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+      "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
+    conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100);
+    conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3);
+    conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS,
+      ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class);
+    conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2);
+    conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 2);
+    conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false);
+
+    // Special settings for compaction policy per window
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
+    this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
+
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10);
+  }
+
+  /**
+   * Major compaction with maximum values
+   * @throws IOException with error
+   */
+  @Test
+  public void maxValuesForMajor() throws IOException {
+    long[] minTimestamps = new long[] { Long.MIN_VALUE, -100 };
+    long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE };
+    long[] sizes = new long[] { 0, 1 };
+
+    compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes),
+      new long[] { 0, 1 }, new long[] { Long.MIN_VALUE, -4611686018427387903L, 0,
+        4611686018427387903L, 9223372036854775806L }, true, true);
+  }
+}
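The pom.xml hunks below pin the Joda-Time version and add it to dependencyManagement,
since the new compaction window factory code uses Joda-Time for calendar-aligned
windows. A hedged sketch of the Joda-Time primitives involved (illustrative only; the
factory's actual window math lives in the patch, not here):

    import org.joda.time.DateTimeField;
    import org.joda.time.DateTimeZone;
    import org.joda.time.chrono.ISOChronology;

    ISOChronology chronology = ISOChronology.getInstance(DateTimeZone.UTC);
    DateTimeField day = chronology.dayOfMonth();
    // roundFloor snaps an epoch-millis instant down to the start of its calendar
    // unit, a natural building block for calendar-based compaction windows.
    long windowStart = day.roundFloor(System.currentTimeMillis());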
diff --git a/pom.xml b/pom.xml
index 0324c1c..aaed6eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1233,6 +1233,7 @@
     1.0.8
     2.11.6
     1.46
+    <joda-time.version>2.9.2</joda-time.version>
     2.4
     1.8
@@ -1563,6 +1564,11 @@
         <version>${netty.version}</version>
       </dependency>
+      <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>${joda-time.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libthrift</artifactId>
         <version>${thrift.version}</version>
--
1.9.1