From 7158829e2b724011d8d6863d4d7f8c3abe446337 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 7 Mar 2016 20:59:55 +0800 Subject: [PATCH] HBASE-15389 Write out multiple files when compaction --- .../regionserver/AbstractMultiFileWriter.java | 128 ++++++++ .../regionserver/DateTieredMultiFileWriter.java | 84 ++++++ .../hbase/regionserver/StripeMultiFileWriter.java | 136 ++++----- .../hbase/regionserver/StripeStoreFlusher.java | 2 +- .../compactions/DateTieredCompactor.java | 152 ++++++++++ .../hbase/regionserver/TestStripeCompactor.java | 325 --------------------- .../regionserver/compactions/TestCompactor.java | 187 ++++++++++++ .../compactions/TestDateTieredCompactor.java | 129 ++++++++ .../compactions/TestStripeCompactionPolicy.java | 2 +- .../compactions/TestStripeCompactor.java | 212 ++++++++++++++ 10 files changed, 963 insertions(+), 394 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java new file mode 100644 index 0000000..9d4bdf9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +/** + * Base class for cell sink that separates the provided cells into multiple files. + */ +@InterfaceAudience.Private +public abstract class AbstractMultiFileWriter implements CellSink { + + private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); + + /** Factory that is used to produce single StoreFile.Writer-s */ + protected WriterFactory writerFactory; + protected CellComparator comparator; + + /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ + protected StoreScanner sourceScanner; + + /** Whether to write stripe metadata */ + private boolean doWriteExtraMetadata = true; + + public interface WriterFactory { + public StoreFile.Writer createWriter() throws IOException; + } + + /** + * Initializes multi-writer before usage. + * @param sourceScanner Optional store scanner to obtain the information about read progress. + * @param factory Factory used to produce individual file writers. + * @param comparator Comparator used to compare rows. + */ + public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator) + throws IOException { + this.writerFactory = factory; + this.sourceScanner = sourceScanner; + this.comparator = comparator; + } + + public void setNoExtraMetadata() { + this.doWriteExtraMetadata = false; + } + + public List commitWriters(long maxSeqId, boolean isMajor) throws IOException { + commitWritersInternal(); + LOG.debug((this.doWriteExtraMetadata ? "W" : "Not w") + "riting out metadata for " + + this.writers().size() + " writers"); + List paths = new ArrayList(); + for (PeekingIterator> iter = Iterators + .peekingIterator(boundaryAndWriters().iterator());;) { + Map.Entry entry = iter.next(); + if (!iter.hasNext()) { + // this is the last boundary, just break + break; + } + StoreFile.Writer writer = entry.getValue(); + if (writer == null) { + continue; + } + if (doWriteExtraMetadata) { + writeExtraMetadata(writer, entry.getKey(), iter.peek().getKey()); + } + writer.appendMetadata(maxSeqId, isMajor); + paths.add(writer.getPath()); + writer.close(); + } + return paths; + } + + public List abortWriters() { + List paths = new ArrayList(); + for (StoreFile.Writer writer : writers()) { + try { + if (writer != null) { + paths.add(writer.getPath()); + writer.close(); + } + } catch (Exception ex) { + LOG.error("Failed to close the writer after an unfinished compaction.", ex); + } + } + return paths; + } + + protected abstract Iterable> boundaryAndWriters(); + + protected abstract Collection writers(); + + /** + * Subclasses override this method to be called at the end of a successful sequence of append; all + * appends are processed before this method is called. 
+ */ + protected abstract void commitWritersInternal() throws IOException; + + protected abstract void writeExtraMetadata(StoreFile.Writer writer, K lowerBoundary, + K higherBoundary) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java new file mode 100644 index 0000000..f2817c1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; + +/** + * class for cell sink that separates the provided cells into multiple files for date tiered + * compaction. + */ +@InterfaceAudience.Private +public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { + /** + * Usually the boundaries.size = writers.size + 1, so use {@link NavigableMap#floorEntry(Object)} + * to get writer. And when commit, you should know that the last value must be null. 
+ */ + private final NavigableMap boundary2Writer + = new TreeMap(); + + public DateTieredMultiFileWriter(List boundaries) { + for (Long boundary : boundaries) { + boundary2Writer.put(boundary, null); + } + } + + @Override + public void append(Cell cell) throws IOException { + Map.Entry entry = boundary2Writer.floorEntry(cell.getTimestamp()); + if (entry == null || entry.getKey().equals(boundary2Writer.lastKey())) { + throw new IndexOutOfBoundsException(entry + " is out of range, should in range [" + + boundary2Writer.firstKey() + ", " + boundary2Writer.lastKey() + ")"); + } + StoreFile.Writer writer = entry.getValue(); + if (writer == null) { + writer = writerFactory.createWriter(); + boundary2Writer.put(entry.getKey(), writer); + } + writer.append(cell); + } + + @Override + protected Iterable> boundaryAndWriters() { + return boundary2Writer.entrySet(); + } + + @Override + protected Collection writers() { + return boundary2Writer.headMap(boundary2Writer.lastKey(), false).values(); + } + + @Override + protected void commitWritersInternal() throws IOException { + } + + @Override + protected void writeExtraMetadata(Writer writer, Long lowerBoundary, Long higherBoundary) + throws IOException { + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index 651b863..c073586 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -20,94 +20,96 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.util.Bytes; /** - * Base class for cell sink that separates the provided cells into multiple files. + * Base class for cell sink that separates the provided cells into multiple files for stripe + * compaction. 
*/ @InterfaceAudience.Private -public abstract class StripeMultiFileWriter implements Compactor.CellSink { - private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); +public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter { - /** Factory that is used to produce single StoreFile.Writer-s */ - protected WriterFactory writerFactory; - protected CellComparator comparator; + private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class); protected List existingWriters; protected List boundaries; - /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ - protected StoreScanner sourceScanner; - - /** Whether to write stripe metadata */ - private boolean doWriteStripeMetadata = true; - - public interface WriterFactory { - public StoreFile.Writer createWriter() throws IOException; - } - - /** - * Initializes multi-writer before usage. - * @param sourceScanner Optional store scanner to obtain the information about read progress. - * @param factory Factory used to produce individual file writers. - * @param comparator Comparator used to compare rows. - */ - public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator) - throws IOException { - this.writerFactory = factory; - this.sourceScanner = sourceScanner; - this.comparator = comparator; - } - public void setNoStripeMetadata() { - this.doWriteStripeMetadata = false; + @Override + protected Iterable> boundaryAndWriters() { + return new Iterable>() { + + @Override + public Iterator> iterator() { + return new Iterator>() { + + private int index; + + @Override + public boolean hasNext() { + return index < boundaries.size(); + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final byte[] boundary = boundaries.get(index); + final StoreFile.Writer writer = index < existingWriters.size() + ? existingWriters.get(index) : null; + index++; + return new Entry() { + + @Override + public Writer setValue(Writer value) { + throw new UnsupportedOperationException(); + } + + @Override + public Writer getValue() { + return writer; + } + + @Override + public byte[] getKey() { + return boundary; + } + }; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; } - public List commitWriters(long maxSeqId, boolean isMajor) throws IOException { - assert this.existingWriters != null; - commitWritersInternal(); - assert this.boundaries.size() == (this.existingWriters.size() + 1); - LOG.debug((this.doWriteStripeMetadata ? 
"W" : "Not w") - + "riting out metadata for " + this.existingWriters.size() + " writers"); - List paths = new ArrayList(); - for (int i = 0; i < this.existingWriters.size(); ++i) { - StoreFile.Writer writer = this.existingWriters.get(i); - if (writer == null) continue; // writer was skipped due to 0 KVs - if (doWriteStripeMetadata) { - writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i)); - writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1)); - } - writer.appendMetadata(maxSeqId, isMajor); - paths.add(writer.getPath()); - writer.close(); - } - this.existingWriters = null; - return paths; + @Override + protected Collection writers() { + return existingWriters; } - public List abortWriters() { - assert this.existingWriters != null; - List paths = new ArrayList(); - for (StoreFile.Writer writer : this.existingWriters) { - try { - paths.add(writer.getPath()); - writer.close(); - } catch (Exception ex) { - LOG.error("Failed to close the writer after an unfinished compaction.", ex); - } - } - this.existingWriters = null; - return paths; + @Override + protected void writeExtraMetadata(Writer writer, byte[] lowerBoundary, byte[] higherBoundary) + throws IOException { + writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, lowerBoundary); + writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, higherBoundary); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 9a06a88..d8eab98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -127,7 +127,7 @@ public class StripeStoreFlusher extends StoreFlusher { public StripeMultiFileWriter createWriter() throws IOException { StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY); - writer.setNoStripeMetadata(); + writer.setNoExtraMetadata(); return writer; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java new file mode 100644 index 0000000..cc4eae7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; + +import com.google.common.io.Closeables; + +/** + * This compactor will generate StoreFile for different time ranges. + */ +@InterfaceAudience.Private +public class DateTieredCompactor extends Compactor { + + private static final Log LOG = LogFactory.getLog(DateTieredCompactor.class); + + public DateTieredCompactor(Configuration conf, Store store) { + super(conf, store); + } + + public List compact(final CompactionRequest request, List boundaries, + ThroughputController throughputController, User user) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing compaction with " + boundaries.size() + " boundaries: " + boundaries); + } + final FileDetails fd = getFileDetails(request.getFiles(), request.isMajor()); + this.progress = new CompactionProgress(fd.maxKeyCount); + + // Find the smallest read point across all the Scanners. + long smallestReadPoint = getSmallestReadPoint(); + + List scanners; + Collection readersToClose; + boolean cleanSeqId = false; + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { + // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, + // HFileFiles, and their readers + readersToClose = new ArrayList(request.getFiles().size()); + for (StoreFile f : request.getFiles()) { + readersToClose.add(new StoreFile(f)); + } + scanners = createFileScanners(readersToClose, smallestReadPoint, + store.throttleCompaction(request.getSize())); + } else { + readersToClose = Collections.emptyList(); + scanners = createFileScanners(request.getFiles(), smallestReadPoint, + store.throttleCompaction(request.getSize())); + } + InternalScanner scanner = null; + DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(boundaries); + boolean finished = false; + try { + /* Include deletes, unless we are doing a major compaction */ + ScanType scanType = request.isMajor() ? 
ScanType.COMPACT_DROP_DELETES + : ScanType.COMPACT_RETAIN_DELETES; + scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); + if (scanner == null) { + scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs); + } + scanner = postCreateCoprocScanner(request, scanType, scanner, user); + if (scanner == null) { + // NULL scanner returned from coprocessor hooks means skip normal processing. + return new ArrayList(); + } + if (fd.minSeqIdToKeep > 0) { + smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); + cleanSeqId = true; + } + // Create the writer factory for compactions. + final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint; + final Compression.Algorithm compression = store.getFamily().getCompactionCompressionType(); + DateTieredMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() { + @Override + public Writer createWriter() throws IOException { + return store.createWriterInTmp(fd.maxKeyCount, compression, true, needMvcc, + fd.maxTagsLength > 0, store.throttleCompaction(request.getSize())); + } + }; + // Prepare multi-writer, and perform the compaction using scanner and writer. + // It is ok here if storeScanner is null. + StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null; + writer.init(storeScanner, factory, store.getComparator()); + finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isMajor()); + if (!finished) { + throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); + } + } finally { + Closeables.close(scanner, true); + for (StoreFile f : readersToClose) { + try { + f.closeReader(true); + } catch (Exception e) { + LOG.warn("Exception closing " + f, e); + } + } + if (!finished) { + FileSystem fs = store.getFileSystem(); + for (Path leftoverFile : writer.abortWriters()) { + try { + fs.delete(leftoverFile, false); + } catch (Exception e) { + LOG.error("Failed to delete the leftover file " + leftoverFile + + " after an unfinished compaction.", + e); + } + } + } + } + return writer.commitWriters(fd.maxSeqId, request.isMajor()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java deleted file mode 100644 index cb586f3..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ /dev/null @@ -1,325 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.regionserver; - -import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; -import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY; -import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; -import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - - -@Category({RegionServerTests.class, SmallTests.class}) -public class TestStripeCompactor { - private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo"); - private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS); - - private static final byte[] KEY_B = Bytes.toBytes("bbb"); - private static final byte[] KEY_C = Bytes.toBytes("ccc"); - private static final byte[] KEY_D = Bytes.toBytes("ddd"); - - private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa")); - private static final KeyValue KV_B = kvAfter(KEY_B); - private static final KeyValue KV_C = kvAfter(KEY_C); - private static final KeyValue KV_D = kvAfter(KEY_D); - - private static KeyValue kvAfter(byte[] key) { - return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L); - } - - private static T[] a(T... 
a) { - return a; - } - - private static KeyValue[] e() { - return TestStripeCompactor.a(); - } - - @Test - public void testBoundaryCompactions() throws Exception { - // General verification - verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D), - a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D))); - verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C))); - verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) }); - } - - @Test - public void testBoundaryCompactionEmptyFiles() throws Exception { - // No empty file if there're already files. - verifyBoundaryCompaction( - a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false); - verifyBoundaryCompaction(a(KV_A, KV_C), - a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false); - // But should be created if there are no file. - verifyBoundaryCompaction( - e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false); - // In major range if there's major range. - verifyBoundaryCompaction( - e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false); - verifyBoundaryCompaction( - e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false); - // Major range should have files regardless of KVs. - verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY), - a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false); - verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY), - a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false); - - } - - public static void verifyBoundaryCompaction( - KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception { - verifyBoundaryCompaction(input, boundaries, output, null, null, true); - } - - public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, - KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles) - throws Exception { - StoreFileWritersCapture writers = new StoreFileWritersCapture(); - StripeCompactor sc = createCompactor(writers, input); - List paths = - sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, - NoLimitThroughputController.INSTANCE); - writers.verifyKvs(output, allFiles, true); - if (allFiles) { - assertEquals(output.length, paths.size()); - writers.verifyBoundaries(boundaries); - } - } - - @Test - public void testSizeCompactions() throws Exception { - // General verification with different sizes. - verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY, - a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D))); - verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY, - a(a(KV_A), a(KV_B), a(KV_C), a(KV_D))); - verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C))); - // Verify row boundaries are preserved. - verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY, - a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D))); - verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY, - a(a(KV_A), a(KV_B, KV_B), a(KV_C))); - // Too much data, count limits the number of files. - verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY, - a(a(KV_A), a(KV_B, KV_C, KV_D))); - verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D, - new KeyValue[][] { a(KV_A, KV_B, KV_C) }); - // Too little data/large count, no extra files. 
- verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY, - a(a(KV_A, KV_B), a(KV_C, KV_D))); - } - - public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, - byte[] left, byte[] right, KeyValue[][] output) throws Exception { - StoreFileWritersCapture writers = new StoreFileWritersCapture(); - StripeCompactor sc = createCompactor(writers, input); - List paths = - sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, - NoLimitThroughputController.INSTANCE); - assertEquals(output.length, paths.size()); - writers.verifyKvs(output, true, true); - List boundaries = new ArrayList(); - boundaries.add(left); - for (int i = 1; i < output.length; ++i) { - boundaries.add(CellUtil.cloneRow(output[i][0])); - } - boundaries.add(right); - writers.verifyBoundaries(boundaries.toArray(new byte[][] {})); - } - - private static StripeCompactor createCompactor( - StoreFileWritersCapture writers, KeyValue[] input) throws Exception { - Configuration conf = HBaseConfiguration.create(); - final Scanner scanner = new Scanner(input); - - // Create store mock that is satisfactory for compactor. - HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS); - ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR); - Store store = mock(Store.class); - when(store.getFamily()).thenReturn(col); - when(store.getScanInfo()).thenReturn(si); - when(store.areWritesEnabled()).thenReturn(true); - when(store.getFileSystem()).thenReturn(mock(FileSystem.class)); - when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME)); - when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), - anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); - when(store.getComparator()).thenReturn(CellComparator.COMPARATOR); - - return new StripeCompactor(conf, store) { - @Override - protected InternalScanner createScanner(Store store, List scanners, - long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) throws IOException { - return scanner; - } - - @Override - protected InternalScanner createScanner(Store store, List scanners, - ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { - return scanner; - } - }; - } - - private static CompactionRequest createDummyRequest() throws Exception { - // "Files" are totally unused, it's Scanner class below that gives compactor fake KVs. - // But compaction depends on everything under the sun, so stub everything with dummies. - StoreFile sf = mock(StoreFile.class); - StoreFile.Reader r = mock(StoreFile.Reader.class); - when(r.length()).thenReturn(1L); - when(r.getBloomFilterType()).thenReturn(BloomType.NONE); - when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); - when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())) - .thenReturn(mock(StoreFileScanner.class)); - when(sf.getReader()).thenReturn(r); - when(sf.createReader()).thenReturn(r); - when(sf.createReader(anyBoolean())).thenReturn(r); - return new CompactionRequest(Arrays.asList(sf)); - } - - private static class Scanner implements InternalScanner { - private final ArrayList kvs; - public Scanner(KeyValue... 
kvs) { - this.kvs = new ArrayList(Arrays.asList(kvs)); - } - - @Override - public boolean next(List results) throws IOException { - if (kvs.isEmpty()) return false; - results.add(kvs.remove(0)); - return !kvs.isEmpty(); - } - - @Override - public boolean next(List result, ScannerContext scannerContext) - throws IOException { - return next(result); - } - - @Override - public void close() throws IOException {} - } - - // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted. - public static class StoreFileWritersCapture implements - Answer, StripeMultiFileWriter.WriterFactory { - public static class Writer { - public ArrayList kvs = new ArrayList(); - public TreeMap data = new TreeMap(Bytes.BYTES_COMPARATOR); - } - - private List writers = new ArrayList(); - - @Override - public StoreFile.Writer createWriter() throws IOException { - final Writer realWriter = new Writer(); - writers.add(realWriter); - StoreFile.Writer writer = mock(StoreFile.Writer.class); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]); - }}).when(writer).append(any(KeyValue.class)); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - return realWriter.data.put((byte[])args[0], (byte[])args[1]); - }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class)); - return writer; - } - - @Override - public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable { - return createWriter(); - } - - public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) { - if (allFiles) { - assertEquals(kvss.length, writers.size()); - } - int skippedWriters = 0; - for (int i = 0; i < kvss.length; ++i) { - KeyValue[] kvs = kvss[i]; - if (kvs != null) { - Writer w = writers.get(i - skippedWriters); - if (requireMetadata) { - assertNotNull(w.data.get(STRIPE_START_KEY)); - assertNotNull(w.data.get(STRIPE_END_KEY)); - } else { - assertNull(w.data.get(STRIPE_START_KEY)); - assertNull(w.data.get(STRIPE_END_KEY)); - } - assertEquals(kvs.length, w.kvs.size()); - for (int j = 0; j < kvs.length; ++j) { - assertEquals(kvs[j], w.kvs.get(j)); - } - } else { - assertFalse(allFiles); - ++skippedWriters; - } - } - } - - public void verifyBoundaries(byte[][] boundaries) { - assertEquals(boundaries.length - 1, writers.size()); - for (int i = 0; i < writers.size(); ++i) { - assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY)); - assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY)); - } - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java new file mode 100644 index 0000000..c404067 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY; +import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; +import org.apache.hadoop.hbase.util.Bytes; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestCompactor { + + public static CompactionRequest createDummyRequest() throws Exception { + // "Files" are totally unused, it's Scanner class below that gives compactor fake KVs. + // But compaction depends on everything under the sun, so stub everything with dummies. + StoreFile sf = mock(StoreFile.class); + StoreFile.Reader r = mock(StoreFile.Reader.class); + when(r.length()).thenReturn(1L); + when(r.getBloomFilterType()).thenReturn(BloomType.NONE); + when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); + when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())) + .thenReturn(mock(StoreFileScanner.class)); + when(sf.getReader()).thenReturn(r); + when(sf.createReader()).thenReturn(r); + when(sf.createReader(anyBoolean())).thenReturn(r); + return new CompactionRequest(Arrays.asList(sf)); + } + + // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted. 
+ public static class StoreFileWritersCapture + implements Answer, StripeMultiFileWriter.WriterFactory { + public static class Writer { + public ArrayList kvs = new ArrayList(); + public TreeMap data = new TreeMap(Bytes.BYTES_COMPARATOR); + } + + private List writers = new ArrayList(); + + @Override + public StoreFile.Writer createWriter() throws IOException { + final Writer realWriter = new Writer(); + writers.add(realWriter); + StoreFile.Writer writer = mock(StoreFile.Writer.class); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return realWriter.kvs.add((KeyValue) invocation.getArguments()[0]); + } + }).when(writer).append(any(KeyValue.class)); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + return realWriter.data.put((byte[]) args[0], (byte[]) args[1]); + } + }).when(writer).appendFileInfo(any(byte[].class), any(byte[].class)); + return writer; + } + + @Override + public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable { + return createWriter(); + } + + public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) { + if (allFiles) { + assertEquals(kvss.length, writers.size()); + } + int skippedWriters = 0; + for (int i = 0; i < kvss.length; ++i) { + KeyValue[] kvs = kvss[i]; + if (kvs != null) { + Writer w = writers.get(i - skippedWriters); + if (requireMetadata) { + assertNotNull(w.data.get(STRIPE_START_KEY)); + assertNotNull(w.data.get(STRIPE_END_KEY)); + } else { + assertNull(w.data.get(STRIPE_START_KEY)); + assertNull(w.data.get(STRIPE_END_KEY)); + } + assertEquals(kvs.length, w.kvs.size()); + for (int j = 0; j < kvs.length; ++j) { + assertEquals(kvs[j], w.kvs.get(j)); + } + } else { + assertFalse(allFiles); + ++skippedWriters; + } + } + } + + public void verifyBoundaries(byte[][] boundaries) { + assertEquals(boundaries.length - 1, writers.size()); + for (int i = 0; i < writers.size(); ++i) { + assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY)); + assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY)); + } + } + + public void verifyKvs(KeyValue[][] kvss, boolean allFiles, List boundaries) { + if (allFiles) { + assertEquals(kvss.length, writers.size()); + } + int skippedWriters = 0; + for (int i = 0; i < kvss.length; ++i) { + KeyValue[] kvs = kvss[i]; + if (kvs != null) { + Writer w = writers.get(i - skippedWriters); + assertEquals(kvs.length, w.kvs.size()); + for (int j = 0; j < kvs.length; ++j) { + assertTrue(kvs[j].getTimestamp() >= boundaries.get(i)); + assertTrue(kvs[j].getTimestamp() < boundaries.get(i + 1)); + assertEquals(kvs[j], w.kvs.get(j)); + } + } else { + assertFalse(allFiles); + ++skippedWriters; + } + } + } + } + + public static class Scanner implements InternalScanner { + private final ArrayList kvs; + + public Scanner(KeyValue... 
kvs) { + this.kvs = new ArrayList(Arrays.asList(kvs)); + } + + @Override + public boolean next(List results) throws IOException { + if (kvs.isEmpty()) return false; + results.add(kvs.remove(0)); + return !kvs.isEmpty(); + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return next(result); + } + + @Override + public void close() throws IOException { + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java new file mode 100644 index 0000000..54ef5c5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner; +import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDateTieredCompactor { + + private static final byte[] 
NAME_OF_THINGS = Bytes.toBytes("foo"); + + private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS); + + private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L); + + private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L); + + private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L); + + private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L); + + private static DateTieredCompactor createCompactor(StoreFileWritersCapture writers, + KeyValue[] input) throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean("hbase.regionserver.compaction.private.readers", false); + final Scanner scanner = new Scanner(input); + + // Create store mock that is satisfactory for compactor. + HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS); + ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR); + Store store = mock(Store.class); + when(store.getFamily()).thenReturn(col); + when(store.getScanInfo()).thenReturn(si); + when(store.areWritesEnabled()).thenReturn(true); + when(store.getFileSystem()).thenReturn(mock(FileSystem.class)); + when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME)); + when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(), + anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); + when(store.getComparator()).thenReturn(CellComparator.COMPARATOR); + + return new DateTieredCompactor(conf, store) { + @Override + protected InternalScanner createScanner(Store store, List scanners, + long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, + byte[] dropDeletesToRow) throws IOException { + return scanner; + } + + @Override + protected InternalScanner createScanner(Store store, List scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + return scanner; + } + }; + } + + private static void verify(KeyValue[] input, List boundaries, KeyValue[][] output, + boolean allFiles) throws Exception { + StoreFileWritersCapture writers = new StoreFileWritersCapture(); + DateTieredCompactor dtc = createCompactor(writers, input); + List paths = dtc.compact(createDummyRequest(), boundaries, + NoLimitThroughputController.INSTANCE, null); + writers.verifyKvs(output, allFiles, boundaries); + if (allFiles) { + assertEquals(output.length, paths.size()); + } + } + + private static T[] a(T... 
a) { + return a; + } + + @Test + public void test() throws Exception { + verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L), + a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true); + verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE), + a(a(KV_A), a(KV_B, KV_C, KV_D)), false); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index c440a57..98555b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -65,8 +65,8 @@ import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; -import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; +import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RegionServerTests; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java new file mode 100644 index 0000000..c17668f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; +import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner; +import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestStripeCompactor { + private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo"); + private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS); + + private static final byte[] KEY_B = Bytes.toBytes("bbb"); + private static final byte[] KEY_C = Bytes.toBytes("ccc"); + private static final byte[] KEY_D = Bytes.toBytes("ddd"); + + private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa")); + private static final KeyValue KV_B = kvAfter(KEY_B); + private static final KeyValue KV_C = kvAfter(KEY_C); + private static final KeyValue KV_D = kvAfter(KEY_D); + + private static KeyValue kvAfter(byte[] key) { + return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L); + } + + private static T[] a(T... a) { + return a; + } + + private static KeyValue[] e() { + return TestStripeCompactor.a(); + } + + @Test + public void testBoundaryCompactions() throws Exception { + // General verification + verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D), + a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D))); + verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C))); + verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) }); + } + + @Test + public void testBoundaryCompactionEmptyFiles() throws Exception { + // No empty file if there're already files. 
+ verifyBoundaryCompaction( + a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false); + verifyBoundaryCompaction(a(KV_A, KV_C), + a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false); + // But should be created if there are no file. + verifyBoundaryCompaction( + e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false); + // In major range if there's major range. + verifyBoundaryCompaction( + e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false); + verifyBoundaryCompaction( + e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false); + // Major range should have files regardless of KVs. + verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY), + a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false); + verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY), + a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false); + + } + + public static void verifyBoundaryCompaction( + KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception { + verifyBoundaryCompaction(input, boundaries, output, null, null, true); + } + + public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, + KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles) + throws Exception { + StoreFileWritersCapture writers = new StoreFileWritersCapture(); + StripeCompactor sc = createCompactor(writers, input); + List paths = + sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, + NoLimitThroughputController.INSTANCE); + writers.verifyKvs(output, allFiles, true); + if (allFiles) { + assertEquals(output.length, paths.size()); + writers.verifyBoundaries(boundaries); + } + } + + @Test + public void testSizeCompactions() throws Exception { + // General verification with different sizes. + verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY, + a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D))); + verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY, + a(a(KV_A), a(KV_B), a(KV_C), a(KV_D))); + verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C))); + // Verify row boundaries are preserved. + verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY, + a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D))); + verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY, + a(a(KV_A), a(KV_B, KV_B), a(KV_C))); + // Too much data, count limits the number of files. + verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY, + a(a(KV_A), a(KV_B, KV_C, KV_D))); + verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D, + new KeyValue[][] { a(KV_A, KV_B, KV_C) }); + // Too little data/large count, no extra files. 
+ verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY, + a(a(KV_A, KV_B), a(KV_C, KV_D))); + } + + public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, + byte[] left, byte[] right, KeyValue[][] output) throws Exception { + StoreFileWritersCapture writers = new StoreFileWritersCapture(); + StripeCompactor sc = createCompactor(writers, input); + List paths = + sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, + NoLimitThroughputController.INSTANCE); + assertEquals(output.length, paths.size()); + writers.verifyKvs(output, true, true); + List boundaries = new ArrayList(); + boundaries.add(left); + for (int i = 1; i < output.length; ++i) { + boundaries.add(CellUtil.cloneRow(output[i][0])); + } + boundaries.add(right); + writers.verifyBoundaries(boundaries.toArray(new byte[][] {})); + } + + private static StripeCompactor createCompactor( + StoreFileWritersCapture writers, KeyValue[] input) throws Exception { + Configuration conf = HBaseConfiguration.create(); + final Scanner scanner = new Scanner(input); + + // Create store mock that is satisfactory for compactor. + HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS); + ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR); + Store store = mock(Store.class); + when(store.getFamily()).thenReturn(col); + when(store.getScanInfo()).thenReturn(si); + when(store.areWritesEnabled()).thenReturn(true); + when(store.getFileSystem()).thenReturn(mock(FileSystem.class)); + when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME)); + when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), + anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); + when(store.getComparator()).thenReturn(CellComparator.COMPARATOR); + + return new StripeCompactor(conf, store) { + @Override + protected InternalScanner createScanner(Store store, List scanners, + long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, + byte[] dropDeletesToRow) throws IOException { + return scanner; + } + + @Override + protected InternalScanner createScanner(Store store, List scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + return scanner; + } + }; + } +} -- 1.9.1
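For reference, a minimal standalone sketch (hypothetical class TierRoutingSketch, not code from this patch) of the timestamp-to-tier routing that DateTieredMultiFileWriter#append performs with NavigableMap#floorEntry, assuming an ascending boundary list whose last element is an exclusive upper bound that never owns a writer:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Route cell timestamps to date tiers the way DateTieredMultiFileWriter#append does.
    public class TierRoutingSketch {
      public static void main(String[] args) {
        // Hypothetical boundaries: three tiers [MIN, 200), [200, 400), [400, MAX).
        List<Long> boundaries = Arrays.asList(Long.MIN_VALUE, 200L, 400L, Long.MAX_VALUE);
        // Writers are created lazily on first append; a String stands in for StoreFile.Writer.
        TreeMap<Long, String> boundary2Writer = new TreeMap<Long, String>();
        for (Long boundary : boundaries) {
          boundary2Writer.put(boundary, null);
        }
        for (long ts : new long[] { 100L, 250L, 399L, Long.MAX_VALUE }) {
          Map.Entry<Long, String> entry = boundary2Writer.floorEntry(ts);
          if (entry == null || entry.getKey().equals(boundary2Writer.lastKey())) {
            // Mirrors the IndexOutOfBoundsException that append() throws for out-of-range cells.
            System.out.println(ts + " is out of range [" + boundary2Writer.firstKey() + ", "
                + boundary2Writer.lastKey() + ")");
            continue;
          }
          if (entry.getValue() == null) {
            boundary2Writer.put(entry.getKey(), "writer-for-tier-" + entry.getKey());
          }
          System.out.println(ts + " -> " + boundary2Writer.get(entry.getKey()));
        }
      }
    }

With the real classes the same flow is driven through init(sourceScanner, writerFactory, comparator), append(cell) per cell, and commitWriters(maxSeqId, isMajor) or abortWriters() at the end, as DateTieredCompactor does above.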
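A second sketch (again hypothetical, not patch code) of the commitWriters() walk in AbstractMultiFileWriter: each non-null writer is paired with its own boundary as the lower bound and the next boundary as the upper bound passed to writeExtraMetadata(), and the final boundary only terminates the walk, so it never carries a writer.

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    // Pair writers with [lowerBoundary, upperBoundary) the way commitWriters() does.
    public class CommitWalkSketch {
      public static void main(String[] args) {
        // Hypothetical boundary/writer pairs; a null value means no cells fell into that range.
        List<Map.Entry<String, String>> boundaryAndWriters = Arrays.<Map.Entry<String, String>>asList(
            new SimpleImmutableEntry<String, String>("OPEN_START", "writer-1"),
            new SimpleImmutableEntry<String, String>("bbb", null),
            new SimpleImmutableEntry<String, String>("ccc", "writer-2"),
            new SimpleImmutableEntry<String, String>("OPEN_END", null));
        // The last entry is consumed only as an upper bound, matching the hasNext() check above.
        for (int i = 0; i < boundaryAndWriters.size() - 1; i++) {
          Map.Entry<String, String> entry = boundaryAndWriters.get(i);
          if (entry.getValue() == null) {
            continue; // writer was skipped, so there is no metadata to record
          }
          String lower = entry.getKey();
          String upper = boundaryAndWriters.get(i + 1).getKey();
          System.out.println(entry.getValue() + " covers [" + lower + ", " + upper + ")");
        }
      }
    }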