From 322d8c9bfe87d39223eb92b35de71b887a53b668 Mon Sep 17 00:00:00 2001 From: anastas Date: Thu, 9 Mar 2017 21:20:34 +0200 Subject: [PATCH] My squashed commits --- .../hadoop/hbase/regionserver/CompactingMemStore.java | 9 ++++++--- .../hadoop/hbase/regionserver/CompactionPipeline.java | 5 +++-- .../hadoop/hbase/regionserver/ImmutableSegment.java | 4 ++-- .../hadoop/hbase/regionserver/MemStoreCompactor.java | 18 +++++++++++++----- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e7f4a67..6d23795 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -256,7 +256,8 @@ public class CompactingMemStore extends AbstractMemStore { public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, boolean merge) { - return pipeline.swap(versionedList, result, !merge); + // last true stands for updating the region size + return pipeline.swap(versionedList, result, !merge, true); } /** @@ -417,7 +418,8 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { VersionedSegmentsList segments = pipeline.getVersionedTail(); pushToSnapshot(segments.getStoreSegments()); - pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now + // In Swap: don't close segments (they are in snapshot now) and don't update the region size + pipeline.swap(segments,null,false, false); } private void pushPipelineToSnapshot() { @@ -429,7 +431,8 @@ public class CompactingMemStore extends AbstractMemStore { pushToSnapshot(segments.getStoreSegments()); // swap can return false in case the pipeline was updated by ongoing compaction // and the version increase, the chance 
of it happenning is very low - done = pipeline.swap(segments, null, false); // don't close segments; they are in snapshot now + // In Swap: don't close segments (they are in snapshot now) and don't update the region size + done = pipeline.swap(segments, null, false, false); if (iterationsCnt>2) { // practically it is impossible that this loop iterates more than two times // (because the compaction is stopped and none restarts it while in snapshot request), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 9a844e6..5ed07ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -106,12 +106,13 @@ public class CompactionPipeline { * removed. * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out * During index merge op this will be false and for compaction it will be true. 
+ * @param updateRegionSize whether to update the region memstore size: true when swapping as part of in-memory compaction/merge, false when swapping into snapshot (flush-to-disk) * @return true iff swapped tail with new segment */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", justification="Increment is done under a synchronize block so safe") public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, - boolean closeSuffix) { + boolean closeSuffix, boolean updateRegionSize) { if (versionedList.getVersion() != version) { return false; } @@ -135,7 +136,7 @@ public class CompactionPipeline { readOnlyCopy = new LinkedList<>(pipeline); version++; } - if (closeSuffix && region != null) { + if (updateRegionSize && region != null) { // update the global memstore size counter long suffixDataSize = getSegmentsKeySize(suffix); long newDataSize = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index faa9b67..7d008b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -102,7 +102,7 @@ public class ImmutableSegment extends Segment { super(null, // initiailize the CellSet with NULL comparator, memStoreLAB); this.type = type; - // build the true CellSet based on CellArrayMap + // build the new CellSet based on CellArrayMap CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); this.setCellSet(null, cs); // update the CellSet of the new Segment @@ -215,7 +215,7 @@ public class ImmutableSegment extends Segment { cells[i] = maybeCloneWithAllocator(c); } boolean useMSLAB = (getMemStoreLAB()!=null); - // second parameter true, because in compaction addition of the cell to new segment + // second parameter true, because in compaction/merge the addition of the cell to new segment // is always successful updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell i++; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index c435098..5a7f027 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -44,6 +44,14 @@ import java.util.concurrent.atomic.AtomicBoolean; @InterfaceAudience.Private public class MemStoreCompactor { + // The upper bound for the number of segments we store in the pipeline prior to merging. + // This constant is subject to further experimentation. + // The external setting of the compacting MemStore behaviour + public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY = + "hbase.hregion.compacting.memstore.threshold"; + // remaining with the same ("infinity") but configurable default for now + public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 30; + public static final long DEEP_OVERHEAD = ClassSize .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE @@ -54,10 +62,6 @@ public class MemStoreCompactor { + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) ); - // The upper bound for the number of segments we store in the pipeline prior to merging. - // This constant is subject to further experimentation. 
- private static final int THRESHOLD_PIPELINE_SEGMENTS = 30; // stands here for infinity - private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private CompactingMemStore compactingMemStore; @@ -158,10 +162,14 @@ public class MemStoreCompactor { return Action.COMPACT; } + int pipelineThreshold = // get the limit on the number of the segments in the pipeline + compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, + COMPACTING_MEMSTORE_THRESHOLD_DEFAULT); + // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + if (numOfSegments > pipelineThreshold) { LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + " is going to be merged, as there are " + numOfSegments + " segments"); return Action.MERGE; // to avoid too many segments, merge now -- 1.8.5.2 (Apple Git-48)