From 08ef3304013d9d6bd57baffbcae468625fe89ca5 Mon Sep 17 00:00:00 2001 From: anastas Date: Mon, 12 Sep 2016 15:31:59 +0300 Subject: [PATCH] My squashed commits --- .../hbase/regionserver/CompactingMemStore.java | 15 +- .../hbase/regionserver/CompactionPipeline.java | 16 +- .../hadoop/hbase/regionserver/HeapMemStoreLAB.java | 13 +- .../hbase/regionserver/ImmutableSegment.java | 21 ++- .../hbase/regionserver/MemStoreCompactor.java | 197 +++++++++------------ .../regionserver/MemStoreCompactorIterator.java | 5 +- .../hadoop/hbase/regionserver/SegmentFactory.java | 32 +++- .../hbase/regionserver/TestCompactingMemStore.java | 13 ++ .../TestCompactingToCellArrayMapMemStore.java | 12 +- 9 files changed, 184 insertions(+), 140 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 504ddab..4ba0378 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; /** @@ -160,7 +161,13 @@ public class CompactingMemStore extends AbstractMemStore { + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: " + getFamilyName()); } - stopCompaction(); + if (!compactor.isIndexCompaction()){ // if there is ongoing time-consuming data-compaction + stopCompaction(); // interrupt and stop it + } else { + while (inMemoryFlushInProgress.get()) { + Threads.sleep(10); // if there is ongoing short-term index-compaction, + } // busy-wait till iti s finished + } pushActiveToPipeline(active); snapshotId = EnvironmentEdgeManager.currentTime(); pushTailToSnapshot(); @@ -204,9 +211,9 @@ public class CompactingMemStore extends AbstractMemStore { return list; } - public boolean swapCompactedSegments(VersionedSegmentsList versionedList, - ImmutableSegment result) { - return pipeline.swap(versionedList, result); + public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, + boolean merge) { + return pipeline.swap(versionedList, result, merge); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index e0ba8c3..849b67b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -83,9 +83,11 @@ public class CompactionPipeline { * Swapping only if there were no changes to the suffix of the list while it was compacted. 
* @param versionedList tail of the pipeline that was compacted * @param segment new compacted segment + * @param merge * @return true iff swapped tail with new compacted segment */ - public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { + public boolean swap( + VersionedSegmentsList versionedList, ImmutableSegment segment, boolean merge) { if (versionedList.getVersion() != version) { return false; } @@ -101,13 +103,14 @@ public class CompactionPipeline { + versionedList.getStoreSegments().size() + ", and the number of cells in new segment is:" + segment.getCellsCount()); } - swapSuffix(suffix,segment); + swapSuffix(suffix,segment, merge); } if (region != null) { // update the global memstore size counter long suffixSize = CompactingMemStore.getSegmentsSize(suffix); long newSize = CompactingMemStore.getSegmentSize(segment); long delta = suffixSize - newSize; + if (merge) assert(delta==0); // sanity check long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); if (LOG.isDebugEnabled()) { LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize @@ -189,10 +192,13 @@ public class CompactionPipeline { return CompactingMemStore.getSegmentSize(pipeline.peekLast()); } - private void swapSuffix(LinkedList suffix, ImmutableSegment segment) { + private void swapSuffix(LinkedList suffix, ImmutableSegment segment, + boolean merge) { version++; - for(Segment itemInSuffix : suffix) { - itemInSuffix.close(); + if (!merge) { + for (Segment itemInSuffix : suffix) { + itemInSuffix.close(); + } } pipeline.removeAll(suffix); pipeline.addLast(segment); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 3ca4b0c..69c8af3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { private AtomicReference curChunk = new AtomicReference(); // A queue of chunks from pool contained by this memstore LAB + // TODO: in the future, it would be better to have List implementation instead of Queue, + // as FIFO order is not so important here @VisibleForTesting BlockingQueue pooledChunkQueue = null; private final int chunkSize; @@ -107,6 +109,13 @@ public class HeapMemStoreLAB implements MemStoreLAB { } /** + * To be used for merging multiple MSLABs + */ + public void addPooledChunkQueue(BlockingQueue targetQueue) { + targetQueue.drainTo(pooledChunkQueue); + } + + /** * Allocate a slice of the given length. * * If the size is larger than the maximum size specified for this @@ -242,8 +251,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { return this.curChunk.get(); } - @VisibleForTesting - BlockingQueue getChunkQueue() { + + public BlockingQueue getChunkQueue() { return this.pooledChunkQueue; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 28f14d5..5ffed02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -79,7 +79,7 @@ public class ImmutableSegment extends Segment { * are going to be introduced. 
*/ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, Type type) { + MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean doMerge) { super(null, // initiailize the CellSet with NULL comparator, memStoreLAB, @@ -88,7 +88,7 @@ public class ImmutableSegment extends Segment { ClassSize.CELL_ARRAY_MAP_ENTRY); // build the true CellSet based on CellArrayMap - CellSet cs = createCellArrayMapSet(numOfCells, iterator); + CellSet cs = createCellArrayMapSet(numOfCells, iterator, doMerge); this.setCellSet(null, cs); // update the CellSet of the new Segment this.type = type; @@ -194,19 +194,26 @@ public class ImmutableSegment extends Segment { ///////////////////// PRIVATE METHODS ///////////////////// /*------------------------------------------------------------------------*/ // Create CellSet based on CellArrayMap from compacting iterator - private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) { + private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator, + boolean doMerge) { Cell[] cells = new Cell[numOfCells]; // build the Cell Array int i = 0; while (iterator.hasNext()) { Cell c = iterator.next(); // The scanner behind the iterator is doing all the elimination logic - // now we just copy it to the new segment (also MSLAB copy) - cells[i] = maybeCloneWithAllocator(c); - boolean usedMSLAB = (cells[i] != c); + if (doMerge) { + // if this is merge we just move the Cell object without copying MSLAB + // the sizes still need to be updated in the new segment + cells[i] = c; + } else { + // now we just copy it to the new segment (also MSLAB copy) + cells[i] = maybeCloneWithAllocator(c); + } + boolean useMSLAB = (getMemStoreLAB()!=null); // second parameter true, because in compaction addition of the cell to new segment // is always successful - updateMetaInfo(c, true, usedMSLAB); // updates the size per cell + updateMetaInfo(c, true, useMSLAB); // updates the size per cell i++; } // build the immutable CellSet diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 470dc9c..05bed4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -40,23 +40,13 @@ import java.util.concurrent.atomic.AtomicBoolean; @InterfaceAudience.Private class MemStoreCompactor { - // Option for external guidance whether flattening is allowed - static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; - static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true; - - // Option for external setting of the compacted structure (SkipList, CellArray, etc.) + // The external setting of the compacting MemStore behaviour + // Compaction of the index without the data is the default static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; - static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default - - // What percentage of the duplications is causing compaction? 
- static final String COMPACTION_THRESHOLD_REMAIN_FRACTION - = "hbase.hregion.compacting.memstore.comactPercent"; - static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2; + static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = "index-compaction"; - // Option for external guidance whether the flattening is allowed - static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN - = "hbase.hregion.compacting.memstore.avoidSpeculativeScan"; - static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false; + // Maximal number of the segments in the compaction pipeline + private static final int THRESHOLD_PIPELINE_SEGMENTS = 1; private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private CompactingMemStore compactingMemStore; @@ -70,26 +60,25 @@ class MemStoreCompactor { // the limit to the size of the groups to be later provided to MemStoreCompactorIterator private final int compactionKVMax; - double fraction = 0.8; - - int immutCellsNum = 0; // number of immutable for compaction cells /** - * Types of Compaction + * Types of actions to be done on the pipeline upon MemStoreCompaction invocation. + * Note that every value covers the previous ones, i.e. if MERGE is the action it implies + * that the youngest segment is going to be flatten anyway. */ - private enum Type { - COMPACT_TO_SKIPLIST_MAP, - COMPACT_TO_ARRAY_MAP + private enum Action { + NOP, + FLATTEN, // flatten the youngest segment in the pipeline + MERGE, // merge all the segments in the pipeline into one + COMPACT // copy-compact the data of all the segments in the pipeline } - private Type type = Type.COMPACT_TO_ARRAY_MAP; + private Action action = Action.FLATTEN; public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; - this.compactionKVMax = compactingMemStore.getConfiguration().getInt( - HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); - this.fraction = 1 - compactingMemStore.getConfiguration().getDouble( - COMPACTION_THRESHOLD_REMAIN_FRACTION, - COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT); + this.compactionKVMax = compactingMemStore.getConfiguration() + .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + initiateAction(); } /**---------------------------------------------------------------------- @@ -98,26 +87,20 @@ class MemStoreCompactor { * is already an ongoing compaction or no segments to compact. 
*/ public boolean start() throws IOException { - if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty - - int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, - COMPACTING_MEMSTORE_TYPE_DEFAULT); - - switch (t) { - case 1: type = Type.COMPACT_TO_SKIPLIST_MAP; - break; - case 2: type = Type.COMPACT_TO_ARRAY_MAP; - break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline + return false; } + // for the tests that change the configuration on the fly, initiate the action once again here, + // although it was already initiated in the constructor + initiateAction(); + // get a snapshot of the list of the segments from the pipeline, // this local copy of the list is marked with specific version versionedList = compactingMemStore.getImmutableSegments(); - immutCellsNum = versionedList.getNumOfCells(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store " + LOG.debug("Starting the In-Memory Compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); } @@ -135,6 +118,13 @@ class MemStoreCompactor { } /**---------------------------------------------------------------------- + * The interface to check whether user requested the index-compaction + */ + public boolean isIndexCompaction() { + return (action == Action.MERGE); + } + + /**---------------------------------------------------------------------- * Close the scanners and clear the pointers in order to allow good * garbage collection */ @@ -148,41 +138,31 @@ class MemStoreCompactor { * returns false if we must compact. If this method returns true we * still need to evaluate the compaction. 
*/ - private boolean shouldFlatten() { - boolean userToFlatten = // the user configurable option to flatten or not to flatten - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, - MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); - if (userToFlatten==false) { - LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid flattening"); - return false; // the user doesn't want to flatten + private Action policy() { + + if (isInterrupted.get()) // if the entire process is interrupted cancel flattening + return Action.NOP; // the compaction also doesn't start when interrupted + + if (action == Action.COMPACT) { // compact according to the user request + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be compacted, number of" + + " cells before compaction is " + versionedList.getNumOfCells()); + return Action.COMPACT; } + // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy - LOG.debug("In-Memory shrink is doing compaction, as there already are " + numOfSegments - + " segments in the compaction pipeline"); - return false; // to avoid "too many open files later", compact now - } - // till here we hvae all the signs that it is possible to flatten, run the speculative scan - // (if allowed by the user) to check the efficiency of compaction - boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, - MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); - if (avoidSpeculativeScan==true) { - LOG.debug("In-Memory shrink is doing flattening, as user asked to avoid compaction " - + "evaluation"); - return true; // flatten without checking the compaction expedience + if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be merged, as there are " + numOfSegments + " segments"); + return Action.MERGE; // to avoid too many segments, merge now } - try { - immutCellsNum = countCellsForCompaction(); - if (immutCellsNum > fraction * versionedList.getNumOfCells()) { - return true; - } - } catch(Exception e) { - return true; - } - return false; + + // if nothing of the above, then just flatten the newly joined segment + LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store " + + compactingMemStore.getFamilyName() + " is going to be flattened"); + return Action.FLATTEN; } /**---------------------------------------------------------------------- @@ -195,24 +175,28 @@ class MemStoreCompactor { boolean resultSwapped = false; try { - // PHASE I: estimate the compaction expedience - EVALUATE COMPACTION - if (shouldFlatten()) { - // too much cells "survive" the possible compaction, we do not want to compact! 
- LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" - + " for store: " + compactingMemStore.getFamilyName()); - // Looking for Segment in the pipeline with SkipList index, to make it flat + Action nextStep = policy(); + switch (nextStep){ + case FLATTEN: // Youngest Segment in the pipeline is with SkipList index, make it flat compactingMemStore.flattenOneSegment(versionedList.getVersion()); + case NOP: // intentionally falling through return; + case MERGE: + case COMPACT: + break; + default: throw new RuntimeException("Unknown action " + action); // sanity check } - // PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION + // Create one segment representing all segments in the compaction pipeline, + // either by compaction or by merge if (!isInterrupted.get()) { - result = compact(immutCellsNum); + result = createSubstitution(); } - // Phase III: swap the old compaction pipeline - END COPY-COMPACTION + // Substitute the pipeline with one segment if (!isInterrupted.get()) { - if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { + if (resultSwapped = compactingMemStore.swapCompactedSegments( + versionedList, result, (action==Action.MERGE))) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } @@ -229,14 +213,10 @@ class MemStoreCompactor { } /**---------------------------------------------------------------------- - * The copy-compaction is the creation of the ImmutableSegment (from the relevant type) - * based on the Compactor Iterator. The new ImmutableSegment is returned. + * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the + * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private ImmutableSegment compact(int numOfCells) throws IOException { - - LOG.debug("In-Memory compaction does pay off - The estimated number of cells " - + "after compaction is " + numOfCells + ", while number of cells before is " + versionedList - .getNumOfCells() + ". 
The fraction of remaining cells should be: " + fraction); + private ImmutableSegment createSubstitution() throws IOException { ImmutableSegment result = null; MemStoreCompactorIterator iterator = @@ -244,17 +224,19 @@ class MemStoreCompactor { compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); try { - switch (type) { - case COMPACT_TO_SKIPLIST_MAP: + switch (action) { + case COMPACT: result = SegmentFactory.instance().createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator); + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, + versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED); break; - case COMPACT_TO_ARRAY_MAP: + case MERGE: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); + versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED, + versionedList.getStoreSegments()); break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + default: throw new RuntimeException("Unknown action " + action); // sanity check } } finally { iterator.close(); @@ -264,24 +246,19 @@ class MemStoreCompactor { } /**---------------------------------------------------------------------- - * Count cells to estimate the efficiency of the future compaction + * Initiate the action according to user config, after its default is Action.MERGE */ - private int countCellsForCompaction() throws IOException { - - int cnt = 0; - MemStoreCompactorIterator iterator = - new MemStoreCompactorIterator( - versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + private void initiateAction() { + String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY, + COMPACTING_MEMSTORE_TYPE_DEFAULT); - try { - while (iterator.next() != null) { - cnt++; - } - } finally { - iterator.close(); + switch (memStoreType) { + case "index-compaction": action = Action.MERGE; + break; + case "data-compaction": action = Action.COMPACT; + break; + default: + throw new RuntimeException("Unknown memstore type " + memStoreType); // sanity check } - - return cnt; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java index 2eafb42..af7f7e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -86,7 +86,7 @@ public class MemStoreCompactorIterator implements Iterator { return false; } } - return (kvsIterator.hasNext() || hasMore); + return kvsIterator.hasNext(); } @Override @@ -129,7 +129,8 @@ public class MemStoreCompactorIterator implements Iterator { } - + /* Refill kev-value set (should be invoked only when KVS is empty) + * Returns true if KVS is non-empty */ private boolean refillKVS() { kvs.clear(); // clear previous KVS, first initiated in the constructor if (!hasMore) { // if there is nothing expected next in compactingScanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 
6351f13..75bdf3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; import java.io.IOException; +import java.util.LinkedList; /** * A singleton store segment factory. @@ -70,16 +71,39 @@ public final class SegmentFactory { } // create new flat immutable segment from compacting old immutable segment - public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator, + public ImmutableSegment createImmutableSegment( + final Configuration conf, final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType) throws IOException { - Preconditions.checkArgument( - segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type"); + Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, + "wrong immutable segment type"); MemStoreLAB memStoreLAB = getMemStoreLAB(conf); return - new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType); + // the last parameter "false" means not to merge, but to compact the pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); } + // create new flat immutable segment from merging old immutable segment + public ImmutableSegment createImmutableSegment( + final Configuration conf, final CellComparator comparator, + MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType, + LinkedList segments) + throws IOException { + Preconditions.checkArgument(segmentType != ImmutableSegment.Type.ARRAY_MAP_BASED, + "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + + for (Segment s: segments){ + ((HeapMemStoreLAB)memStoreLAB).addPooledChunkQueue( + ((HeapMemStoreLAB)s.getMemStoreLAB()).getChunkQueue()); + } + + return + // the last parameter "true" means to merge the compaction pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true); + } //****** private methods to instantiate concrete store segments **********// private MutableSegment generateMutableSegment( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index db0205e..584a128 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -508,6 +508,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { + + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + byte[] row = Bytes.toBytes("testrow"); byte[] fam = Bytes.toBytes("testfamily"); byte[] qf1 = Bytes.toBytes("testqualifier1"); @@ -589,6 +593,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction1Bucket() throws IOException { + // set memstore to do data 
compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 // test 1 bucket @@ -616,6 +623,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction2Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; @@ -660,6 +670,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @Test public void testCompaction3Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; String[] keys3 = { "D", "B", "B" }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 1933343..851e23b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -67,7 +67,8 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore compactingSetUp(); Configuration conf = HBaseConfiguration.create(); - conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store, @@ -221,17 +222,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } ////////////////////////////////////////////////////////////////////////////// - // Flattening tests + // Merging tests ////////////////////////////////////////////////////////////////////////////// @Test - public void testFlattening() throws IOException { + public void testMerging() throws IOException { String[] keys1 = { "A", "A", "B", "C", "F", "H"}; String[] keys2 = { "A", "B", "D", "G", "I", "J"}; String[] keys3 = { "D", "B", "B", "E" }; - // set flattening to true - memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction"); addRowsByKeys(memstore, keys1); @@ -264,7 +264,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore for ( Segment s : memstore.getSegments()) { counter += s.getCellsCount(); } - assertEquals(10,counter); + assertEquals(16,counter); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot ImmutableSegment s = memstore.getSnapshot(); -- 1.8.5.2 (Apple Git-48)
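
Usage note: with this patch the in-memory compaction behaviour is selected through the single configuration key "hbase.hregion.compacting.memstore.type". Its default value "index-compaction" causes the compaction pipeline segments to be merged (Action.MERGE) -- cells are moved into the new flat segment without an MSLAB copy -- while "data-compaction" requests a full copy-compaction of the cell data (Action.COMPACT). Below is a minimal sketch of how the updated tests select the data-compaction path; the class name is illustrative only and the construction of the CompactingMemStore itself is omitted.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Illustrative example class, not part of the patch.
    public class CompactingMemStoreConfigExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Default is "index-compaction": pipeline segments are merged/flattened
        // without copying cell data. Set "data-compaction" to copy-compact the cells.
        conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
        System.out.println(conf.get("hbase.hregion.compacting.memstore.type"));
      }
    }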