From 6740ebc24f03338d5250ef018d4d952bb18469d8 Mon Sep 17 00:00:00 2001 From: anastas Date: Mon, 19 Sep 2016 10:59:42 +0300 Subject: [PATCH] My squashed commits --- .../hbase/regionserver/CompactingMemStore.java | 7 +- .../hbase/regionserver/CompactionPipeline.java | 16 +- .../hadoop/hbase/regionserver/HeapMemStoreLAB.java | 14 +- .../hbase/regionserver/ImmutableSegment.java | 24 +- .../hbase/regionserver/MemStoreCompactor.java | 206 ++++++----- .../regionserver/MemStoreCompactorIterator.java | 42 ++- .../hadoop/hbase/regionserver/SegmentFactory.java | 37 +- .../hbase/regionserver/TestCompactingMemStore.java | 13 + .../TestCompactingToCellArrayMapMemStore.java | 37 +- .../regionserver/TestPerColumnFamilyFlush.java | 2 +- .../TestWalAndCompactingMemStoreFlush.java | 383 ++++++++++++++------- 11 files changed, 499 insertions(+), 282 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 177f222..c2d5e37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; /** @@ -195,9 +196,9 @@ public class CompactingMemStore extends AbstractMemStore { return list; } - public boolean swapCompactedSegments(VersionedSegmentsList versionedList, - ImmutableSegment result) { - return pipeline.swap(versionedList, result); + public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, + boolean merge) { + return pipeline.swap(versionedList, result, !merge); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 6a13f43..2988374 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -90,9 +90,11 @@ public class CompactionPipeline { * Swapping only if there were no changes to the suffix of the list while it was compacted. * @param versionedList tail of the pipeline that was compacted * @param segment new compacted segment + * @param closeSuffix - wether to close the suffix (to release memory), as part of swapping it out * @return true iff swapped tail with new compacted segment */ - public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { + public boolean swap( + VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { if (versionedList.getVersion() != version) { return false; } @@ -108,13 +110,14 @@ public class CompactionPipeline { + versionedList.getStoreSegments().size() + ", and the number of cells in new segment is:" + segment.getCellsCount()); } - swapSuffix(suffix,segment); + swapSuffix(suffix,segment, closeSuffix); } if (region != null) { // update the global memstore size counter long suffixSize = getSegmentsKeySize(suffix); long newSize = segment.keySize(); long delta = suffixSize - newSize; + if (!closeSuffix) assert(delta>0); // sanity check long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); if (LOG.isDebugEnabled()) { LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize @@ -204,10 +207,13 @@ public class CompactionPipeline { return pipeline.peekLast().keySize(); } - private void swapSuffix(LinkedList suffix, ImmutableSegment segment) { + private void swapSuffix(LinkedList suffix, ImmutableSegment segment, + boolean close) { version++; - for (Segment itemInSuffix : suffix) { - itemInSuffix.close(); + if (close) { + for (Segment itemInSuffix : suffix) { + itemInSuffix.close(); + } } pipeline.removeAll(suffix); pipeline.addLast(segment); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 3ca4b0c..726b5e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { private AtomicReference curChunk = new AtomicReference(); // A queue of chunks from pool contained by this memstore LAB + // TODO: in the future, it would be better to have List implementation instead of Queue, + // as FIFO order is not so important here @VisibleForTesting BlockingQueue pooledChunkQueue = null; private final int chunkSize; @@ -107,6 +109,14 @@ public class HeapMemStoreLAB implements MemStoreLAB { } /** + * To be used for merging multiple MSLABs + */ + public void addPooledChunkQueue(BlockingQueue targetQueue) { + if (targetQueue==null) return; + targetQueue.drainTo(pooledChunkQueue); + } + + /** * Allocate a slice of the given length. * * If the size is larger than the maximum size specified for this @@ -242,8 +252,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { return this.curChunk.get(); } - @VisibleForTesting - BlockingQueue getChunkQueue() { + + public BlockingQueue getChunkQueue() { return this.pooledChunkQueue; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 12b7916..7058f51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -86,12 +86,13 @@ public class ImmutableSegment extends Segment { * are going to be introduced. */ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, Type type) { + MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) { + super(null, // initiailize the CellSet with NULL comparator, memStoreLAB); this.type = type; // build the true CellSet based on CellArrayMap - CellSet cs = createCellArrayMapSet(numOfCells, iterator); + CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); this.setCellSet(null, cs); // update the CellSet of the new Segment this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange(); @@ -155,7 +156,7 @@ public class ImmutableSegment extends Segment { /**------------------------------------------------------------------------ * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one * based on CellArrayMap. - * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOP + * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP * * Synchronization of the CellSet replacement: * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment @@ -188,19 +189,26 @@ public class ImmutableSegment extends Segment { ///////////////////// PRIVATE METHODS ///////////////////// /*------------------------------------------------------------------------*/ // Create CellSet based on CellArrayMap from compacting iterator - private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) { + private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator, + boolean merge) { Cell[] cells = new Cell[numOfCells]; // build the Cell Array int i = 0; while (iterator.hasNext()) { Cell c = iterator.next(); // The scanner behind the iterator is doing all the elimination logic - // now we just copy it to the new segment (also MSLAB copy) - cells[i] = maybeCloneWithAllocator(c); - boolean usedMSLAB = (cells[i] != c); + if (merge) { + // if this is merge we just move the Cell object without copying MSLAB + // the sizes still need to be updated in the new segment + cells[i] = c; + } else { + // now we just copy it to the new segment (also MSLAB copy) + cells[i] = maybeCloneWithAllocator(c); + } + boolean useMSLAB = (getMemStoreLAB()!=null); // second parameter true, because in compaction addition of the cell to new segment // is always successful - updateMetaInfo(c, true, usedMSLAB); // updates the size per cell + updateMetaInfo(c, true, useMSLAB); // updates the size per cell i++; } // build the immutable CellSet diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 714ffe3..56ed141 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -43,26 +43,22 @@ import java.util.concurrent.atomic.AtomicBoolean; public class MemStoreCompactor { public static final long DEEP_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_DOUBLE - + ClassSize.ATOMIC_BOOLEAN); + .align(ClassSize.OBJECT + + 2 * ClassSize.REFERENCE // compactingMemStore, versionedList + + 2 * Bytes.SIZEOF_INT // compactionKVMax, action + + ClassSize.ATOMIC_BOOLEAN); // isInterrupted - // Option for external guidance whether flattening is allowed - static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; - static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true; - - // Option for external setting of the compacted structure (SkipList, CellArray, etc.) + // The external setting of the compacting MemStore behaviour + // Compaction of the index without the data is the default static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; - static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default + static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = "index-compaction"; - // What percentage of the duplications is causing compaction? - static final String COMPACTION_THRESHOLD_REMAIN_FRACTION - = "hbase.hregion.compacting.memstore.comactPercent"; - static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2; + // constant strings explaining user's possibilities + static final String INDEX_COMPACTION_CONFIG = "index-compaction"; + static final String DATA_COMPACTION_CONFIG = "data-compaction"; - // Option for external guidance whether the flattening is allowed - static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN - = "hbase.hregion.compacting.memstore.avoidSpeculativeScan"; - static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false; + // Maximal number of the segments in the compaction pipeline + private static final int THRESHOLD_PIPELINE_SEGMENTS = 1; private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); @@ -85,19 +81,25 @@ public class MemStoreCompactor { // the limit to the size of the groups to be later provided to MemStoreCompactorIterator private final int compactionKVMax; - double fraction = 0.8; - - int immutCellsNum = 0; // number of immutable for compaction cells + /** + * Types of actions to be done on the pipeline upon MemStoreCompaction invocation. + * Note that every value covers the previous ones, i.e. if MERGE is the action it implies + * that the youngest segment is going to be flatten anyway. + */ + private enum Action { + NOOP, + FLATTEN, // flatten the youngest segment in the pipeline + MERGE, // merge all the segments in the pipeline into one + COMPACT // copy-compact the data of all the segments in the pipeline + } - private Type type = Type.COMPACT_TO_ARRAY_MAP; + private Action action = Action.FLATTEN; public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; - this.compactionKVMax = compactingMemStore.getConfiguration().getInt( - HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); - this.fraction = 1 - compactingMemStore.getConfiguration().getDouble( - COMPACTION_THRESHOLD_REMAIN_FRACTION, - COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT); + this.compactionKVMax = compactingMemStore.getConfiguration() + .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + initiateAction(); } /**---------------------------------------------------------------------- @@ -106,26 +108,20 @@ public class MemStoreCompactor { * is already an ongoing compaction or no segments to compact. */ public boolean start() throws IOException { - if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty - - int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, - COMPACTING_MEMSTORE_TYPE_DEFAULT); - - switch (t) { - case 1: type = Type.COMPACT_TO_SKIPLIST_MAP; - break; - case 2: type = Type.COMPACT_TO_ARRAY_MAP; - break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline + return false; } + // for the tests that change the configuration on the fly, initiate the action once again here, + // although it was already initiated in the constructor + initiateAction(); + // get a snapshot of the list of the segments from the pipeline, // this local copy of the list is marked with specific version versionedList = compactingMemStore.getImmutableSegments(); - immutCellsNum = versionedList.getNumOfCells(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store " + LOG.debug("Starting the In-Memory Compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); } @@ -143,6 +139,13 @@ public class MemStoreCompactor { } /**---------------------------------------------------------------------- + * The interface to check whether user requested the index-compaction + */ + public boolean isIndexCompaction() { + return (action == Action.MERGE); + } + + /**---------------------------------------------------------------------- * Close the scanners and clear the pointers in order to allow good * garbage collection */ @@ -156,41 +159,31 @@ public class MemStoreCompactor { * returns false if we must compact. If this method returns true we * still need to evaluate the compaction. */ - private boolean shouldFlatten() { - boolean userToFlatten = // the user configurable option to flatten or not to flatten - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, - MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); - if (userToFlatten==false) { - LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid flattening"); - return false; // the user doesn't want to flatten + private Action policy() { + + if (isInterrupted.get()) // if the entire process is interrupted cancel flattening + return Action.NOOP; // the compaction also doesn't start when interrupted + + if (action == Action.COMPACT) { // compact according to the user request + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be compacted, number of" + + " cells before compaction is " + versionedList.getNumOfCells()); + return Action.COMPACT; } + // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy - LOG.debug("In-Memory shrink is doing compaction, as there already are " + numOfSegments - + " segments in the compaction pipeline"); - return false; // to avoid "too many open files later", compact now + if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be merged, as there are " + numOfSegments + " segments"); + return Action.MERGE; // to avoid too many segments, merge now } - // till here we hvae all the signs that it is possible to flatten, run the speculative scan - // (if allowed by the user) to check the efficiency of compaction - boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, - MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); - if (avoidSpeculativeScan==true) { - LOG.debug("In-Memory shrink is doing flattening, as user asked to avoid compaction " - + "evaluation"); - return true; // flatten without checking the compaction expedience - } - try { - immutCellsNum = countCellsForCompaction(); - if (immutCellsNum > fraction * versionedList.getNumOfCells()) { - return true; - } - } catch(Exception e) { - return true; - } - return false; + + // if nothing of the above, then just flatten the newly joined segment + LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store " + + compactingMemStore.getFamilyName() + " is going to be flattened"); + return Action.FLATTEN; } /**---------------------------------------------------------------------- @@ -203,24 +196,28 @@ public class MemStoreCompactor { boolean resultSwapped = false; try { - // PHASE I: estimate the compaction expedience - EVALUATE COMPACTION - if (shouldFlatten()) { - // too much cells "survive" the possible compaction, we do not want to compact! - LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" - + " for store: " + compactingMemStore.getFamilyName()); - // Looking for Segment in the pipeline with SkipList index, to make it flat + Action nextStep = policy(); + switch (nextStep){ + case FLATTEN: // Youngest Segment in the pipeline is with SkipList index, make it flat compactingMemStore.flattenOneSegment(versionedList.getVersion()); + case NOOP: // intentionally falling through return; + case MERGE: + case COMPACT: + break; + default: throw new RuntimeException("Unknown action " + action); // sanity check } - // PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION + // Create one segment representing all segments in the compaction pipeline, + // either by compaction or by merge if (!isInterrupted.get()) { - result = compact(immutCellsNum); + result = createSubstitution(); } - // Phase III: swap the old compaction pipeline - END COPY-COMPACTION + // Substitute the pipeline with one segment if (!isInterrupted.get()) { - if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { + if (resultSwapped = compactingMemStore.swapCompactedSegments( + versionedList, result, (action==Action.MERGE))) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } @@ -237,32 +234,30 @@ public class MemStoreCompactor { } /**---------------------------------------------------------------------- - * The copy-compaction is the creation of the ImmutableSegment (from the relevant type) - * based on the Compactor Iterator. The new ImmutableSegment is returned. + * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the + * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private ImmutableSegment compact(int numOfCells) throws IOException { - - LOG.debug("In-Memory compaction does pay off - The estimated number of cells " - + "after compaction is " + numOfCells + ", while number of cells before is " + versionedList - .getNumOfCells() + ". The fraction of remaining cells should be: " + fraction); + private ImmutableSegment createSubstitution() throws IOException { ImmutableSegment result = null; MemStoreCompactorIterator iterator = new MemStoreCompactorIterator(versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + compactionKVMax, compactingMemStore.getStore(), (action==Action.COMPACT)); try { - switch (type) { - case COMPACT_TO_SKIPLIST_MAP: + switch (action) { + case COMPACT: result = SegmentFactory.instance().createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator); + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, + versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED); break; - case COMPACT_TO_ARRAY_MAP: + case MERGE: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); + versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED, + versionedList.getStoreSegments()); break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + default: throw new RuntimeException("Unknown action " + action); // sanity check } } finally { iterator.close(); @@ -272,24 +267,19 @@ public class MemStoreCompactor { } /**---------------------------------------------------------------------- - * Count cells to estimate the efficiency of the future compaction + * Initiate the action according to user config, after its default is Action.MERGE */ - private int countCellsForCompaction() throws IOException { - - int cnt = 0; - MemStoreCompactorIterator iterator = - new MemStoreCompactorIterator( - versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + private void initiateAction() { + String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY, + COMPACTING_MEMSTORE_TYPE_DEFAULT); - try { - while (iterator.next() != null) { - cnt++; - } - } finally { - iterator.close(); + switch (memStoreType) { + case INDEX_COMPACTION_CONFIG: action = Action.MERGE; + break; + case DATA_COMPACTION_CONFIG: action = Action.COMPACT; + break; + default: + throw new RuntimeException("Unknown memstore type " + memStoreType); // sanity check } - - return cnt; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java index 2eafb42..730db52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -46,13 +46,14 @@ public class MemStoreCompactorIterator implements Iterator { private StoreScanner compactingScanner; private final ScannerContext scannerContext; - + private boolean useSQM; // do we need SQM for obsolete cells elimination private boolean hasMore; private Iterator kvsIterator; // C-tor - public MemStoreCompactorIterator(List segments, - CellComparator comparator, int compactionKVMax, Store store) throws IOException { + public MemStoreCompactorIterator( + List segments, CellComparator comparator, int compactionKVMax, Store store, + boolean useSQM) throws IOException { this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); @@ -66,31 +67,45 @@ public class MemStoreCompactorIterator implements Iterator { } scanner = new MemStoreScanner(comparator, scanners, MemStoreScanner.Type.COMPACT_FORWARD); + this.useSQM = useSQM; - // reinitialize the compacting scanner for each instance of iterator - compactingScanner = createScanner(store, scanner); + if (useSQM) { // build the scanner based on Query Matcher + // reinitialize the compacting scanner for each instance of iterator + compactingScanner = createScanner(store, scanner); - hasMore = compactingScanner.next(kvs, scannerContext); + hasMore = compactingScanner.next(kvs, scannerContext); - if (!kvs.isEmpty()) { - kvsIterator = kvs.iterator(); + if (!kvs.isEmpty()) { + kvsIterator = kvs.iterator(); + } } } @Override public boolean hasNext() { + if (!useSQM) return (scanner.peek()!=null); if (!kvsIterator.hasNext()) { // refillKVS() method should be invoked only if !kvsIterator.hasNext() if (!refillKVS()) { return false; } } - return (kvsIterator.hasNext() || hasMore); + return kvsIterator.hasNext(); } @Override public Cell next() { + if (!useSQM) { + Cell result = null; + try { // try to get next + result = scanner.next(); + } catch (IOException ie) { + throw new IllegalStateException(ie); + } + return result; + } + if (!kvsIterator.hasNext()) { // refillKVS() method should be invoked only if !kvsIterator.hasNext() if (!refillKVS()) return null; @@ -99,8 +114,10 @@ public class MemStoreCompactorIterator implements Iterator { } public void close() { - compactingScanner.close(); - compactingScanner = null; + if (useSQM) { + compactingScanner.close(); + compactingScanner = null; + } scanner.close(); scanner = null; } @@ -129,7 +146,8 @@ public class MemStoreCompactorIterator implements Iterator { } - + /* Refill kev-value set (should be invoked only when KVS is empty) + * Returns true if KVS is non-empty */ private boolean refillKVS() { kvs.clear(); // clear previous KVS, first initiated in the constructor if (!hasMore) { // if there is nothing expected next in compactingScanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 510ebbd..eabf50b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; import java.io.IOException; +import java.util.LinkedList; /** * A singleton store segment factory. @@ -51,13 +52,17 @@ public final class SegmentFactory { } // create new flat immutable segment from compacting old immutable segment - public ImmutableSegment createImmutableSegment(final Configuration conf, - final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells, - ImmutableSegment.Type segmentType) throws IOException { - Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, + public ImmutableSegment createImmutableSegment( + final Configuration conf, final CellComparator comparator, + MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType) + throws IOException { + Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, "wrong immutable segment type"); - return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf), numOfCells, - segmentType); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + return + // the last parameter "false" means not to merge, but to compact the pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); } // create empty immutable segment @@ -77,6 +82,26 @@ public final class SegmentFactory { return generateMutableSegment(conf, comparator, memStoreLAB); } + // create new flat immutable segment from merging old immutable segment + public ImmutableSegment createImmutableSegment( + final Configuration conf, final CellComparator comparator, + MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType, + LinkedList segments) + throws IOException { + Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, + "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + + for (Segment s: segments){ + ((HeapMemStoreLAB)memStoreLAB).addPooledChunkQueue( + ((HeapMemStoreLAB)s.getMemStoreLAB()).getChunkQueue()); + } + + return + // the last parameter "true" means to merge the compaction pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true); + } //****** private methods to instantiate concrete store segments **********// private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 211a6d8..15c7c6d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -508,6 +508,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { + + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + byte[] row = Bytes.toBytes("testrow"); byte[] fam = Bytes.toBytes("testfamily"); byte[] qf1 = Bytes.toBytes("testqualifier1"); @@ -585,6 +589,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction1Bucket() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 // test 1 bucket @@ -609,6 +616,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction2Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; @@ -647,6 +657,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction3Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; String[] keys3 = { "D", "B", "B" }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index fefe2c1..b14ed1c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -22,12 +22,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,9 +34,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.util.ArrayList; import java.util.List; @@ -67,7 +62,8 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore compactingSetUp(); Configuration conf = HBaseConfiguration.create(); - conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store, @@ -221,17 +217,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } ////////////////////////////////////////////////////////////////////////////// - // Flattening tests + // Merging tests ////////////////////////////////////////////////////////////////////////////// @Test - public void testFlattening() throws IOException { + public void testMerging() throws IOException { String[] keys1 = { "A", "A", "B", "C", "F", "H"}; String[] keys2 = { "A", "B", "D", "G", "I", "J"}; String[] keys3 = { "D", "B", "B", "E" }; - // set flattening to true - memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction"); addRowsByKeys(memstore, keys1); @@ -244,13 +239,31 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore addRowsByKeys(memstore, keys2); // also should only flatten + int counter2 = 0; + for ( Segment s : memstore.getSegments()) { + counter2 += s.getCellsCount(); + } + assertEquals(12, counter2); + ((CompactingMemStore) memstore).disableCompaction(); ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening assertEquals(0, memstore.getSnapshot().getCellsCount()); + int counter3 = 0; + for ( Segment s : memstore.getSegments()) { + counter3 += s.getCellsCount(); + } + assertEquals(12, counter3); + addRowsByKeys(memstore, keys3); + int counter4 = 0; + for ( Segment s : memstore.getSegments()) { + counter4 += s.getCellsCount(); + } + assertEquals(16, counter4); + ((CompactingMemStore) memstore).enableCompaction(); @@ -264,7 +277,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore for ( Segment s : memstore.getSegments()) { counter += s.getCellsCount(); } - assertEquals(10,counter); + assertEquals(16,counter); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot ImmutableSegment s = memstore.getSnapshot(); @@ -302,7 +315,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore // Just doing the cnt operation here MemStoreCompactorIterator itr = new MemStoreCompactorIterator( ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), - CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); + CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore(), true); int cnt = 0; try { while (itr.next() != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 6bfaa59..f0f8c39 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -130,7 +130,7 @@ public class TestPerColumnFamilyFlush { conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 100 * 1024); // Intialize the region - Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf); // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 74826b0..9749737 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -36,13 +36,12 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * This test verifies the correctness of the Per Column Family flushing strategy @@ -121,7 +120,7 @@ public class TestWalAndCompactingMemStoreFlush { } @Test(timeout = 180000) - public void testSelectiveFlushWhenEnabled() throws IOException { + public void testSelectiveFlushWithDataCompaction() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); @@ -130,9 +129,12 @@ public class TestWalAndCompactingMemStoreFlush { FlushNonSloppyStoresFirstPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.25); + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); + // Intialize the region - Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); + Region region = initHRegion("testSelectiveFlushWithDataCompaction", conf); // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { @@ -311,7 +313,7 @@ public class TestWalAndCompactingMemStoreFlush { assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); // CF3 should be bottleneck for WAL - assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); // Flush!!!!!!!!!!!!!!!!!!!!!! // Trying to clean the existing memstores, CF2 all flushed to disk. The single @@ -334,6 +336,7 @@ public class TestWalAndCompactingMemStoreFlush { assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); + region.flush(true); // flush once again in order to be sure that everything is empty assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, region.getStore(FAMILY1).getMemStoreSize()); @@ -369,9 +372,13 @@ public class TestWalAndCompactingMemStoreFlush { HBaseTestingUtility.closeRegionAndWAL(region); } + /*------------------------------------------------------------------------------*/ + /* Check the same as above but for index-compaction type of compacting memstore */ @Test(timeout = 180000) - public void testSelectiveFlushWhenEnabledNoFlattening() throws IOException { + public void testSelectiveFlushWithIndexCompaction() throws IOException { + /*------------------------------------------------------------------------------*/ + /* SETUP */ // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); @@ -379,18 +386,16 @@ public class TestWalAndCompactingMemStoreFlush { FlushNonSloppyStoresFirstPolicy.class.getName()); conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to index-compaction + conf.set("hbase.hregion.compacting.memstore.type", "index-compaction"); + // Initialize the region + Region region = initHRegion("testSelectiveFlushWithIndexCompaction", conf); - // set memstore segment flattening to false and compact to skip-list - conf.setBoolean("hbase.hregion.compacting.memstore.flatten", false); - conf.setInt("hbase.hregion.compacting.memstore.type",1); - - // Intialize the region - Region region = initHRegion("testSelectiveFlushWhenEnabled", conf); - + /*------------------------------------------------------------------------------*/ + /* PHASE I - insertions */ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); // compacted memstore - if (i <= 100) { region.put(createPut(2, i)); if (i <= 50) { @@ -398,41 +403,32 @@ public class TestWalAndCompactingMemStoreFlush { } } } - // Now add more puts for CF2, so that we only flush CF2 to disk for (int i = 100; i < 2000; i++) { region.put(createPut(2, i)); } - long totalMemstoreSize = region.getMemstoreSize(); - + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE I - collect sizes */ + long totalMemstoreSizePhaseI = region.getMemstoreSize(); // Find the smallest LSNs for edits wrt to each CF. long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1); long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3); - // Find the sizes of the memstores of each CF. long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); - // Get the overall smallest LSN in the region's memstores. long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - String s = "\n\n----------------------------------\n" - + "Upon initial insert and before any flush, size of CF1 is:" - + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:" - + region.getStore(FAMILY1).getMemStore().isSloppy() + ". Size of CF2 is:" - + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:" - + region.getStore(FAMILY2).getMemStore().isSloppy() + ". Size of CF3 is:" - + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:" - + region.getStore(FAMILY3).getMemStore().isSloppy() + "\n"; - + /*------------------------------------------------------------------------------*/ + /* PHASE I - validation */ // The overall smallest LSN in the region's memstores should be the same as // the LSN of the smallest edit in CF1 assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI); - // Some other sanity checks. assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI); assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI); @@ -442,147 +438,153 @@ public class TestWalAndCompactingMemStoreFlush { // The total memstore size should be the same as the sum of the sizes of // memstores of CF1, CF2 and CF3. - String msg = "totalMemstoreSize="+totalMemstoreSize + - " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + - " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + - " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; - assertEquals(msg, - totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD) - + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD), + assertEquals(totalMemstoreSizePhaseI + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); - // Flush!!!!!!!!!!!!!!!!!!!!!! - // We have big compacting memstore CF1 and two small memstores: - // CF2 (not compacted) and CF3 (compacting) - // All together they are above the flush size lower bound. - // Since CF1 and CF3 should be flushed to memory (not to disk), - // CF2 is going to be flushed to disk. - // CF1 - nothing to compact, CF3 - should be twice compacted + /*------------------------------------------------------------------------------*/ + /* PHASE I - Flush */ + // First Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // CF1, CF2, CF3, all together they are above the flush size lower bound. + // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk. + // CF1 and CF3 - flushed to memory and flatten explicitly + region.flush(false); ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); - region.flush(false); + // CF3/CF1 should be merged so wait here to be sure the compaction is done + while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE II - collect sizes */ // Recalculate everything long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize(); - long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); // Find the smallest LSNs for edits wrt to each CF. - long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + long totalMemstoreSizePhaseII = region.getMemstoreSize(); - s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD - + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD - + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" - + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII - + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; - - // CF1 was flushed to memory, but there is nothing to compact, should - // remain the same size plus renewed empty skip-list - assertEquals(s, cf1MemstoreSizePhaseII, cf1MemstoreSizePhaseI - + ImmutableSegment.DEEP_OVERHEAD_CAM + CompactionPipeline.ENTRY_OVERHEAD); - + /*------------------------------------------------------------------------------*/ + /* PHASE II - validation */ + // CF1 was flushed to memory, should be flattened and take less space + assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI); // CF2 should become empty - assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, - cf2MemstoreSizePhaseII); - - // verify that CF3 was flushed to memory and was compacted (this is approximation check) - assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD - + ImmutableSegment.DEEP_OVERHEAD_CAM - + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII); - assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII); - + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); + // verify that CF3 was flushed to memory and was not compacted (this is an approximation check) + // if compacted CF# should be at least twice less because its every key was duplicated + assertTrue(cf3MemstoreSizePhaseI / 2 < cf3MemstoreSizePhaseII); // Now the smallest LSN in the region should be the same as the smallest // LSN in the memstore of CF1. assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI); - + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline + // items in CF1/2 + assertEquals( + totalMemstoreSizePhaseII + 3 * DefaultMemStore.DEEP_OVERHEAD + + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM, + cf1MemstoreSizePhaseII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII); + + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE III - insertions */ // Now add more puts for CF1, so that we also flush CF1 to disk instead of - // memory in next flush - for (int i = 1200; i < 2000; i++) { + // memory in next flush. This is causing the CF! to be flushed to memory twice. + for (int i = 1200; i < 8000; i++) { region.put(createPut(1, i)); } - s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII - + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n"; + // CF1 should be flatten and merged so wait here to be sure the compaction is done + while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); - // How much does the CF1 memstore occupy? Will be used later. + /*------------------------------------------------------------------------------*/ + /* PHASE III - collect sizes */ + // How much does the CF1 memstore occupy now? Will be used later. long cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize(); - long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); + long totalMemstoreSizePhaseIII = region.getMemstoreSize(); - s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII - + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ; - - - // Flush!!!!!!!!!!!!!!!!!!!!!! - // Flush again, CF1 is flushed to disk - // CF2 is flushed to disk, because it is not in-memory compacted memstore - // CF3 is flushed empty to memory (actually nothing happens to CF3) + /*------------------------------------------------------------------------------*/ + /* PHASE III - validation */ + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline + // items in CF1/2 + assertEquals( + totalMemstoreSizePhaseIII + 3 * DefaultMemStore.DEEP_OVERHEAD + + 2 * ImmutableSegment.DEEP_OVERHEAD_CAM, + cf1MemstoreSizePhaseIII + cf2MemstoreSizePhaseII + cf3MemstoreSizePhaseII); + + /*------------------------------------------------------------------------------*/ + /* PHASE III - Flush */ + // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // CF1 is flushed to disk, but not entirely emptied. + // CF2 was and remained empty, same way nothing happens to CF3 region.flush(false); + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE IV - collect sizes */ // Recalculate everything long cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize(); - long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); - long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); - s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:" - + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV - + "\n"; - - s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV - + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " + - "the smallest sequence in CF2:" - + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV - + "\n"; - - // CF1's pipeline component (inserted before first flush) should be flushed to disk - // CF2 should be flushed to disk - assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD - + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV); + /*------------------------------------------------------------------------------*/ + /* PHASE IV - validation */ + // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk + // CF2 should remain empty + assertEquals( + cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD + + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); - // CF3 shouldn't have been touched. assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); - // the smallest LSN of CF3 shouldn't change assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV); - // CF3 should be bottleneck for WAL - assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); + assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV); - // Flush!!!!!!!!!!!!!!!!!!!!!! - // Clearing the existing memstores, CF2 all flushed to disk. The single - // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk. - // Note that active sets of CF1 and CF3 are empty + /*------------------------------------------------------------------------------*/ + /* PHASE IV - Flush */ + // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // Force flush to disk on all memstores (flush parameter true). + // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty region.flush(true); + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE V - collect sizes */ // Recalculate everything long cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize(); long cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize(); - long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) - .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqInRegionCurrentMemstorePhaseV = + getWAL(region).getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long totalMemstoreSizePhaseV = region.getMemstoreSize(); + /*------------------------------------------------------------------------------*/ + /* PHASE V - validation */ assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); - + // The total memstores size should be empty + assertEquals(totalMemstoreSizePhaseV, 0); // Because there is nothing in any memstore the WAL's LSN should be -1 assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM); @@ -590,6 +592,9 @@ public class TestWalAndCompactingMemStoreFlush { // any Column Family above the threshold? // In that case, we should flush all the CFs. + /*------------------------------------------------------------------------------*/ + /*------------------------------------------------------------------------------*/ + /* PHASE VI - insertions */ // The memstore limit is 200*1024 and the column family flush threshold is // around 50*1024. We try to just hit the memstore limit with each CF's // memstore being below the CF flush threshold. @@ -601,24 +606,33 @@ public class TestWalAndCompactingMemStoreFlush { region.put(createPut(5, i)); } - region.flush(false); + long cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize(); + long cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize(); + long cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize(); - s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: " - + smallestSeqInRegionCurrentMemstorePhaseV - + ". After additional inserts and last flush, the entire region size is:" + region - .getMemstoreSize() - + "\n----------------------------------\n"; + /*------------------------------------------------------------------------------*/ + /* PHASE VI - Flush */ + // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!! + // None among compacting memstores was flushed to memory due to previous puts. + // But is going to be moved to pipeline and flatten due to the flush. + region.flush(false); // Since we won't find any CF above the threshold, and hence no specific // store to flush, we should flush all the memstores - // Also compacted memstores are flushed to disk. - assertEquals(0, region.getMemstoreSize()); - System.out.println(s); + // Also compacted memstores are flushed to disk, but not entirely emptied + long cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize(); + long cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize(); + long cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize(); + + assertTrue(cf1ActiveSizePhaseVII < cf1ActiveSizePhaseVI); + assertTrue(cf3ActiveSizePhaseVII < cf3ActiveSizePhaseVI); + assertTrue(cf5ActiveSizePhaseVII < cf5ActiveSizePhaseVI); + HBaseTestingUtility.closeRegionAndWAL(region); } @Test(timeout = 180000) - public void testSelectiveFlushWhenEnabledAndWALinCompaction() throws IOException { + public void testSelectiveFlushAndWALinDataCompaction() throws IOException { // Set up the configuration Configuration conf = HBaseConfiguration.create(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); @@ -627,9 +641,11 @@ public class TestWalAndCompactingMemStoreFlush { conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); // Intialize the HRegion - HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf); + HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 for (int i = 1; i <= 1200; i++) { region.put(createPut(1, i)); @@ -744,6 +760,123 @@ public class TestWalAndCompactingMemStoreFlush { HBaseTestingUtility.closeRegionAndWAL(region); } + @Test(timeout = 180000) + public void testSelectiveFlushAndWALinIndexCompaction() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushNonSloppyStoresFirstPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 200 * 1024); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5); + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "index-compaction"); + + // Intialize the HRegion + HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + // Now add more puts for CF2, so that we only flush CF2 to disk + for (int i = 100; i < 2000; i++) { + region.put(createPut(2, i)); + } + + long totalMemstoreSize = region.getMemstoreSize(); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); + + // Some other sanity checks. + assertTrue(cf1MemstoreSizePhaseI > 0); + assertTrue(cf2MemstoreSizePhaseI > 0); + assertTrue(cf3MemstoreSizePhaseI > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, + cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); + + // Flush! + ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); + ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); + // CF1 and CF3 should be compacted so wait here to be sure the compaction is done + while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + region.flush(false); + + long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); + + long smallestSeqInRegionCurrentMemstorePhaseII = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); + + // CF2 should have been cleared + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); + + // Add same entries to compact them later + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + // Now add more puts for CF2, so that we only flush CF2 to disk + for (int i = 100; i < 2000; i++) { + region.put(createPut(2, i)); + } + + long smallestSeqInRegionCurrentMemstorePhaseIII = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3); + + // Flush! + ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); + ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); + // CF1 and CF3 should be compacted so wait here to be sure the compaction is done + while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) + .isMemStoreFlushingInMemory()) + Threads.sleep(10); + region.flush(false); + + long smallestSeqInRegionCurrentMemstorePhaseIV = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3); + + // now check that the LSN of the entire WAL, of CF1 and of CF3 has NOT progressed due to merge + assertFalse( + smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII); + assertFalse(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII); + assertFalse(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII); + + HBaseTestingUtility.closeRegionAndWAL(region); + } + private WAL getWAL(Region region) { return ((HRegion)region).getWAL(); } -- 1.8.5.2 (Apple Git-48)