From b93a312a58791ec6ec45db054ce9c6c66a7aaa31 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 4 Mar 2016 09:22:54 +0800 Subject: [PATCH] HBASE-15389 Write out multiple files when compaction --- .../regionserver/AbstractMultiFileWriter.java | 128 +++++++++++++ .../regionserver/DateTieredMultiFileWriter.java | 82 ++++++++ .../hbase/regionserver/DateTieredStoreEngine.java | 88 +++++++++ .../hadoop/hbase/regionserver/StoreFile.java | 8 + .../hbase/regionserver/StripeMultiFileWriter.java | 135 ++++++------- .../hbase/regionserver/StripeStoreFlusher.java | 2 +- .../compactions/DateTieredCompactionPolicy.java | 60 +++++- .../compactions/DateTieredComparator.java | 145 ++++++++++++++ .../regionserver/TestDateTieredCompaction.java | 199 +++---------------- .../TestDateTieredCompactionPolicy.java | 211 +++++++++++++++++++++ 10 files changed, 809 insertions(+), 249 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredComparator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java new file mode 100644 index 0000000..9d4bdf9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +/** + * Base class for cell sink that separates the provided cells into multiple files. 
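+ * Subclasses supply the boundary-to-writer mapping through boundaryAndWriters() and writers(),
+ * and may attach per-file metadata in writeExtraMetadata(); commitWriters() then writes the
+ * metadata, closes every non-null writer and returns the paths of the written files.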
+ */ +@InterfaceAudience.Private +public abstract class AbstractMultiFileWriter implements CellSink { + + private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); + + /** Factory that is used to produce single StoreFile.Writer-s */ + protected WriterFactory writerFactory; + protected CellComparator comparator; + + /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ + protected StoreScanner sourceScanner; + + /** Whether to write stripe metadata */ + private boolean doWriteExtraMetadata = true; + + public interface WriterFactory { + public StoreFile.Writer createWriter() throws IOException; + } + + /** + * Initializes multi-writer before usage. + * @param sourceScanner Optional store scanner to obtain the information about read progress. + * @param factory Factory used to produce individual file writers. + * @param comparator Comparator used to compare rows. + */ + public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator) + throws IOException { + this.writerFactory = factory; + this.sourceScanner = sourceScanner; + this.comparator = comparator; + } + + public void setNoExtraMetadata() { + this.doWriteExtraMetadata = false; + } + + public List commitWriters(long maxSeqId, boolean isMajor) throws IOException { + commitWritersInternal(); + LOG.debug((this.doWriteExtraMetadata ? "W" : "Not w") + "riting out metadata for " + + this.writers().size() + " writers"); + List paths = new ArrayList(); + for (PeekingIterator> iter = Iterators + .peekingIterator(boundaryAndWriters().iterator());;) { + Map.Entry entry = iter.next(); + if (!iter.hasNext()) { + // this is the last boundary, just break + break; + } + StoreFile.Writer writer = entry.getValue(); + if (writer == null) { + continue; + } + if (doWriteExtraMetadata) { + writeExtraMetadata(writer, entry.getKey(), iter.peek().getKey()); + } + writer.appendMetadata(maxSeqId, isMajor); + paths.add(writer.getPath()); + writer.close(); + } + return paths; + } + + public List abortWriters() { + List paths = new ArrayList(); + for (StoreFile.Writer writer : writers()) { + try { + if (writer != null) { + paths.add(writer.getPath()); + writer.close(); + } + } catch (Exception ex) { + LOG.error("Failed to close the writer after an unfinished compaction.", ex); + } + } + return paths; + } + + protected abstract Iterable> boundaryAndWriters(); + + protected abstract Collection writers(); + + /** + * Subclasses override this method to be called at the end of a successful sequence of append; all + * appends are processed before this method is called. + */ + protected abstract void commitWritersInternal() throws IOException; + + protected abstract void writeExtraMetadata(StoreFile.Writer writer, K lowerBoundary, + K higherBoundary) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java new file mode 100644 index 0000000..38d81f6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+
+/**
+ * Class for cell sink that separates the provided cells into multiple files for date tiered
+ * compaction.
+ */
+@InterfaceAudience.Private
+public class DateTieredMultiFileWriter extends AbstractMultiFileWriter<Long> {
+  /**
+   * There is one entry per lower boundary, so boundaries.size() == writers.size() + 1; use
+   * {@link NavigableMap#floorEntry(Object)} to find the writer for a cell. The value mapped to
+   * the last (highest) boundary is always null, it only marks the upper end of the range.
+   */
+  protected NavigableMap<Long, StoreFile.Writer> boundary2Writer;
+
+  public DateTieredMultiFileWriter(List<Long> boundaries) {
+    // create the map before populating it, one (initially empty) slot per boundary
+    boundary2Writer = new TreeMap<Long, StoreFile.Writer>();
+    for (Long boundary : boundaries) {
+      boundary2Writer.put(boundary, null);
+    }
+  }
+
+  @Override
+  public void append(Cell cell) throws IOException {
+    Map.Entry<Long, StoreFile.Writer> entry = boundary2Writer.floorEntry(cell.getTimestamp());
+    if (entry == null || entry.getKey().equals(boundary2Writer.lastKey())) {
+      throw new IndexOutOfBoundsException(cell.getTimestamp()
+          + " is out of range, should be in range [" + boundary2Writer.firstKey() + ", "
+          + boundary2Writer.lastKey() + ")");
+    }
+    StoreFile.Writer writer = entry.getValue();
+    if (writer == null) {
+      // lazily open a writer the first time a cell falls into this window
+      writer = writerFactory.createWriter();
+      entry.setValue(writer);
+    }
+    writer.append(cell);
+  }
+
+  @Override
+  protected Iterable<Entry<Long, StoreFile.Writer>> boundaryAndWriters() {
+    return boundary2Writer.entrySet();
+  }
+
+  @Override
+  protected Collection<StoreFile.Writer> writers() {
+    return boundary2Writer.headMap(boundary2Writer.lastKey(), false).values();
+  }
+
+  @Override
+  protected void commitWritersInternal() throws IOException {
+  }
+
+  @Override
+  protected void writeExtraMetadata(Writer writer, Long lowerBoundary, Long higherBoundary)
+      throws IOException {
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
new file mode 100644
index 0000000..a6e3dab
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredComparator;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * Store engine for date tiered compaction: flushing and store file management stay the default
+ * ones, compactions are selected by {@link DateTieredCompactionPolicy} and executed by
+ * {@link DateTieredComparator}, which writes one file per time window.
+ */
+@InterfaceAudience.Private
+public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
+    DateTieredCompactionPolicy, DateTieredComparator, DefaultStoreFileManager> {
+
+  @Override
+  public boolean needsCompaction(List<StoreFile> filesCompacting) {
+    return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), filesCompacting);
+  }
+
+  @Override
+  public CompactionContext createCompaction() throws IOException {
+    return new DateTieredCompactionContext();
+  }
+
+  @Override
+  protected void createComponents(Configuration conf, Store store, CellComparator kvComparator)
+      throws IOException {
+    this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
+    this.storeFileManager = new DefaultStoreFileManager(kvComparator, conf,
+        compactionPolicy.getConf());
+    this.storeFlusher = new DefaultStoreFlusher(conf, store);
+    this.compactor = new DateTieredComparator(conf, store);
+  }
+
+  private final class DateTieredCompactionContext extends CompactionContext {
+
+    @Override
+    public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+      return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
+        filesCompacting);
+    }
+
+    @Override
+    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
+        filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
+      return request != null;
+    }
+
+    @Override
+    public List<Path> compact(ThroughputController throughputController) throws IOException {
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(ThroughputController throughputController, User user)
+        throws IOException {
+      List<Long> boundaries = compactionPolicy.getCompactBoundaries(request.getFiles());
+      return compactor.compact(request, boundaries, throughputController, user);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 61eb9b8..a31ad13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -1765,6 +1765,7 @@ public class StoreFile {
   public static final Comparator<StoreFile> SEQ_ID =
       Ordering.compound(ImmutableList.of(
           Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetMaxTimestamp()),
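+          // GetMaxTimestamp above is the new tie-breaker: among files with the same sequence id,
+          // the file whose newest cell is older sorts first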
Ordering.natural().onResultOf(new GetFileSize()).reverse(), Ordering.natural().onResultOf(new GetBulkTime()), Ordering.natural().onResultOf(new GetPathName()) @@ -1804,5 +1805,12 @@ public class StoreFile { return sf.getPath().getName(); } } + + private static class GetMaxTimestamp implements Function { + @Override + public Long apply(StoreFile sf) { + return sf.getMaximumTimestamp(); + } + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index 651b863..b251a21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -20,94 +20,95 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.util.Bytes; /** - * Base class for cell sink that separates the provided cells into multiple files. + * Base class for cell sink that separates the provided cells into multiple files for stripe + * compaction. */ @InterfaceAudience.Private -public abstract class StripeMultiFileWriter implements Compactor.CellSink { - private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); +public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { - /** Factory that is used to produce single StoreFile.Writer-s */ - protected WriterFactory writerFactory; - protected CellComparator comparator; + private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); protected List existingWriters; protected List boundaries; - /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ - protected StoreScanner sourceScanner; - - /** Whether to write stripe metadata */ - private boolean doWriteStripeMetadata = true; - - public interface WriterFactory { - public StoreFile.Writer createWriter() throws IOException; - } - - /** - * Initializes multi-writer before usage. - * @param sourceScanner Optional store scanner to obtain the information about read progress. - * @param factory Factory used to produce individual file writers. - * @param comparator Comparator used to compare rows. 
- */ - public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator) - throws IOException { - this.writerFactory = factory; - this.sourceScanner = sourceScanner; - this.comparator = comparator; - } - public void setNoStripeMetadata() { - this.doWriteStripeMetadata = false; + @Override + protected Iterable> boundaryAndWriters() { + return new Iterable>() { + + @Override + public Iterator> iterator() { + return new Iterator>() { + + private int index; + + @Override + public boolean hasNext() { + return index < boundaries.size() - 1; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final byte[] boundary = boundaries.get(index); + final StoreFile.Writer writer = index < existingWriters.size() + ? existingWriters.get(index) : null; + return new Entry() { + + @Override + public Writer setValue(Writer value) { + throw new UnsupportedOperationException(); + } + + @Override + public Writer getValue() { + return writer; + } + + @Override + public byte[] getKey() { + return boundary; + } + }; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; } - public List commitWriters(long maxSeqId, boolean isMajor) throws IOException { - assert this.existingWriters != null; - commitWritersInternal(); - assert this.boundaries.size() == (this.existingWriters.size() + 1); - LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w") - + "riting out metadata for " + this.existingWriters.size() + " writers"); - List paths = new ArrayList(); - for (int i = 0; i < this.existingWriters.size(); ++i) { - StoreFile.Writer writer = this.existingWriters.get(i); - if (writer == null) continue; // writer was skipped due to 0 KVs - if (doWriteStripeMetadata) { - writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i)); - writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1)); - } - writer.appendMetadata(maxSeqId, isMajor); - paths.add(writer.getPath()); - writer.close(); - } - this.existingWriters = null; - return paths; + @Override + protected Collection writers() { + return existingWriters; } - public List abortWriters() { - assert this.existingWriters != null; - List paths = new ArrayList(); - for (StoreFile.Writer writer : this.existingWriters) { - try { - paths.add(writer.getPath()); - writer.close(); - } catch (Exception ex) { - LOG.error("Failed to close the writer after an unfinished compaction.", ex); - } - } - this.existingWriters = null; - return paths; + @Override + protected void writeExtraMetadata(Writer writer, byte[] lowerBoundary, byte[] higherBoundary) + throws IOException { + writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, lowerBoundary); + writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, higherBoundary); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 9a06a88..d8eab98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -127,7 +127,7 @@ public class StripeStoreFlusher extends StoreFlusher { public StripeMultiFileWriter createWriter() throws IOException { StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY); - 
writer.setNoStripeMetadata(); + writer.setNoExtraMetadata(); return writer; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index 9f65e6e..30db8ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -17,14 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.PeekingIterator; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -42,6 +34,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.PeekingIterator; + /** * HBASE-15181 This is a simple implementation of date-based tiered compaction similar to * Cassandra's for the following benefits: @@ -147,6 +147,42 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { comConf.getBaseWindowMillis(), mayUseOffPeak); } + public List getCompactBoundaries(Collection filesToCompact) { + long maxTimestamp = Long.MIN_VALUE; + long minTimestamp = Long.MAX_VALUE; + for (StoreFile file : filesToCompact) { + maxTimestamp = Math.max(maxTimestamp, file.getMaximumTimestamp()); + minTimestamp = Math.min(minTimestamp, file.getMinimumTimestamp()); + } + List boundaries = new ArrayList(); + Window window; + // locate max timestamp first + for (window = getInitialWindow(EnvironmentEdgeManager.currentTime(), + comConf.getBaseWindowMillis());; window = window.nextWindow(comConf.getWindowsPerTier())) { + if (window.compareToTimestamp(maxTimestamp) <= 0) { + boundaries.add(maxTimestamp + 1); + boundaries.add(window.startMillis()); + break; + } + } + assert !boundaries.isEmpty(); + for (;; window = window.nextWindow(comConf.getWindowsPerTier())) { + if (window.compareToTimestamp(minTimestamp) > 0) { + boundaries.add(window.endMillis()); + boundaries.add(window.startMillis()); + } else { + long endMillis = window.endMillis(); + if (endMillis > minTimestamp) { + boundaries.add(endMillis); + } + boundaries.add(minTimestamp); + break; + } + } + Collections.reverse(boundaries); + return boundaries; + } + /** * @param buckets the list of buckets, sorted from newest to oldest, from which to return the * newest bucket within thresholds. 
@@ -297,5 +333,13 @@ public class DateTieredCompactionPolicy extends RatioBasedCompactionPolicy { return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1); } } + + public long startMillis() { + return windowMillis * divPosition; + } + + public long endMillis() { + return windowMillis * (divPosition + 1); + } } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredComparator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredComparator.java new file mode 100644 index 0000000..e58ffd4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredComparator.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; + +import com.google.common.io.Closeables; + +/** + * This compactor will generate StoreFile for different time ranges. 
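+ * It runs an otherwise normal compaction, but hands the cells to a
+ * {@link DateTieredMultiFileWriter} that opens one writer per time window, using the boundaries
+ * computed by DateTieredCompactionPolicy#getCompactBoundaries and passed in through
+ * DateTieredStoreEngine.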
+ */ +@InterfaceAudience.Private +public class DateTieredComparator extends Compactor { + + private static final Log LOG = LogFactory.getLog(DateTieredComparator.class); + + public DateTieredComparator(Configuration conf, Store store) { + super(conf, store); + } + + public List compact(final CompactionRequest request, List boundaries, + ThroughputController throughputController, User user) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing compaction with " + boundaries.size() + " boundaries: " + boundaries); + } + final FileDetails fd = getFileDetails(request.getFiles(), request.isMajor()); + this.progress = new CompactionProgress(fd.maxKeyCount); + + // Find the smallest read point across all the Scanners. + long smallestReadPoint = getSmallestReadPoint(); + + List scanners; + Collection readersToClose; + boolean cleanSeqId = false; + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { + // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, + // HFileFiles, and their readers + readersToClose = new ArrayList(request.getFiles().size()); + for (StoreFile f : request.getFiles()) { + readersToClose.add(new StoreFile(f)); + } + scanners = createFileScanners(readersToClose, smallestReadPoint, + store.throttleCompaction(request.getSize())); + } else { + readersToClose = Collections.emptyList(); + scanners = createFileScanners(request.getFiles(), smallestReadPoint, + store.throttleCompaction(request.getSize())); + } + InternalScanner scanner = null; + DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(boundaries); + boolean finished = false; + try { + /* Include deletes, unless we are doing a major compaction */ + ScanType scanType = request.isMajor() ? ScanType.COMPACT_DROP_DELETES + : ScanType.COMPACT_RETAIN_DELETES; + scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); + if (scanner == null) { + scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs); + } + scanner = postCreateCoprocScanner(request, scanType, scanner, user); + if (scanner == null) { + // NULL scanner returned from coprocessor hooks means skip normal processing. + return new ArrayList(); + } + if (fd.minSeqIdToKeep > 0) { + smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); + cleanSeqId = true; + } + // Create the writer factory for compactions. + final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint; + final Compression.Algorithm compression = store.getFamily().getCompactionCompressionType(); + DateTieredMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() { + @Override + public Writer createWriter() throws IOException { + return store.createWriterInTmp(fd.maxKeyCount, compression, true, needMvcc, + fd.maxTagsLength > 0, store.throttleCompaction(request.getSize())); + } + }; + // Prepare multi-writer, and perform the compaction using scanner and writer. + // It is ok here if storeScanner is null. + StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner) scanner : null; + writer.init(storeScanner, factory, store.getComparator()); + finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isMajor()); + if (!finished) { + throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); + } + } finally { + Closeables.close(scanner, true); + if (!finished) { + FileSystem fs = store.getFileSystem(); + for (Path leftoverFile : writer.abortWriters()) { + try { + fs.delete(leftoverFile, false); + } catch (Exception e) { + LOG.error("Failed to delete the leftover file " + leftoverFile + + " after an unfinished compaction.", + e); + } + } + } + } + return writer.commitWriters(fd.maxSeqId, request.isMajor()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java index cfb54b7..e2d29fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompaction.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,194 +17,48 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; -import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) -public class TestDateTieredCompaction extends TestCompactionPolicy { - ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) - throws IOException { - ArrayList ageInDisk = new ArrayList(); - for (int i = 0; i < sizes.length; i++) { - ageInDisk.add(0L); - } - - ArrayList ret = Lists.newArrayList(); - for (int i = 0; i < sizes.length; i++) { - MockStoreFile msf = - new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); - msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); - ret.add(msf); - } - return ret; - } - - @Override - protected void config() { - super.config(); - - // Set up policy - conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); - conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); - conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); - conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); - conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, - 
DateTieredCompactionPolicy.class.getName()); - - // Special settings for compaction policy per window - this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); - this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); - this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); - } - - void compactEquals(long now, ArrayList candidates, long... expected) - throws IOException { - Assert.assertTrue(((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .needsCompaction(candidates, ImmutableList. of(), now)); - - List actual = - ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) - .applyCompactionPolicy(candidates, false, false, now); +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDateTieredCompaction { - Assert.assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); - } + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - /** - * Test for incoming window - * @throws IOException with error - */ - @Test - public void incomingWindow() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; - long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + private static final TableName TN = TableName.valueOf("dtc"); - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); - } - - /** - * Not enough files in incoming window - * @throws IOException with error - */ - @Test - public void NotIncomingWindow() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; - long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; + private static final byte[] FAMILY = Bytes.toBytes("f"); - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 25, 24, 23, 22, 21, 20); - } + private static final byte[] QUALIFIER = Bytes.toBytes("q"); - /** - * Test for file newer than incoming window - * @throws IOException with error - */ - @Test - public void NewerThanIncomingWindow() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 }; - long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + private HRegion region; - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); + @Before + public void setUp() throws IOException { + HTableDescriptor htd = new HTableDescriptor(TN).addFamily(new HColumnDescriptor(FAMILY)); + htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, DateTieredStoreEngine.class.getName()); + region = UTIL.createLocalHRegion(htd, null, null); } - /** - * If there is no T1 window, we don't build 2 - * @throws IOException with error - */ - @Test - public void NoT2() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 44, 60, 61, 92, 95, 100 }; - long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 }; - - compactEquals(100, sfCreate(minTimestamps, maxTimestamps, sizes), 23, 22); - } - - @Test - public void T1() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 
}; - long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; - long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 }; - - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 32, 31, 30); - } - - /** - * Apply exploring logic on non-incoming window - * @throws IOException with error - */ - @Test - public void RatioT0() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; - long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; - - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 22, 21, 20); - } - - /** - * Also apply ratio-based logic on t2 window - * @throws IOException with error - */ - @Test - public void RatioT2() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; - long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 }; - - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30); - } - - /** - * The next compaction call after testTieredCompactionRatioT0 is compacted - * @throws IOException with error - */ - @Test - public void RatioT0Next() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 }; - long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 }; - - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 24, 23); - } - - /** - * Older than now(161) - maxAge(100) - * @throws IOException with error - */ - @Test - public void olderThanMaxAge() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; - long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; - - compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30, 33, 42, 41, 40); + @After + public void tearDown() throws IOException { + HBaseTestingUtility.closeRegionAndWAL(region); + UTIL.cleanupTestDir(); } - /** - * Out-of-order data - * @throws IOException with error - */ @Test - public void OutOfOrder() throws IOException { - long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 }; - long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 }; + public void test() { - compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 1, 24, 23, 28, 22, 34, 33, 32, - 31); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java new file mode 100644 index 0000000..3460508 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicy.java @@ -0,0 +1,211 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestDateTieredCompactionPolicy extends TestCompactionPolicy { + ArrayList sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes) + throws IOException { + ArrayList ageInDisk = new ArrayList(); + for (int i = 0; i < sizes.length; i++) { + ageInDisk.add(0L); + } + + ArrayList ret = Lists.newArrayList(); + for (int i = 0; i < sizes.length; i++) { + MockStoreFile msf = + new MockStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i); + msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i])); + ret.add(msf); + } + return ret; + } + + @Override + protected void config() { + super.config(); + + // Set up policy + conf.setLong(CompactionConfiguration.MAX_AGE_MILLIS_KEY, 100); + conf.setLong(CompactionConfiguration.INCOMING_WINDOW_MIN_KEY, 3); + conf.setLong(CompactionConfiguration.BASE_WINDOW_MILLIS_KEY, 6); + conf.setInt(CompactionConfiguration.WINDOWS_PER_TIER_KEY, 4); + conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, + DateTieredCompactionPolicy.class.getName()); + + // Special settings for compaction policy per window + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12); + this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); + } + + void compactEquals(long now, ArrayList candidates, long... expected) + throws IOException { + Assert.assertTrue(((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) + .needsCompaction(candidates, ImmutableList. 
of(), now)); + + List actual = + ((DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy()) + .applyCompactionPolicy(candidates, false, false, now); + + Assert.assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); + } + + /** + * Test for incoming window + * @throws IOException with error + */ + @Test + public void incomingWindow() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); + } + + /** + * Not enough files in incoming window + * @throws IOException with error + */ + @Test + public void NotIncomingWindow() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 25, 24, 23, 22, 21, 20); + } + + /** + * Test for file newer than incoming window + * @throws IOException with error + */ + @Test + public void NewerThanIncomingWindow() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 13, 12, 11, 10); + } + + /** + * If there is no T1 window, we don't build 2 + * @throws IOException with error + */ + @Test + public void NoT2() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 92, 95, 100 }; + long[] sizes = new long[] { 0, 20, 21, 22, 23, 1 }; + + compactEquals(100, sfCreate(minTimestamps, maxTimestamps, sizes), 23, 22); + } + + @Test + public void T1() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 30, 31, 32, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 32, 31, 30); + } + + /** + * Apply exploring logic on non-incoming window + * @throws IOException with error + */ + @Test + public void RatioT0() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 22, 21, 20); + } + + /** + * Also apply ratio-based logic on t2 window + * @throws IOException with error + */ + @Test + public void RatioT2() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 120, 124, 143, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 350, 30, 31, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30); + } + + /** + * The next compaction call after 
testTieredCompactionRatioT0 is compacted + * @throws IOException with error + */ + @Test + public void RatioT0Next() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 8, 9, 10, 11, 12 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 280, 23, 24, 1 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 24, 23); + } + + /** + * Older than now(161) - maxAge(100) + * @throws IOException with error + */ + @Test + public void olderThanMaxAge() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 }; + long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 }; + + compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes), 31, 30, 33, 42, 41, 40); + } + + /** + * Out-of-order data + * @throws IOException with error + */ + @Test + public void OutOfOrder() throws IOException { + long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + long[] maxTimestamps = new long[] { 0, 13, 3, 10, 11, 1, 2, 12, 14, 15 }; + long[] sizes = new long[] { 30, 31, 32, 33, 34, 22, 28, 23, 24, 1 }; + + compactEquals(16, sfCreate(minTimestamps, maxTimestamps, sizes), 1, 24, 23, 28, 22, 34, 33, 32, + 31); + } +} -- 1.9.1
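For reference, a table opts into the new engine through its table descriptor, as the new
TestDateTieredCompaction does in setUp(); a minimal sketch of that wiring (table and family
names are illustrative):

    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("dtc"))
        .addFamily(new HColumnDescriptor(Bytes.toBytes("f")));
    htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY,
        DateTieredStoreEngine.class.getName());
    // per-window compaction behaviour is tuned through CompactionConfiguration keys such as
    // BASE_WINDOW_MILLIS_KEY, WINDOWS_PER_TIER_KEY and MAX_AGE_MILLIS_KEY, as exercised in
    // TestDateTieredCompactionPolicy#config()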