From a453af6c42309a711d6795bc4236b09d49884623 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 15 Apr 2016 20:24:15 +0800 Subject: [PATCH] HBASE-15454 Archive store files older than max age --- hbase-server/pom.xml | 5 +- .../regionserver/DateTieredMultiFileWriter.java | 58 +++- .../hbase/regionserver/DateTieredStoreEngine.java | 15 +- .../regionserver/DateTieredStoreFileManager.java | 60 ++++ .../hbase/regionserver/DefaultStoreEngine.java | 4 +- .../regionserver/DefaultStoreFileManager.java | 16 +- .../apache/hadoop/hbase/regionserver/HStore.java | 5 +- .../hbase/regionserver/StoreFileManager.java | 7 + .../hbase/regionserver/StripeStoreFileManager.java | 6 + .../CalendricalHotColdCompactionWindowFactory.java | 187 +++++++++++ .../compactions/CompactionConfiguration.java | 53 +-- .../regionserver/compactions/CompactionWindow.java | 60 ++++ .../compactions/CompactionWindowFactory.java | 29 ++ .../compactions/DateTieredCompactionPolicy.java | 360 +++++++++++---------- .../compactions/DateTieredCompactionRequest.java | 18 +- .../compactions/DateTieredCompactor.java | 13 +- .../ExponentialCompactionWindowFactory.java | 122 +++++++ .../compactions/FIFOCompactionPolicy.java | 8 +- .../compactions/RatioBasedCompactionPolicy.java | 6 +- .../compactions/SortedCompactionPolicy.java | 40 +-- .../AbstractTestDateTieredCompactionPolicy.java | 97 ++++++ .../TestDateTieredCompactionPolicy.java | 121 ++----- .../TestDateTieredCompactionPolicyOverflow.java | 73 +++++ ...tCalendricalHotColdCompactionWindowFactory.java | 123 +++++++ .../regionserver/compactions/TestCompactor.java | 16 + .../TestDateTieredCompactionPolicyArchive.java | 120 +++++++ .../compactions/TestDateTieredCompactor.java | 17 +- pom.xml | 6 + 28 files changed, 1302 insertions(+), 343 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotColdCompactionWindowFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExponentialCompactionWindowFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCalendricalHotColdCompactionWindowFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactionPolicyArchive.java diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index d5f1e30..a17a9a2 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -535,7 +535,10 @@ io.netty netty-all - + + joda-time + joda-time + org.apache.htrace diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index 2cea92f..6f5c8fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DateTieredStoreFileManager.ARCHIVE_WINDOW_END_TIMESTAMP; +import static org.apache.hadoop.hbase.regionserver.DateTieredStoreFileManager.ARCHIVE_WINDOW_START_TIMESTAMP; + import java.io.IOException; import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -26,6 +31,8 @@ import java.util.TreeMap; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * class for cell sink that separates the provided cells into multiple files for date tiered @@ -37,16 +44,40 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { private final NavigableMap lowerBoundary2Writer = new TreeMap(); + private final IdentityHashMap> writer2ArchiveWindow + = new IdentityHashMap>(); + + private final Long highestBoundary; + + private final long archiveWindowBefore; + private final boolean needEmptyFile; /** + * @param archiveWindowBefore the window before this timestamp are all archive windows. Typical + * minor compaction will not consider window in this area, only archive or major + * compaction can pass in window boundaries before this value and we will append + * {@link DateTieredStoreFileManager#ARCHIVE_WINDOW_START_TIMESTAMP} and + * {@link DateTieredStoreFileManager#ARCHIVE_WINDOW_END_TIMESTAMP} to the store files in + * the archive window. * @param needEmptyFile whether need to create an empty store file if we haven't written out * anything. 
*/ - public DateTieredMultiFileWriter(List lowerBoundaries, boolean needEmptyFile) { - for (Long lowerBoundary : lowerBoundaries) { - lowerBoundary2Writer.put(lowerBoundary, null); + public DateTieredMultiFileWriter(List boundaries, long archiveWindowBefore, + boolean needEmptyFile) { + assert boundaries.size() >= 2; + Iterator iter = boundaries.iterator(); + lowerBoundary2Writer.put(iter.next(), null); + for (;;) { + Long boundary = iter.next(); + if (iter.hasNext()) { + lowerBoundary2Writer.put(boundary, null); + } else { + highestBoundary = boundary.longValue(); + break; + } } + this.archiveWindowBefore = archiveWindowBefore; this.needEmptyFile = needEmptyFile; } @@ -57,6 +88,16 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { if (writer == null) { writer = writerFactory.createWriter(); lowerBoundary2Writer.put(entry.getKey(), writer); + if (entry.getKey().longValue() < archiveWindowBefore) { + // maybe an archive window + Long higherBoundary = lowerBoundary2Writer.higherKey(entry.getKey()); + if (higherBoundary == null) { + higherBoundary = highestBoundary; + } + if (higherBoundary.longValue() <= archiveWindowBefore) { + writer2ArchiveWindow.put(writer, Pair.newPair(entry.getKey(), higherBoundary)); + } + } } writer.append(cell); } @@ -67,6 +108,17 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { } @Override + protected void preCloseWriter(StoreFileWriter writer) throws IOException { + Pair archiveWindow = writer2ArchiveWindow.get(writer); + if (archiveWindow != null) { + writer.appendFileInfo(ARCHIVE_WINDOW_START_TIMESTAMP, + Bytes.toBytes(archiveWindow.getFirst().longValue())); + writer.appendFileInfo(ARCHIVE_WINDOW_END_TIMESTAMP, + Bytes.toBytes(archiveWindow.getSecond().longValue())); + } + } + + @Override protected void preCommitWriters() throws IOException { if (!needEmptyFile) { return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 773baab..5ca6aaa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -41,11 +41,11 @@ import org.apache.hadoop.hbase.security.User; */ @InterfaceAudience.Private public class DateTieredStoreEngine extends StoreEngine { + DateTieredCompactionPolicy, DateTieredCompactor, DateTieredStoreFileManager> { + @Override public boolean needsCompaction(List filesCompacting) { - return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), - filesCompacting); + return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), filesCompacting); } @Override @@ -57,8 +57,8 @@ public class DateTieredStoreEngine extends StoreEngine compact(ThroughputController throughputController, User user) throws IOException { if (request instanceof DateTieredCompactionRequest) { - return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(), - throughputController, user); + DateTieredCompactionRequest dateTieredRequest = (DateTieredCompactionRequest) request; + return compactor.compact(request, dateTieredRequest.getBoundaries(), + dateTieredRequest.getArchiveWindowBefore(), throughputController, user); } else { throw new IllegalArgumentException("DateTieredCompactionRequest is expected. 
Actual: " + request.getClass().getCanonicalName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java new file mode 100644 index 0000000..47fb65bd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreFileManager.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Comparator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * StoreFileManager for date tiered compaction. + */ +@InterfaceAudience.Private +public class DateTieredStoreFileManager extends DefaultStoreFileManager { + + /** + * The file metadata fields that contain the archive window information. + */ + public static final byte[] ARCHIVE_WINDOW_START_TIMESTAMP = Bytes + .toBytes("ARCHIVE_WINDOW_START_TIMESTAMP"); + public static final byte[] ARCHIVE_WINDOW_END_TIMESTAMP = Bytes + .toBytes("ARCHIVE_WINDOW_END_TIMESTAMP"); + + public DateTieredStoreFileManager(CellComparator kvComparator, + Comparator storeFileComparator, Configuration conf, + CompactionConfiguration comConf) { + super(kvComparator, storeFileComparator, conf, comConf); + } + + private static Long getLongMetadataValue(StoreFile file, byte[] key) { + byte[] v = file.getMetadataValue(key); + return v != null ? 
Bytes.toLong(v) : null; + } + + public static Long getArchiveWindowStartTimestamp(StoreFile file) { + return getLongMetadataValue(file, ARCHIVE_WINDOW_START_TIMESTAMP); + } + + public static Long getArchiveWindowEndTimestamp(StoreFile file) { + return getLongMetadataValue(file, ARCHIVE_WINDOW_END_TIMESTAMP); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 1a059d7..cdacbce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -68,8 +68,8 @@ public class DefaultStoreEngine extends StoreEngine< createCompactor(conf, store); createCompactionPolicy(conf, store); createStoreFlusher(conf, store); - storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf()); - + storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFile.Comparators.SEQ_ID, conf, + compactionPolicy.getConf()); } protected void createCompactor(Configuration conf, Store store) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index d38306c..792b2d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -48,7 +49,7 @@ class DefaultStoreFileManager implements StoreFileManager { private final CellComparator kvComparator; private final CompactionConfiguration comConf; private final int blockingFileCount; - + private final Comparator storeFileComparator; /** * List of store files inside this store. This is an immutable list that * is atomically replaced when its contents change. @@ -62,9 +63,11 @@ class DefaultStoreFileManager implements StoreFileManager { */ private volatile List compactedfiles = null; - public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf, + public DefaultStoreFileManager(CellComparator kvComparator, + Comparator storeFileComparator, Configuration conf, CompactionConfiguration comConf) { this.kvComparator = kvComparator; + this.storeFileComparator = storeFileComparator; this.comConf = comConf; this.blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); @@ -210,13 +213,13 @@ class DefaultStoreFileManager implements StoreFileManager { } private void sortAndSetStoreFiles(List storeFiles) { - Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID); + Collections.sort(storeFiles, storeFileComparator); storefiles = ImmutableList.copyOf(storeFiles); } private List sortCompactedfiles(List storefiles) { // Sorting may not be really needed here for the compacted files? 
- Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID); + Collections.sort(storefiles, storeFileComparator); return new ArrayList(storefiles); } @@ -229,5 +232,10 @@ class DefaultStoreFileManager implements StoreFileManager { } return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact); } + + @Override + public Comparator getStoreFileComparator() { + return storeFileComparator; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7468be0..f82f4a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1466,7 +1466,8 @@ public class HStore implements Store { filesToCompact = filesToCompact.subList(count - N, count); isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount()); filesCompacting.addAll(filesToCompact); - Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); + Collections.sort(filesCompacting, + storeEngine.getStoreFileManager().getStoreFileComparator()); } } finally { this.lock.readLock().unlock(); @@ -1655,7 +1656,7 @@ public class HStore implements Store { Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting); } filesCompacting.addAll(filesToAdd); - Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); + Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); } private void removeUnneededFiles() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index 7e70547..13748f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -163,4 +164,10 @@ public interface StoreFileManager { * @see Store#getCompactionPressure() */ double getCompactionPressure(); + + /** + * @return the comparator used to sort storefiles. Usually, the + * {@link StoreFile#getMaxSequenceId()} is the first priority. 
+ */ + Comparator getStoreFileComparator(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index ef2c282..df1ddf2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -1072,4 +1073,9 @@ public class StripeStoreFileManager } return max; } + + @Override + public Comparator getStoreFileComparator() { + return StoreFile.Comparators.SEQ_ID; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotColdCompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotColdCompactionWindowFactory.java new file mode 100644 index 0000000..6a3d51a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CalendricalHotColdCompactionWindowFactory.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.joda.time.DateTimeField; +import org.joda.time.DateTimeFieldType; +import org.joda.time.DateTimeZone; +import org.joda.time.chrono.ISOChronology; + +/** + * One hot window tier, and one cold window tier(the number of window in this tier grow as time + * elapsed). + *

+ * The number of windows in the hot tier can be configured using {@link #HOT_WINDOWS_PER_TIER}. If you + * set N windows in the hot tier and let D be the duration of the hot window unit, then the width of the + * hot tier will be between (N - 1) * D and N * D. So typically you should set the value to at least 2, + * otherwise you will have no hot window right after you cross a hot window unit boundary. Also, + * the width of the hot tier should be less than the max age. + *
+ * The first cold window may not contain a whole cold window unit; it will start from the first + * cold window unit boundary before the hot tier and end at the last hot window's start boundary. + *
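+ *
+ * For illustration only (the values below are arbitrary examples, not the defaults, and {@code conf}
+ * is assumed to be the store's {@code org.apache.hadoop.conf.Configuration}): settings like these
+ * would give a hot tier of up to 7 daily windows followed by monthly cold windows, evaluated in UTC.
+ * <pre>
+ * // hypothetical values; any unit in {Y, M, W, D, H} is accepted
+ * conf.set(WINDOW_TIME_ZONE, "UTC");
+ * conf.set(HOT_WINDOW_UNIT, "D");
+ * conf.setInt(HOT_WINDOWS_PER_TIER, 7);
+ * conf.set(COLD_WINDOW_UNIT, "M");
+ * </pre>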

+ * Notice that, the order we use here is from newer to older, so 'first' means the newest one and + * 'last' means the oldest one. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class CalendricalHotColdCompactionWindowFactory extends CompactionWindowFactory { + + private static final Log LOG = LogFactory.getLog(CalendricalHotColdCompactionWindowFactory.class); + + private static final ImmutableMap UNIT_TO_TYPE = ImmutableMap.of("Y", + DateTimeFieldType.year(), "M", DateTimeFieldType.monthOfYear(), "W", + DateTimeFieldType.weekOfWeekyear(), "D", DateTimeFieldType.dayOfMonth(), "H", + DateTimeFieldType.hourOfDay()); + + public static final String WINDOW_TIME_ZONE = + "hbase.hstore.compaction.date.tiered.calendrical.time.zone"; + public static final String HOT_WINDOW_UNIT = + "hbase.hstore.compaction.date.tiered.calendrical.base.hot.window.unit"; + public static final String HOT_WINDOWS_PER_TIER = + "hbase.hstore.compaction.date.tiered.calendrical.hot.windows.per.tier"; + public static final String COLD_WINDOW_UNIT = + "hbase.hstore.compaction.date.tiered.calendrical.cold.unit"; + + private static abstract class BaseWindow extends CompactionWindow { + + protected final long startMillis; + + protected final long endMillis; + + protected BaseWindow(long startMillis, long endMillis) { + this.startMillis = startMillis; + this.endMillis = endMillis; + } + + @Override + public int compareToTimestamp(long timestamp) { + if (timestamp >= endMillis) { + return -1; + } + if (timestamp < startMillis) { + return 1; + } + return 0; + } + + @Override + public long startMillis() { + return startMillis; + } + + @Override + public long endMillis() { + return endMillis; + } + } + + private final class HotWindow extends BaseWindow { + + private final int pos; + + public HotWindow(int pos, long startMillis, long endMillis) { + super(startMillis, endMillis); + this.pos = pos; + } + + @Override + public CompactionWindow nextWindow(long oldestToCompact) { + int nextPos = pos + 1; + if (nextPos == hotWindowsPerTier) { + long nearestColdWindowBoundary = coldWindowUnit.roundFloor(startMillis); + if (startMillis == nearestColdWindowBoundary) { + return new ColdWindow(coldWindowUnit.add(nearestColdWindowBoundary, -1), + nearestColdWindowBoundary); + } else { + return new ColdWindow(nearestColdWindowBoundary, startMillis); + } + } else { + return new HotWindow(nextPos, hotWindowUnit.add(startMillis, -1), startMillis); + } + } + } + + private final class ColdWindow extends BaseWindow { + + public ColdWindow(long startMillis, long endMillis) { + super(startMillis, endMillis); + } + + @Override + public CompactionWindow nextWindow(long oldestToCompact) { + return new ColdWindow(coldWindowUnit.add(startMillis, -1), startMillis); + } + } + + private final DateTimeZone zone; + + private final DateTimeField hotWindowUnit; + + private final int hotWindowsPerTier; + + private final DateTimeField coldWindowUnit; + + private void sanityCheckFactoryConfig(CompactionConfiguration comConf) { + Preconditions.checkArgument(hotWindowsPerTier > 0, "Negative or zero hot windows per tier: %d", + hotWindowsPerTier); + Preconditions.checkArgument( + hotWindowUnit.getDurationField().getMillis(hotWindowsPerTier) <= comConf + .getMaxStoreFileAgeMillis(), + "hot window tier %d %s(s) is larger than max age", hotWindowsPerTier, hotWindowUnit, + comConf.getMaxStoreFileAgeMillis()); + } + + public CalendricalHotColdCompactionWindowFactory(CompactionConfiguration comConf) { + Configuration conf = comConf.conf; + String 
zoneId = conf.get(WINDOW_TIME_ZONE); + if (StringUtils.isBlank(zoneId)) { + this.zone = DateTimeZone.getDefault(); + } else { + this.zone = DateTimeZone.forID(zoneId); + } + ISOChronology chronology = ISOChronology.getInstance(zone); + this.hotWindowUnit = UNIT_TO_TYPE.get(conf.get(HOT_WINDOW_UNIT, "D")).getField(chronology); + this.hotWindowsPerTier = conf.getInt(HOT_WINDOWS_PER_TIER, 4); + this.coldWindowUnit = UNIT_TO_TYPE.get(conf.get(COLD_WINDOW_UNIT, "M")).getField(chronology); + sanityCheckFactoryConfig(comConf); + LOG.info(this); + } + + @Override + public CompactionWindow newIncomingWindow(long now) { + long startMillis = hotWindowUnit.roundFloor(now); + long endMillis = hotWindowUnit.add(startMillis, 1); + return new HotWindow(0, startMillis, endMillis); + } + + @Override + public String toString() { + return String.format( + "%s [time zone %s, hot window unit %s, hot windows per tier %d," + " cold window unit %s]", + getClass().getSimpleName(), zone, hotWindowUnit, hotWindowsPerTier, coldWindowUnit); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 97cc404..5d85a63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -72,10 +72,6 @@ public class CompactionConfiguration { */ public static final String MAX_AGE_MILLIS_KEY = "hbase.hstore.compaction.date.tiered.max.storefile.age.millis"; - public static final String BASE_WINDOW_MILLIS_KEY = - "hbase.hstore.compaction.date.tiered.base.window.millis"; - public static final String WINDOWS_PER_TIER_KEY = - "hbase.hstore.compaction.date.tiered.windows.per.tier"; public static final String INCOMING_WINDOW_MIN_KEY = "hbase.hstore.compaction.date.tiered.incoming.window.min"; public static final String COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY = @@ -86,6 +82,15 @@ public class CompactionConfiguration { private static final Class DEFAULT_TIER_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class; + public static final String COMPACTION_WINDOW_FACTORY_CLASS = + "hbase.hstore.compaction.date.tiered.window.factory.class"; + + private static final Class DEFAULT_COMPACTION_WINDOW_FACTORY_CLASS = + ExponentialCompactionWindowFactory.class; + + public static final String ARCHIVE_FILES_OLDER_THAN_MAX_AGE = + "hbase.hstore.compaction.date.tiered.archive.files.older.than.max.age"; + Configuration conf; StoreConfigInformation storeConfigInfo; @@ -103,11 +108,11 @@ public class CompactionConfiguration { private final float majorCompactionJitter; private final float minLocalityToForceCompact; private final long maxStoreFileAgeMillis; - private final long baseWindowMillis; - private final int windowsPerTier; private final int incomingWindowMin; private final String compactionPolicyForTieredWindow; private final boolean singleOutputForMinorCompaction; + private final String compactionWindowFactory; + private final boolean archiveFilesOlderThanMaxAge; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -132,14 +137,14 @@ public class CompactionConfiguration { minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); maxStoreFileAgeMillis = conf.getLong(MAX_AGE_MILLIS_KEY, Long.MAX_VALUE); - 
baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); - windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); incomingWindowMin = conf.getInt(INCOMING_WINDOW_MIN_KEY, 6); compactionPolicyForTieredWindow = conf.get(COMPACTION_POLICY_CLASS_FOR_TIERED_WINDOWS_KEY, DEFAULT_TIER_COMPACTION_POLICY_CLASS.getName()); singleOutputForMinorCompaction = conf.getBoolean(SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, true); - + this.compactionWindowFactory = conf.get(COMPACTION_WINDOW_FACTORY_CLASS, + DEFAULT_COMPACTION_WINDOW_FACTORY_CLASS.getName()); + this.archiveFilesOlderThanMaxAge = conf.getBoolean(ARCHIVE_FILES_OLDER_THAN_MAX_AGE, false); LOG.info(this); } @@ -148,8 +153,9 @@ public class CompactionConfiguration { return String.format( "size [%d, %d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;" + " major period %d, major jitter %f, min locality to compact %f;" - + " tiered compaction: max_age %d, base window in milliseconds %d, windows per tier %d," - + "incoming window min %d", + + " tiered compaction: max_age %d, incoming window min %d," + + " compaction policy for tiered window %s, single output for minor %b," + + " compaction window factory %s, archive files older than max age %b", minCompactSize, maxCompactSize, offPeakMaxCompactSize, @@ -162,9 +168,12 @@ public class CompactionConfiguration { majorCompactionJitter, minLocalityToForceCompact, maxStoreFileAgeMillis, - baseWindowMillis, - windowsPerTier, - incomingWindowMin); + incomingWindowMin, + compactionPolicyForTieredWindow, + singleOutputForMinorCompaction, + compactionWindowFactory, + archiveFilesOlderThanMaxAge + ); } /** @@ -265,14 +274,6 @@ public class CompactionConfiguration { return maxStoreFileAgeMillis; } - public long getBaseWindowMillis() { - return baseWindowMillis; - } - - public int getWindowsPerTier() { - return windowsPerTier; - } - public int getIncomingWindowMin() { return incomingWindowMin; } @@ -284,4 +285,12 @@ public class CompactionConfiguration { public boolean useSingleOutputForMinorCompaction() { return singleOutputForMinorCompaction; } + + public String getCompactionWindowFactory() { + return compactionWindowFactory; + } + + public boolean isArchiveFilesOlderThanMaxAge() { + return archiveFilesOlderThanMaxAge; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java new file mode 100644 index 0000000..171afc1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindow.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Base class for compaction window implementation. + *

+ * It is better make sure that the window before {@code oldestToCompact} is fixed and never promoted + * since we will archive the files in each window into a single file. + */ +@InterfaceAudience.Private +public abstract class CompactionWindow { + + /** + * Compares the window to a timestamp. + * @param timestamp the timestamp to compare. + * @return a negative integer, zero, or a positive integer as the window lies before, covering, or + * after than the timestamp. + */ + public abstract int compareToTimestamp(long timestamp); + + /** + * Move to the new window of the same tier or of the next tier, which represents an earlier time + * span. + * @return The next window + */ + public abstract CompactionWindow nextWindow(long oldestToCompact); + + /** + * Inclusive lower bound + */ + public abstract long startMillis(); + + /** + * Exclusive upper bound + */ + public abstract long endMillis(); + + @Override + public String toString() { + return "[" + startMillis() + ", " + endMillis() + ")"; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java new file mode 100644 index 0000000..01b708e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionWindowFactory.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * For creating compaction window. 
+ */ +@InterfaceAudience.Private +public abstract class CompactionWindowFactory { + + public abstract CompactionWindow newIncomingWindow(long now); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index d61af42..f27674c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,25 +18,22 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.PeekingIterator; -import com.google.common.math.LongMath; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.PeekingIterator; +import com.google.common.math.LongMath; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RSRpcServices; @@ -50,21 +47,31 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; /** * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to * Cassandra's for the following benefits: - * 1. Improve date-range-based scan by structuring store files in date-based tiered layout. - * 2. Reduce compaction overhead. - * 3. Improve TTL efficiency. + *

<ol> + * <li>Improve date-range-based scan by structuring store files in date-based tiered layout.</li> + * <li>Reduce compaction overhead.</li> + * <li>Improve TTL efficiency.</li> + * </ol>
* Perfect fit for the use cases that: - * 1. has mostly date-based data write and scan and a focus on the most recent data. - * Out-of-order writes are handled gracefully. Time range overlapping among store files is - * tolerated and the performance impact is minimized. Configuration can be set at hbase-site - * or overridden at per-table or per-column-family level by hbase shell. Design spec is at + * <ol>
+ * <li>has mostly date-based data write and scan and a focus on the most recent data.</li> + * </ol>
+ * Out-of-order writes are handled gracefully. Time range overlapping among store files is tolerated + * and the performance impact is minimized. Configuration can be set at hbase-site or overridden at + * per-table or per-column-family level by hbase shell. Design spec is at * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/ */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class DateTieredCompactionPolicy extends SortedCompactionPolicy { + private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class); - private RatioBasedCompactionPolicy compactionPolicyPerWindow; + private static DateTieredCompactionRequest EMPTY_REQUEST = new DateTieredCompactionRequest( + Collections. emptyList(), Collections. emptyList(), Long.MIN_VALUE); + + private final RatioBasedCompactionPolicy compactionPolicyPerWindow; + + private final CompactionWindowFactory windowFactory; public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) throws IOException { @@ -78,6 +85,14 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { throw new IOException("Unable to load configured compaction policy '" + comConf.getCompactionPolicyForTieredWindow() + "'", e); } + try { + windowFactory = ReflectionUtils.instantiateWithCustomCtor( + comConf.getCompactionWindowFactory(), new Class[] { CompactionConfiguration.class }, + new Object[] { comConf }); + } catch (Exception e) { + throw new IOException("Unable to load configured compaction policy '" + + comConf.getCompactionPolicyForTieredWindow() + "'", e); + } } /** @@ -87,9 +102,11 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { @VisibleForTesting public boolean needsCompaction(final Collection storeFiles, final List filesCompacting) { - ArrayList candidates = new ArrayList(storeFiles); + ArrayList candidates = getCurrentEligibleFiles(new ArrayList(storeFiles), + filesCompacting); try { - return selectMinorCompaction(candidates, false, true) != null; + return !selectMinorCompaction(storeFiles, filesCompacting, candidates, false, true).getFiles() + .isEmpty(); } catch (Exception e) { LOG.error("Can not check for compaction: ", e); return false; @@ -119,7 +136,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { for (StoreFile file: filesToCompact) { Long minTimestamp = file.getMinimumTimestamp(); long oldest = (minTimestamp == null) ? (Long)Long.MIN_VALUE : now - minTimestamp.longValue(); - if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) { + if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) { LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance"); return true; @@ -164,10 +181,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } @Override - protected CompactionRequest createCompactionRequest(ArrayList candidateSelection, - boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + protected CompactionRequest createCompactionRequest(Collection allStoreFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { CompactionRequest result = tryingMajor ? 
selectMajorCompaction(candidateSelection) - : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); + : selectMinorCompaction(allStoreFiles, filesCompacting, candidateSelection, mayUseOffPeak, + mayBeStuck); LOG.debug("Generated compaction request: " + result); return result; } @@ -176,9 +195,121 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { long now = EnvironmentEdgeManager.currentTime(); long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); return new DateTieredCompactionRequest(candidateSelection, - this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now)); + this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now), + oldestToCompact); } + private boolean shouldIncludeInArchive(long startMillis, long endMillis, StoreFile file) { + if (file.getMinimumTimestamp() == null) { + if (file.getMaximumTimestamp() == null) { + return true; + } else { + return file.getMaximumTimestamp().longValue() >= startMillis; + } + } else { + if (file.getMaximumTimestamp() == null) { + return file.getMinimumTimestamp().longValue() < endMillis; + } else { + return file.getMinimumTimestamp().longValue() < endMillis + && file.getMaximumTimestamp().longValue() >= startMillis; + } + } + } + + private boolean canDropWholeFile(long now, long cfTTL, StoreFile file) { + long expiredBefore = now - cfTTL; + return cfTTL != Long.MAX_VALUE && file.getMaximumTimestamp() != null + && file.getMaximumTimestamp().longValue() < expiredBefore; + } + + private boolean fitsInWindow(long startMillis, long endMillis, StoreFile file) { + return file.getMinimumTimestamp() != null && file.getMaximumTimestamp() != null + && file.getMinimumTimestamp().longValue() >= startMillis + && file.getMaximumTimestamp().longValue() < endMillis; + } + + private CompactionRequest tryArchive(Collection allStoreFiles, + List filesCompacting, CompactionWindow newestArchiveWindow, long oldestToCompact, + long now) { + if (!comConf.isArchiveFilesOlderThanMaxAge()) { + // A non-null file list is expected by HStore + return EMPTY_REQUEST; + } + long minTimestamp = Long.MAX_VALUE; + for (StoreFile file : allStoreFiles) { + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp() == null ? Long.MAX_VALUE + : file.getMinimumTimestamp().longValue()); + } + List archiveBoundaries = Lists.newArrayList(); + archiveBoundaries.add(newestArchiveWindow.endMillis()); + for (CompactionWindow window = newestArchiveWindow; window + .compareToTimestamp(minTimestamp) >= 0; window = window.nextWindow(oldestToCompact)) { + archiveBoundaries.add(window.startMillis()); + } + Collections.reverse(archiveBoundaries); + if (archiveBoundaries.size() < 2) { + return EMPTY_REQUEST; + } + List candidates = Lists.newArrayList(allStoreFiles); + for (int i = 0, n = archiveBoundaries.size() - 1; i < n; i++) { + long startMillis = archiveBoundaries.get(i); + long endMillis = archiveBoundaries.get(i + 1); + int first = 0, total = candidates.size(); + for (; first < total; first++) { + if (shouldIncludeInArchive(startMillis, endMillis, candidates.get(first))) { + break; + } + } + if (first == total) { + continue; + } + int last = total - 1; + for (; last > first; last--) { + if (shouldIncludeInArchive(startMillis, endMillis, candidates.get(last))) { + break; + } + } + if (last == first) { + StoreFile file = candidates.get(first); + // If we could drop the whole file due to TTL then we can create a compaction request. + // And also check if the only file fits in the window. 
Otherwise we still need a compaction + // to move the data that does not belong to this window to other windows. + if (!canDropWholeFile(now, storeConfigInfo.getStoreFileTtl(), file) + && fitsInWindow(startMillis, endMillis, file)) { + continue; + } + } + if (!filesCompacting.isEmpty()) { + // check if we are overlapped with filesCompacting. + int firstCompactingIdx = candidates.indexOf(filesCompacting.get(0)); + int lastCompactingIdx = candidates.indexOf(filesCompacting.get(filesCompacting.size() - 1)); + assert firstCompactingIdx >= 0; + assert lastCompactingIdx >= 0; + if (last >= firstCompactingIdx && first <= lastCompactingIdx) { + continue; + } + } + if (last - first + 1 > comConf.getMaxFilesToCompact()) { + LOG.warn("Too many files(got " + (last - first + 1) + ", expected less than or equal to " + + comConf.getMaxFilesToCompact() + ") to compact when archiving [" + startMillis + ", " + + endMillis + "), give up"); + continue; + } + for (int j = first; j <= last; j++) { + StoreFile file = candidates.get(j); + if (file.excludeFromMinorCompaction()) { + LOG.warn("Find bulk load file " + file.getPath() + + " which is configured to be excluded from minor compaction when archiving [" + + startMillis + ", " + endMillis + ") give up"); + continue; + } + } + List filesToCompact = Lists.newArrayList(candidates.subList(first, last + 1)); + return new DateTieredCompactionRequest(filesToCompact, + getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now), oldestToCompact); + } + return EMPTY_REQUEST; + } /** * We receive store files sorted in ascending order by seqId then scan the list of files. If the * current file has a maxTimestamp older than last known maximum, treat this file as it carries @@ -187,38 +318,37 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. */ - public CompactionRequest selectMinorCompaction(ArrayList candidateSelection, + public CompactionRequest selectMinorCompaction(Collection allStoreFiles, + List filesCompacting, ArrayList candidateSelection, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { long now = EnvironmentEdgeManager.currentTime(); long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now); - // Make sure the store files is sorted by SeqId then maxTimestamp - List storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection, - oldestToCompact)); - Collections.sort(storeFileList, StoreFile.Comparators.SEQ_ID_MAX_TIMESTAMP); - List> storefileMaxTimestampPairs = - Lists.newArrayListWithCapacity(Iterables.size(storeFileList)); + Lists.newArrayListWithCapacity(candidateSelection.size()); long maxTimestampSeen = Long.MIN_VALUE; - for (StoreFile storeFile : storeFileList) { + for (StoreFile storeFile : candidateSelection) { // if there is out-of-order data, // we put them in the same window as the last file in increasing order - maxTimestampSeen = Math.max(maxTimestampSeen, - storeFile.getMaximumTimestamp() == null? Long.MIN_VALUE : storeFile.getMaximumTimestamp()); + maxTimestampSeen = Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp() == null + ? 
Long.MIN_VALUE : storeFile.getMaximumTimestamp().longValue()); storefileMaxTimestampPairs.add(new Pair(storeFile, maxTimestampSeen)); } Collections.reverse(storefileMaxTimestampPairs); - Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); + CompactionWindow window = getIncomingWindow(now); int minThreshold = comConf.getIncomingWindowMin(); PeekingIterator> it = Iterators.peekingIterator(storefileMaxTimestampPairs.iterator()); while (it.hasNext()) { + if (window.compareToTimestamp(oldestToCompact) < 0) { + // the whole window lies before oldestToCompact + break; + } int compResult = window.compareToTimestamp(it.peek().getSecond()); if (compResult > 0) { // If the file is too old for the window, switch to the next window - window = window.nextWindow(comConf.getWindowsPerTier(), - oldestToCompact); + window = window.nextWindow(oldestToCompact); minThreshold = comConf.getMinFilesToCompact(); } else { // The file is within the target window @@ -238,13 +368,12 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } } } - // A non-null file list is expected by HStore - return new CompactionRequest(Collections. emptyList()); + return tryArchive(allStoreFiles, filesCompacting, window, oldestToCompact, now); } private DateTieredCompactionRequest generateCompactionRequest(ArrayList storeFiles, - Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) - throws IOException { + CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold) + throws IOException { // The files has to be in ascending order for ratio-based compaction to work right // and removeExcessFile to exclude youngest files. Collections.reverse(storeFiles); @@ -259,74 +388,63 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { boolean singleOutput = storeFiles.size() != storeFileSelection.size() || comConf.useSingleOutputForMinorCompaction(); List boundaries = getCompactionBoundariesForMinor(window, singleOutput); + // minor compaction does not deal with window beyond max age so it is safe to just pass + // Long.MIN_VALUE as archiveWindowBefore. DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection, - boundaries); + boundaries, Long.MIN_VALUE); return result; } return null; } /** - * Return a list of boundaries for multiple compaction output - * in ascending order. + * Return a list of boundaries for multiple compaction output in ascending order. */ private List getCompactBoundariesForMajor(Collection filesToCompact, - long oldestToCompact, long now) { + long oldestToCompact, long now) { long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; for (StoreFile file : filesToCompact) { - minTimestamp = Math.min(minTimestamp, - file.getMinimumTimestamp() == null? Long.MAX_VALUE : file.getMinimumTimestamp()); + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp() == null ? Long.MAX_VALUE + : file.getMinimumTimestamp().longValue()); + maxTimestamp = Math.max(maxTimestamp, file.getMaximumTimestamp() == null ? Long.MIN_VALUE + : file.getMaximumTimestamp().longValue()); } - List boundaries = new ArrayList(); + List boundaries = Lists.newArrayList(); + CompactionWindow window = getIncomingWindow(now); + // find the first window that covers the max timestamp. 
+ while (window.compareToTimestamp(maxTimestamp) > 0) { + window = window.nextWindow(oldestToCompact); + } + boundaries.add(window.endMillis()); - // Add startMillis of all windows between now and min timestamp - for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis()); - window.compareToTimestamp(minTimestamp) > 0; - window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) { + // Add startMillis of all windows between overall max and min timestamp + for (; window.compareToTimestamp(minTimestamp) > 0; window = window + .nextWindow(oldestToCompact)) { boundaries.add(window.startMillis()); } - boundaries.add(Long.MIN_VALUE); + boundaries.add(minTimestamp); Collections.reverse(boundaries); return boundaries; } /** - * @return a list of boundaries for multiple compaction output - * from minTimestamp to maxTimestamp. + * @return a list of boundaries for multiple compaction output from minTimestamp to maxTimestamp. */ - private static List getCompactionBoundariesForMinor(Window window, boolean singleOutput) { + private static List getCompactionBoundariesForMinor(CompactionWindow window, + boolean singleOutput) { List boundaries = new ArrayList(); boundaries.add(Long.MIN_VALUE); if (!singleOutput) { boundaries.add(window.startMillis()); } + boundaries.add(Long.MAX_VALUE); return boundaries; } - /** - * Removes all store files with max timestamp older than (current - maxAge). - * @param storeFiles all store files to consider - * @param maxAge the age in milliseconds when a store file stops participating in compaction. - * @return a list of storeFiles with the store file older than maxAge excluded - */ - private static Iterable filterOldStoreFiles(List storeFiles, - final long cutoff) { - return Iterables.filter(storeFiles, new Predicate() { - @Override - public boolean apply(StoreFile storeFile) { - // Known findbugs issue to guava. SuppressWarning or Nonnull annotation don't work. - if (storeFile == null) { - return false; - } - Long maxTimestamp = storeFile.getMaximumTimestamp(); - return maxTimestamp == null ? true : maxTimestamp >= cutoff; - } - }); - } - - private static Window getIncomingWindow(long now, long baseWindowMillis) { - return new Window(baseWindowMillis, now / baseWindowMillis); + private CompactionWindow getIncomingWindow(long now) { + return windowFactory.newIncomingWindow(now); } private static long getOldestToCompact(long maxAgeMillis, long now) { @@ -338,88 +456,4 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { return Long.MIN_VALUE; } } - - /** - * This is the class we use to partition from epoch time to now into tiers of exponential sizes of - * windows. - */ - private static final class Window { - /** - * How big a range of timestamps fit inside the window in milliseconds. - */ - private final long windowMillis; - - /** - * A timestamp t is within the window iff t / size == divPosition. - */ - private final long divPosition; - - private Window(long baseWindowMillis, long divPosition) { - windowMillis = baseWindowMillis; - this.divPosition = divPosition; - } - - /** - * Compares the window to a timestamp. - * @param timestamp the timestamp to compare. - * @return a negative integer, zero, or a positive integer as the window lies before, covering, - * or after than the timestamp. 
- */ - public int compareToTimestamp(long timestamp) { - if (timestamp < 0) { - try { - timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1); - } catch (ArithmeticException ae) { - timestamp = Long.MIN_VALUE; - } - } - long pos = timestamp / windowMillis; - return divPosition == pos ? 0 : divPosition < pos ? -1 : 1; - } - - /** - * Move to the new window of the same tier or of the next tier, which represents an earlier time - * span. - * @param windowsPerTier The number of contiguous windows that will have the same size. Windows - * following those will be tierBase times as big. - * @return The next window - */ - public Window nextWindow(int windowsPerTier, long oldestToCompact) { - // Don't promote to the next tier if there is not even 1 window at current tier - // or if the next window crosses the max age. - if (divPosition % windowsPerTier > 0 || - startMillis() - windowMillis * windowsPerTier < oldestToCompact) { - return new Window(windowMillis, divPosition - 1); - } else { - return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); - } - } - - /** - * Inclusive lower bound - */ - public long startMillis() { - try { - return LongMath.checkedMultiply(windowMillis, divPosition); - } catch (ArithmeticException ae) { - return Long.MIN_VALUE; - } - } - - /** - * Exclusive upper bound - */ - public long endMillis() { - try { - return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); - } catch (ArithmeticException ae) { - return Long.MAX_VALUE; - } - } - - @Override - public String toString() { - return "[" + startMillis() + ", " + endMillis() + ")"; - } - } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java index b33663f..ddb6091 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java @@ -26,19 +26,29 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS", justification="It is intended to use the same equal method as superclass") public class DateTieredCompactionRequest extends CompactionRequest { - private List boundaries; - public DateTieredCompactionRequest(Collection files, List boundaryList) { + private final List boundaries; + + private final long archiveWindowBefore; + + public DateTieredCompactionRequest(Collection files, List boundaries, + long archiveWindowBefore) { super(files); - boundaries = boundaryList; + this.boundaries = boundaries; + this.archiveWindowBefore = archiveWindowBefore; } public List getBoundaries() { return boundaries; } + public long getArchiveWindowBefore() { + return archiveWindowBefore; + } + @Override public String toString() { - return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()); + return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray()) + + " archiveWindowBefore=" + archiveWindowBefore; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index b1203c5..affd07b 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -50,11 +50,12 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor compact(final CompactionRequest request, final List lowerBoundaries, - ThroughputController throughputController, User user) throws IOException { + public List compact(final CompactionRequest request, final List boundaries, + final long archiveWindowBefore, ThroughputController throughputController, User user) + throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Executing compaction with " + lowerBoundaries.size() - + "windows, lower boundaries: " + lowerBoundaries); + LOG.debug("Executing compaction with " + (boundaries.size() - 1) + "windows, boundaries: " + + boundaries); } return compact(request, defaultScannerFactory, @@ -63,8 +64,8 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor 0 + || startMillis() - windowMillis * windowsPerTier < oldestToCompact) { + return new Window(windowMillis, divPosition - 1); + } else { + return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); + } + } + + @Override + public long startMillis() { + try { + return LongMath.checkedMultiply(windowMillis, divPosition); + } catch (ArithmeticException ae) { + return Long.MIN_VALUE; + } + } + + @Override + public long endMillis() { + try { + return LongMath.checkedMultiply(windowMillis, (divPosition + 1)); + } catch (ArithmeticException ae) { + return Long.MAX_VALUE; + } + } + } + + private final long baseWindowMillis; + private final int windowsPerTier; + + public ExponentialCompactionWindowFactory(CompactionConfiguration comConf) { + Configuration conf = comConf.conf; + baseWindowMillis = conf.getLong(BASE_WINDOW_MILLIS_KEY, 3600000 * 6); + windowsPerTier = conf.getInt(WINDOWS_PER_TIER_KEY, 4); + LOG.info(this); + } + + @Override + public CompactionWindow newIncomingWindow(long now) { + return new Window(baseWindowMillis, now / baseWindowMillis); + } + + @Override + public String toString() { + return String.format("%s [base window in milliseconds %d, windows per tier %d]", + getClass().getSimpleName(), baseWindowMillis, windowsPerTier); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java index d339898..293afcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java @@ -55,22 +55,22 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy { } @Override - public CompactionRequest selectCompaction(Collection candidateFiles, + public CompactionRequest selectCompaction(Collection allStoreFiles, List filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { if(forceMajor){ LOG.warn("Major compaction is not supported for FIFO compaction policy. 
Ignore the flag."); } - boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles); + boolean isAfterSplit = StoreUtils.hasReferences(allStoreFiles); if(isAfterSplit){ LOG.info("Split detected, delegate selection to the parent policy."); - return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction, + return super.selectCompaction(allStoreFiles, filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor); } // Nothing to compact - Collection toCompact = getExpiredStores(candidateFiles, filesCompacting); + Collection toCompact = getExpiredStores(allStoreFiles, filesCompacting); CompactionRequest result = new CompactionRequest(toCompact); return result; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index 5600a4e..d29a047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -101,9 +101,9 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy { } @Override - protected CompactionRequest createCompactionRequest(ArrayList - candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) - throws IOException { + protected CompactionRequest createCompactionRequest(Collection allStoreFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { if (!tryingMajor) { candidateSelection = filterBulk(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java index 77b0af8..61be370 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -10,6 +10,7 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; @@ -47,33 +48,35 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { } /** - * @param candidateFiles candidate files, ordered from oldest to newest by seqId. We rely on + * @param allStoreFiles candidate files, ordered from oldest to newest by seqId. We rely on * DefaultStoreFileManager to sort the files by seqId to guarantee contiguous compaction based * on seqId for data consistency. 
* @return subset copy of candidate list that meets compaction criteria */ - public CompactionRequest selectCompaction(Collection candidateFiles, + public CompactionRequest selectCompaction(Collection allStoreFiles, final List filesCompacting, final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { // Preliminary compaction subject to filters - ArrayList candidateSelection = new ArrayList(candidateFiles); + ArrayList candidateSelection = new ArrayList(allStoreFiles); // Stuck and not compacting enough (estimate). It is not guaranteed that we will be // able to compact more if stuck and compacting, because ratio policy excludes some // non-compacting files from consideration during compaction (see getCurrentEligibleFiles). int futureFiles = filesCompacting.isEmpty() ? 0 : 1; - boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles) + boolean mayBeStuck = (allStoreFiles.size() - filesCompacting.size() + futureFiles) >= storeConfigInfo.getBlockingFileCount(); candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); - LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + - filesCompacting.size() + " compacting, " + candidateSelection.size() + - " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); + if (LOG.isDebugEnabled()) { + LOG.debug("Selecting compaction from " + allStoreFiles.size() + " store files, " + + filesCompacting.size() + " compacting, " + candidateSelection.size() + " eligible, " + + storeConfigInfo.getBlockingFileCount() + " blocking"); + } // If we can't have all files, we cannot do major anyway - boolean isAllFiles = candidateFiles.size() == candidateSelection.size(); + boolean isAllFiles = allStoreFiles.size() == candidateSelection.size(); if (!(forceMajor && isAllFiles)) { candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak); - isAllFiles = candidateFiles.size() == candidateSelection.size(); + isAllFiles = allStoreFiles.size() == candidateSelection.size(); } // Try a major compaction if this is a user-requested major compaction, @@ -84,30 +87,30 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { // Or, if there are any references among the candidates. 
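Editor's note on the "stuck" estimate computed above: a quick numeric illustration with made-up values, showing why a store with many files left over after the running compaction is treated as possibly stuck.

final class StuckEstimateSketch {
  public static void main(String[] args) {
    // Illustrative numbers only.
    int allStoreFiles = 25;       // files in the store
    int filesCompacting = 3;      // files already queued for compaction
    int blockingFileCount = 20;   // hbase.hstore.blockingStoreFiles
    // At most one output file is expected from the compaction already running.
    int futureFiles = filesCompacting == 0 ? 0 : 1;
    boolean mayBeStuck = (allStoreFiles - filesCompacting + futureFiles) >= blockingFileCount;
    // 25 - 3 + 1 = 23 >= 20, so mayBeStuck is true and the policy may relax its selection limits.
    System.out.println("mayBeStuck = " + mayBeStuck);
  }
}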
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); - CompactionRequest result = createCompactionRequest(candidateSelection, - isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); + CompactionRequest result = createCompactionRequest(allStoreFiles, filesCompacting, + candidateSelection, isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); ArrayList filesToCompact = Lists.newArrayList(result.getFiles()); removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor); result.updateFiles(filesToCompact); - isAllFiles = (candidateFiles.size() == filesToCompact.size()); + isAllFiles = (allStoreFiles.size() == filesToCompact.size()); result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak); result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles); return result; } - protected abstract CompactionRequest createCompactionRequest(ArrayList - candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) - throws IOException; + protected abstract CompactionRequest createCompactionRequest(Collection allStoreFiles, + List filesCompacting, ArrayList candidateSelection, boolean tryingMajor, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException; - /* + /** * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. */ public abstract boolean shouldPerformMajorCompaction(final Collection filesToCompact) - throws IOException; + throws IOException; /** * Used calculation jitter @@ -155,7 +158,8 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy { public abstract boolean needsCompaction(final Collection storeFiles, final List filesCompacting); - protected ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, + @VisibleForTesting + public ArrayList getCurrentEligibleFiles(ArrayList candidateFiles, final List filesCompacting) { // candidates = all storefiles not already in compaction queue if (!filesCompacting.isEmpty()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java new file mode 100644 index 0000000..b8a661c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.Assert; + +public abstract class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy { + + protected ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) + throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + // Has to be > 0 and < now. + timeMachine.setValue(1); + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + + ArrayList ret = Lists.newArrayList(); + for (int i = 0; i < sizes.length; i++) { + MockStoreFile msf = new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, + i); + msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); + ret.add(msf); + } + return ret; + } + + protected void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, + long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { + compactEquals(now, candidates, Collections. emptyList(), expectedFileSizes, + expectedBoundaries, isMajor, toCompact); + } + + protected void compactEquals(long now, Collection candidateFiles, + List filesCompacting, long[] expectedFileSizes, long[] expectedBoundaries, + boolean isMajor, boolean toCompact) throws IOException { + compactEquals(now, candidateFiles, filesCompacting, expectedFileSizes, expectedBoundaries, + isMajor, toCompact, (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()); + } + + protected void compactEquals(long now, Collection candidateFiles, + List filesCompacting, long[] expectedFileSizes, long[] expectedBoundaries, + boolean isMajor, boolean toCompact, DateTieredCompactionPolicy policy) throws IOException { + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + timeMachine.setValue(now); + DateTieredCompactionRequest request; + if (isMajor) { + for (StoreFile file : candidateFiles) { + ((MockStoreFile) file).setIsMajor(true); + } + Assert.assertEquals(toCompact, policy.shouldPerformMajorCompaction(candidateFiles)); + request = (DateTieredCompactionRequest) policy + .selectMajorCompaction(Lists.newArrayList(candidateFiles)); + } else { + Assert.assertEquals(toCompact, policy.needsCompaction(candidateFiles, filesCompacting)); + request = (DateTieredCompactionRequest) policy.selectMinorCompaction(candidateFiles, + filesCompacting, + policy.getCurrentEligibleFiles(Lists.newArrayList(candidateFiles), filesCompacting), false, + false); + } + List actual = Lists.newArrayList(request.getFiles()); + Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); + Assert.assertEquals(Arrays.toString(expectedBoundaries), + Arrays.toString(request.getBoundaries().toArray())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java index ecccbdd..c918634 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java @@ -17,47 +17,19 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) -public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { - ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) - throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - // Has to be > 0 and < now. 
- timeMachine.setValue(1); - ArrayList ageInDisk = new ArrayList(); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - - ArrayList ret = Lists.newArrayList(); - for (int i = 0; i < sizes.length; i++) { - MockStoreFile msf = - new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); - msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); - ret.add(msf); - } - return ret; - } +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicy extends AbstractTestDateTieredCompactionPolicy { @Override protected void config() { @@ -68,8 +40,10 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); + conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS, + ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4); conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); // Special settings for compaction policy per window @@ -81,32 +55,6 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); } - void compactEquals(long now, ArrayList candidates, long[] expectedFileSizes, - long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException { - ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); - EnvironmentEdgeManager.injectEdge(timeMachine); - timeMachine.setValue(now); - DateTieredCompactionRequest request; - if (isMajor) { - for (StoreFile file : candidates) { - ((MockStoreFile)file).setIsMajor(true); - } - Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .shouldPerformMajorCompaction(candidates)); - request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine - .getCompactionPolicy()).selectMajorCompaction(candidates); - } else { - Assert.assertEquals(toCompact, ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .needsCompaction(candidates, ImmutableList. 
of())); - request = (DateTieredCompactionRequest) ((DateTieredCompactionPolicy) store.storeEngine - .getCompactionPolicy()).selectMinorCompaction(candidates, false, false); - } - List actual = Lists.newArrayList(request.getFiles()); - Assert.assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual))); - Assert.assertEquals(Arrays.toString(expectedBoundaries), - Arrays.toString(request.getBoundaries().toArray())); - } - /** * Test for incoming window * @throws IOException with error @@ -118,7 +66,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, - new long[] { Long.MIN_VALUE, 12 }, false, true); + new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE }, false, true); } /** @@ -132,7 +80,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22, 23, - 24, 25 }, new long[] { Long.MIN_VALUE, 6}, false, true); + 24, 25 }, new long[] { Long.MIN_VALUE, 6, Long.MAX_VALUE}, false, true); } /** @@ -146,7 +94,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, - new long[] { Long.MIN_VALUE, 12 }, false, true); + new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE }, false, true); } /** @@ -160,7 +108,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 10, 11, 12, 13 }, - new long[] { Long.MIN_VALUE, 12}, false, true); + new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE}, false, true); } /** @@ -174,7 +122,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 }; compactEquals(194, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 22, 23 }, - new long[] { Long.MIN_VALUE, 96}, false, true); + new long[] { Long.MIN_VALUE, 96, Long.MAX_VALUE}, false, true); } @Test @@ -184,7 +132,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31, 32 }, - new long[] { Long.MIN_VALUE, 120 }, false, true); + new long[] { Long.MIN_VALUE, 120, Long.MAX_VALUE }, false, true); } /** @@ -198,7 +146,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 21, 22 }, - new long[] { Long.MIN_VALUE }, false, true); + new long[] { Long.MIN_VALUE, Long.MAX_VALUE }, false, true); } /** @@ -212,7 +160,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 30, 31 }, - new long[] { Long.MIN_VALUE }, false, 
true); + new long[] { Long.MIN_VALUE, Long.MAX_VALUE }, false, true); } /** @@ -226,7 +174,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 23, 24 }, - new long[] { Long.MIN_VALUE }, false, true); + new long[] { Long.MIN_VALUE, Long.MAX_VALUE }, false, true); } /** @@ -240,7 +188,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 40, 41, 42, 33, - 30, 31 }, new long[] { Long.MIN_VALUE, 96 }, false, true); + 30, 31 }, new long[] { Long.MIN_VALUE, 96, Long.MAX_VALUE }, false, true); } /** @@ -254,7 +202,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 }; compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34, - 22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12 }, false, true); + 22, 28, 23, 24, 1 }, new long[] { Long.MIN_VALUE, 12, Long.MAX_VALUE }, false, true); } /** @@ -270,7 +218,7 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { compactEquals(1, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 31, 32, 33, 34, 22, 25, 23, 24, 1 }, - new long[] { Long.MIN_VALUE, -24 }, false, true); + new long[] { Long.MIN_VALUE, -24, Long.MAX_VALUE }, false, true); } /** @@ -283,8 +231,9 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40,41, 42, - 33, 30, 31, 2, 1 }, new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true); + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), + new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }, + new long[] { 0, 24, 48, 72, 96, 120, 144, 150, 156, 162 }, true, true); } /** @@ -300,26 +249,6 @@ public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }, - new long[] { Long.MIN_VALUE, -144, -120, -96, -72, -48, -24, 0, 6, 12 }, true, true); - } - - /** - * Major compaction with maximum values - * @throws IOException with error - */ - @Test - public void maxValuesForMajor() throws IOException { - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 2); - store.storeEngine.getCompactionPolicy().setConf(conf); - long[] minTimestamps = - new long[] { Long.MIN_VALUE, -100 }; - long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE }; - long[] sizes = new long[] { 0, 1 }; - - compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), - new long[] { 0, 1 }, - new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, 4611686018427387903L, - 9223372036854775806L }, true, true); + new long[] { -155, -144, -120, -96, -72, -48, -24, 0, 6, 12, 18 }, true, true); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java new file mode 100644 index 0000000..748cb42 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyOverflow.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicyOverflow extends AbstractTestDateTieredCompactionPolicy { + + @Override + protected void config() { + super.config(); + + // Set up policy + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); + conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS, + ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, Long.MAX_VALUE / 2); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 2); + conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); + + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); + } + + /** + * Major compaction with maximum values + * @throws IOException with error + */ + @Test + public void maxValuesForMajor() throws IOException { + long[] minTimestamps = new long[] { Long.MIN_VALUE, -100 }; + long[] maxTimestamps = new long[] { -8, Long.MAX_VALUE }; + long[] sizes = new long[] { 0, 1 }; + + compactEquals(Long.MAX_VALUE, sfCreate(minTimestamps, maxTimestamps, sizes), + new long[] { 0, 1 }, new long[] { Long.MIN_VALUE, -4611686018427387903L, 0, + 4611686018427387903L, 9223372036854775806L, Long.MAX_VALUE 
}, + true, true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCalendricalHotColdCompactionWindowFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCalendricalHotColdCompactionWindowFactory.java new file mode 100644 index 0000000..7e86cb9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCalendricalHotColdCompactionWindowFactory.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.joda.time.DateTimeField; +import org.joda.time.DateTimeFieldType; +import org.joda.time.chrono.ISOChronology; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestCalendricalHotColdCompactionWindowFactory { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final class FakeStoreConfigInformation implements StoreConfigInformation { + + private long memstoreFlushSize; + + private int compactionCheckMultiplier; + + private long blockingFileCount; + + public FakeStoreConfigInformation(Configuration conf) { + this.memstoreFlushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); + this.blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, + HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); + this.compactionCheckMultiplier = conf.getInt(HStore.COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, + HStore.DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); + } + + @Override + public long getMemstoreFlushSize() { + return memstoreFlushSize; + } + + @Override + public long getStoreFileTtl() { + return Long.MAX_VALUE; + } + + @Override + public long getCompactionCheckMultiplier() { + return compactionCheckMultiplier; + } + + @Override + public long getBlockingFileCount() { + return blockingFileCount; + } + } + + private void assertWindow(long startMillis, long endMillis, CompactionWindow window) { + assertEquals(startMillis, window.startMillis()); + assertEquals(endMillis, window.endMillis()); + } + + @Test + public void 
test() { + UTIL.getConfiguration().set(CalendricalHotColdCompactionWindowFactory.HOT_WINDOW_UNIT, "D"); + UTIL.getConfiguration().setInt(CalendricalHotColdCompactionWindowFactory.HOT_WINDOWS_PER_TIER, + 3); + UTIL.getConfiguration().set(CalendricalHotColdCompactionWindowFactory.COLD_WINDOW_UNIT, "Y"); + UTIL.getConfiguration().setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, + TimeUnit.DAYS.toMillis(7)); + UTIL.getConfiguration().setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS, + CalendricalHotColdCompactionWindowFactory.class, CompactionWindowFactory.class); + CompactionConfiguration comConf = new CompactionConfiguration(UTIL.getConfiguration(), + new FakeStoreConfigInformation(UTIL.getConfiguration())); + CalendricalHotColdCompactionWindowFactory factory = new CalendricalHotColdCompactionWindowFactory( + comConf); + DateTimeField year = DateTimeFieldType.year().getField(ISOChronology.getInstance()); + DateTimeField day = DateTimeFieldType.dayOfMonth().getField(ISOChronology.getInstance()); + long startOfYear = year.roundFloor(System.currentTimeMillis()); + long windowStartMillis = day.add(startOfYear, 4); + long windowEndMillis = day.add(windowStartMillis, 1); + long now = windowStartMillis + (windowEndMillis - windowStartMillis) / 2; + CompactionWindow window = factory.newIncomingWindow(now); + long oldestToCompact = now - comConf.getMaxStoreFileAgeMillis(); + for (int i = 0; i < 3; i++) { + assertWindow(windowStartMillis, windowEndMillis, window); + window = window.nextWindow(oldestToCompact); + windowEndMillis = windowStartMillis; + windowStartMillis = day.add(windowStartMillis, -1); + } + windowStartMillis = startOfYear; + assertWindow(windowStartMillis, windowEndMillis, window); + for (int i = 0; i < 10; i++) { + windowEndMillis = windowStartMillis; + windowStartMillis = year.add(windowStartMillis, -1); + window = window.nextWindow(oldestToCompact); + assertWindow(windowStartMillis, windowEndMillis, window); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java index 7707116..e35ca9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import static org.apache.hadoop.hbase.regionserver.DateTieredStoreFileManager.*; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY; import static org.junit.Assert.assertArrayEquals; @@ -183,6 +184,21 @@ public class TestCompactor { } } + public void verifyBoundaries(List boundaries, long archiveWindowBefore) { + assertEquals(boundaries.size() - 1, writers.size()); + for (int i = 0; i < writers.size(); ++i) { + if (boundaries.get(i + 1) <= archiveWindowBefore) { + assertEquals("i = " + i, boundaries.get(i).longValue(), + Bytes.toLong(writers.get(i).data.get(ARCHIVE_WINDOW_START_TIMESTAMP))); + assertEquals("i = " + i, boundaries.get(i + 1).longValue(), + Bytes.toLong(writers.get(i).data.get(ARCHIVE_WINDOW_END_TIMESTAMP))); + } else { + assertNull(writers.get(i).data.get(ARCHIVE_WINDOW_START_TIMESTAMP)); + assertNull(writers.get(i).data.get(ARCHIVE_WINDOW_END_TIMESTAMP)); + } + } + } + public List getWriters() { 
return writers; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactionPolicyArchive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactionPolicyArchive.java new file mode 100644 index 0000000..c93cd31 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactionPolicyArchive.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.AbstractTestDateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import static org.mockito.Mockito.*; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactionPolicyArchive extends AbstractTestDateTieredCompactionPolicy { + + @Override + protected void config() { + super.config(); + + // Set up policy + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); + conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); + conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS, + ExponentialCompactionWindowFactory.class, CompactionWindowFactory.class); + conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4); + conf.setBoolean(CompactionConfiguration.SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY, false); + conf.setBoolean(CompactionConfiguration.ARCHIVE_FILES_OLDER_THAN_MAX_AGE, true); + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + 
this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 10); + } + + @Test + public void test() throws IOException { + long[] minTimestamps = new long[] { 0, 24, 32, 150 }; + long[] maxTimestamps = new long[] { 12, 30, 56, 160 }; + long[] sizes = new long[] { 10, 20, 10, 20 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20, 10 }, + new long[] { 24, 48, 72 }, false, true); + } + + @Test + public void testOneFileButOverlap() throws IOException { + long[] minTimestamps = new long[] { 0, 24, 150 }; + long[] maxTimestamps = new long[] { 12, 56, 160 }; + long[] sizes = new long[] { 10, 20, 10 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), new long[] { 20 }, + new long[] { 24, 48, 72 }, false, true); + } + + @Test + public void testCompacting() throws IOException { + long[] minTimestamps = new long[] { 0, 12, 24, 32, 150 }; + long[] maxTimestamps = new long[] { 12, 23, 30, 56, 160 }; + long[] sizes = new long[] { 10, 20, 30, 40, 10 }; + + List candidateFiles = sfCreate(minTimestamps, maxTimestamps, sizes); + + compactEquals(161, candidateFiles, Lists.newArrayList(candidateFiles.subList(0, 2)), + new long[] { 30, 40 }, new long[] { 24, 48, 72 }, false, true); + } + + @Test + public void testTTL() throws IOException { + long[] minTimestamps = new long[] { 0 }; + long[] maxTimestamps = new long[] { 12 }; + long[] sizes = new long[] { 10 }; + + List candidateFiles = sfCreate(minTimestamps, maxTimestamps, sizes); + + HStore mockedStore = spy(store); + when(mockedStore.getStoreFileTtl()).thenReturn(120L); + DateTieredCompactionPolicy policy = (DateTieredCompactionPolicy) store.getStoreEngine() + .getCompactionPolicy(); + policy.storeConfigInfo = mockedStore; + try { + compactEquals(161, candidateFiles, Collections. 
emptyList(), new long[] { 10 }, + new long[] { 0, 24 }, false, true, policy); + } finally { + policy.storeConfigInfo = store; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java index 38d9f99..4b630fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java @@ -122,15 +122,16 @@ public class TestDateTieredCompactor { }; } - private void verify(KeyValue[] input, List boundaries, KeyValue[][] output, - boolean allFiles) throws Exception { + private void verify(KeyValue[] input, List boundaries, long archiveWindowBefore, + KeyValue[][] output, boolean allFiles) throws Exception { StoreFileWritersCapture writers = new StoreFileWritersCapture(); StoreFile sf1 = createDummyStoreFile(1L); StoreFile sf2 = createDummyStoreFile(2L); DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2)); - List paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)), - boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null); + List paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)), boundaries, + archiveWindowBefore, NoLimitThroughputController.INSTANCE, null); writers.verifyKvs(output, allFiles, boundaries); + writers.verifyBoundaries(boundaries, archiveWindowBefore); if (allFiles) { assertEquals(output.length, paths.size()); } @@ -143,11 +144,11 @@ public class TestDateTieredCompactor { @Test public void test() throws Exception { - verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L), + verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L), 300L, a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true); verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE), - a(a(KV_A), a(KV_B, KV_C, KV_D)), false); - verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), + Long.MIN_VALUE, a(a(KV_A), a(KV_B, KV_C, KV_D)), false); + verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), Long.MIN_VALUE, new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false); } @@ -158,7 +159,7 @@ public class TestDateTieredCompactor { DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0], new ArrayList(request.getFiles())); List paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), - NoLimitThroughputController.INSTANCE, null); + Long.MIN_VALUE, NoLimitThroughputController.INSTANCE, null); assertEquals(1, paths.size()); List dummyWriters = writers.getWriters(); assertEquals(1, dummyWriters.size()); diff --git a/pom.xml b/pom.xml index 0324c1c..aaed6eb 100644 --- a/pom.xml +++ b/pom.xml @@ -1233,6 +1233,7 @@ 1.0.8 2.11.6 1.46 + 2.9.2 2.4 1.8 @@ -1563,6 +1564,11 @@ ${netty.version}
+ joda-time + joda-time + ${joda-time.version} + + org.apache.thrift libthrift ${thrift.version} -- 1.9.1
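Editor's closing usage note: the sketch below shows one way a deployment might wire up the knobs this patch introduces, using the constant names exercised by the tests above. The specific values (daily hot windows for 7 days, yearly cold windows, archiving enabled) are illustrative, and whether these keys are set cluster-wide or per table/column family is a deployment choice not prescribed by the patch.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.CalendricalHotColdCompactionWindowFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionWindowFactory;

final class DateTieredArchiveConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Use the date-tiered store engine.
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
        "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
    // Hot/cold calendar windows: daily windows while data is hot, yearly windows once cold.
    conf.setClass(CompactionConfiguration.COMPACTION_WINDOW_FACTORY_CLASS,
        CalendricalHotColdCompactionWindowFactory.class, CompactionWindowFactory.class);
    conf.set(CalendricalHotColdCompactionWindowFactory.HOT_WINDOW_UNIT, "D");
    conf.setInt(CalendricalHotColdCompactionWindowFactory.HOT_WINDOWS_PER_TIER, 7);
    conf.set(CalendricalHotColdCompactionWindowFactory.COLD_WINDOW_UNIT, "Y");
    conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, TimeUnit.DAYS.toMillis(7));
    // Archive, rather than keep rewriting, store files older than the max age.
    conf.setBoolean(CompactionConfiguration.ARCHIVE_FILES_OLDER_THAN_MAX_AGE, true);
  }
}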