diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index 41c93ea..c1e48ac 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -45,6 +45,9 @@ public class ClassSize {
   /** Overhead for ArrayList(0) */
   public static final int ARRAYLIST;
 
+  /** Overhead for LinkedList(0) */
+  public static final int LINKEDLIST;
+
   /** Overhead for ByteBuffer */
   public static final int BYTE_BUFFER;
 
@@ -105,8 +108,8 @@ public class ClassSize {
   /** Overhead for TimeRangeTracker */
   public static final int TIMERANGE_TRACKER;
 
-  /** Overhead for CellSkipListSet */
-  public static final int CELL_SKIPLIST_SET;
+  /** Overhead for CellSet */
+  public static final int CELL_SET;
 
   public static final int STORE_SERVICES;
 
@@ -233,6 +236,8 @@ public class ClassSize {
 
     ARRAYLIST = align(OBJECT + REFERENCE + (2 * Bytes.SIZEOF_INT)) + align(ARRAY);
 
+    LINKEDLIST = align(OBJECT + (2 * Bytes.SIZEOF_INT) + (2 * REFERENCE));
+
     //noinspection PointlessArithmeticExpression
     BYTE_BUFFER = align(OBJECT + REFERENCE +
         (5 * Bytes.SIZEOF_INT) +
@@ -282,7 +287,7 @@ public class ClassSize {
 
     TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
 
-    CELL_SKIPLIST_SET = align(OBJECT + REFERENCE);
+    CELL_SET = align(OBJECT + REFERENCE);
 
     STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
   }
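Note on the new LINKEDLIST constant: it models java.util.LinkedList's layout — an object header, two int fields (size, modCount) and two node references (first, last) — rounded up to the JVM's 8-byte alignment. A minimal sketch of that arithmetic, using assumed illustrative 64-bit values for OBJECT and REFERENCE (the real numbers are derived by ClassSize at runtime):

    // Illustrative only; mirrors the LINKEDLIST expression in the hunk above.
    public class LinkedListOverheadSketch {
      static final int OBJECT = 16;    // assumed object header size
      static final int REFERENCE = 4;  // assumed compressed-oops reference size
      static final int SIZEOF_INT = 4;

      // Round up to an 8-byte boundary, like ClassSize.align()
      static long align(long num) {
        return ((num + 7) >> 3) << 3;
      }

      public static void main(String[] args) {
        long linkedList = align(OBJECT + (2 * SIZEOF_INT) + (2 * REFERENCE));
        System.out.println("LINKEDLIST = " + linkedList); // 32 under these assumptions
      }
    }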
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 2b9910f..aa410b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -50,34 +50,28 @@ public abstract class AbstractMemStore implements MemStore {
   private final CellComparator comparator;
 
   // active segment absorbs write operations
-  private volatile MutableSegment active;
+  protected volatile MutableSegment active;
   // Snapshot of memstore. Made for flusher.
-  private volatile ImmutableSegment snapshot;
+  protected volatile ImmutableSegment snapshot;
   protected volatile long snapshotId;
   // Used to track when to flush
   private volatile long timeOfOldestEdit;
 
-  public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT +
-          (4 * ClassSize.REFERENCE) +
-          (2 * Bytes.SIZEOF_LONG));
-
-  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
-      ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
-
+  public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+      + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
+  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + Segment.DEEP_OVERHEAD;
 
   protected AbstractMemStore(final Configuration conf, final CellComparator c) {
     this.conf = conf;
     this.comparator = c;
     resetCellSet();
-    this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0);
+    this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
     this.snapshotId = NO_SNAPSHOT_ID;
   }
 
   protected void resetCellSet() {
     // Reset heap to not include any keys
-    this.active = SegmentFactory.instance().createMutableSegment(conf, comparator, DEEP_OVERHEAD);
+    this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
     this.timeOfOldestEdit = Long.MAX_VALUE;
   }
 
@@ -177,25 +171,15 @@ public abstract class AbstractMemStore implements MemStore {
     // create a new snapshot and let the old one go.
     Segment oldSnapshot = this.snapshot;
     if (!this.snapshot.isEmpty()) {
-      this.snapshot = SegmentFactory.instance().createImmutableSegment(
-          getComparator(), 0);
+      this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
     }
     this.snapshotId = NO_SNAPSHOT_ID;
     oldSnapshot.close();
   }
 
-  /**
-   * Get the entire heap usage for this MemStore not including keys in the
-   * snapshot.
-   */
-  @Override
-  public long heapSize() {
-    return getActive().getSize();
-  }
-
   @Override
   public long getSnapshotSize() {
-    return getSnapshot().getSize();
+    return this.snapshot.getSize();
   }
 
   @Override
@@ -265,7 +249,7 @@ public abstract class AbstractMemStore implements MemStore {
         continue;
       }
       // check that this is the row and column we are interested in, otherwise bail
-      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
+      if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
         // only remove Puts that concurrent scanners cannot possibly see
         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
             cur.getSequenceId() <= readpoint) {
@@ -346,7 +330,7 @@ public abstract class AbstractMemStore implements MemStore {
       Cell snc = snapshot.getFirstAfter(firstCell);
       if(snc != null) {
         // is there a matching Cell in the snapshot?
-        if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
+        if (CellUtil.matchingRows(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
           if (snc.getTimestamp() == now) {
             now += 1;
           }
@@ -362,7 +346,7 @@ public abstract class AbstractMemStore implements MemStore {
       for (Cell cell : ss) {
         // if this isnt the row we are interested in, then bail:
         if (!CellUtil.matchingColumn(cell, family, qualifier)
-            || !CellUtil.matchingRow(cell, firstCell)) {
+            || !CellUtil.matchingRows(cell, firstCell)) {
           break; // rows dont match, bail.
         }
@@ -406,8 +390,11 @@ public abstract class AbstractMemStore implements MemStore {
     }
   }
 
+  /**
+   * @return The size of the active segment. Means sum of all cell's size.
+   */
   protected long keySize() {
-    return heapSize() - DEEP_OVERHEAD;
+    return this.active.getSize();
   }
 
   protected CellComparator getComparator() {
@@ -422,22 +409,12 @@ public abstract class AbstractMemStore implements MemStore {
     return snapshot;
   }
 
-  protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
-    this.snapshot = snapshot;
-    return this;
-  }
-
-  protected void setSnapshotSize(long snapshotSize) {
-    getSnapshot().setSize(snapshotSize);
-  }
-
   /**
    * Check whether anything need to be done based on the current active set size
    */
   protected abstract void checkActiveSize();
 
   /**
-   * Returns an ordered list of segments from most recent to oldest in memstore
    * @return an ordered list of segments from most recent to oldest in memstore
    */
   protected abstract List<Segment> getSegments() throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index 4433302..802609f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if already
@@ -39,6 +40,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class CellSet implements NavigableSet<Cell> {
+
+  private final static long FIXED_OVERHEAD = ClassSize.CELL_SET;
+  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
+
   // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
   // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
   // is not already present.", this implementation "Adds the specified element to this set EVEN
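Aside: the constants above follow a pattern used throughout this patch — FIXED_OVERHEAD covers an object's own header and fields, and DEEP_OVERHEAD adds the structures it owns. A self-contained sketch of the composition chain, with made-up stand-in numbers (HBase computes the real values via ClassSize at class-load time):

    // Illustrative composition of the new overhead constants; all numeric
    // values below are hypothetical stand-ins, not HBase's actual figures.
    public class OverheadCompositionSketch {
      static final long OBJECT = 16, REFERENCE = 4, LONG = 8, BOOLEAN = 1;
      static final long ATOMIC_LONG = 24, TIMERANGE_TRACKER = 32;
      static final long CELL_SET = 24, CONCURRENT_SKIPLISTMAP = 48;

      static long align(long n) { return ((n + 7) >> 3) << 3; }

      public static void main(String[] args) {
        long cellSetDeep = CELL_SET + CONCURRENT_SKIPLISTMAP;
        long segmentFixed = align(OBJECT + 5 * REFERENCE + LONG + BOOLEAN);
        long segmentDeep = segmentFixed + ATOMIC_LONG + cellSetDeep + TIMERANGE_TRACKER;
        long memstoreFixed = align(OBJECT + 4 * REFERENCE + 2 * LONG);
        // AbstractMemStore owns one (active) segment's worth of overhead
        long memstoreDeep = memstoreFixed + segmentDeep;
        System.out.println("AbstractMemStore.DEEP_OVERHEAD ~= " + memstoreDeep);
      }
    }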
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e27acce..a10d737 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,9 +50,6 @@ import org.apache.hadoop.hbase.wal.WAL;
  */
 @InterfaceAudience.Private
 public class CompactingMemStore extends AbstractMemStore {
-  public final static long DEEP_OVERHEAD_PER_PIPELINE_ITEM = ClassSize.align(
-      ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE +
-      ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP);
 
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
@@ -69,6 +65,10 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
 
+  public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE
+      + Bytes.SIZEOF_LONG + 2 * ClassSize.ATOMIC_BOOLEAN + CompactionPipeline.DEEP_OVERHEAD
+      + MemStoreCompactor.DEEP_OVERHEAD;
+
   public CompactingMemStore(Configuration conf, CellComparator c,
       HStore store, RegionServicesForStores regionServices) throws IOException {
     super(conf, c);
@@ -94,18 +94,6 @@ public class CompactingMemStore extends AbstractMemStore {
     LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize);
   }
 
-  public static long getSegmentSize(Segment segment) {
-    return segment.getSize() - DEEP_OVERHEAD_PER_PIPELINE_ITEM;
-  }
-
-  public static long getSegmentsSize(List<? extends Segment> list) {
-    long res = 0;
-    for (Segment segment : list) {
-      res += getSegmentSize(segment);
-    }
-    return res;
-  }
-
   /**
    * @return Total memory occupied by this MemStore.
    * This is not thread safe and the memstore may be changed while computing its size.
@@ -113,14 +101,24 @@ public class CompactingMemStore extends AbstractMemStore {
    */
   @Override
   public long size() {
-    long res = 0;
-    for (Segment item : getSegments()) {
-      res += item.getSize();
+    long res = this.active.getSize() + DEEP_OVERHEAD;
+    for (Segment item : pipeline.getSegments()) {
+      res += item.getSize() + Segment.DEEP_OVERHEAD;
     }
     return res;
   }
 
   /**
+   * Get the entire heap usage for this MemStore not including keys in the
+   * snapshot.
+   */
+  @Override
+  public long heapSize() {
+    // TODO add pipeline heap size also.
+    return DEEP_OVERHEAD + this.active.heapSize() + this.snapshot.heapSize();
+  }
+
+  /**
    * This method is called when it is clear that the flush to disk is completed.
    * The store may do any post-flush actions at this point.
    * One example is to update the WAL with sequence number that is known only at the store level.
@@ -145,7 +143,7 @@ public class CompactingMemStore extends AbstractMemStore {
     MutableSegment active = getActive();
     // If snapshot currently has entries, then flusher failed or didn't call
     // cleanup. Log a warning.
-    if (!getSnapshot().isEmpty()) {
+    if (!this.snapshot.isEmpty()) {
       LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
@@ -166,10 +164,11 @@ public class CompactingMemStore extends AbstractMemStore {
    * On flush, how much memory we will clear.
    * @return size of data that is going to be flushed
    */
-  @Override public long getFlushableSize() {
-    long snapshotSize = getSnapshot().getSize();
-    if(snapshotSize == 0) {
-      //if snapshot is empty the tail of the pipeline is flushed
+  @Override
+  public long getFlushableSize() {
+    long snapshotSize = getSnapshotSize();
+    if (snapshotSize == 0) {
+      // if snapshot is empty the tail of the pipeline is flushed
       snapshotSize = pipeline.getTailSize();
     }
     return snapshotSize > 0 ? snapshotSize : keySize();
@@ -191,10 +190,10 @@ public class CompactingMemStore extends AbstractMemStore {
   @Override
   public List<Segment> getSegments() {
     List<Segment> pipelineList = pipeline.getSegments();
-    List<Segment> list = new LinkedList<Segment>();
+    List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
     list.add(getActive());
     list.addAll(pipelineList);
-    list.add(getSnapshot());
+    list.add(this.snapshot);
     return list;
   }
@@ -238,7 +237,7 @@ public class CompactingMemStore extends AbstractMemStore {
       list.add(item.getSegmentScanner(readPt, order));
       order--;
     }
-    list.add(getSnapshot().getSegmentScanner(readPt, order));
+    list.add(this.snapshot.getSegmentScanner(readPt, order));
     return Collections.<KeyValueScanner> singletonList(
         new MemStoreScanner((AbstractMemStore) this, list, readPt));
   }
@@ -335,8 +334,6 @@ public class CompactingMemStore extends AbstractMemStore {
 
   private void pushActiveToPipeline(MutableSegment active) {
     if (!active.isEmpty()) {
-      long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD;
-      active.setSize(active.getSize() + delta);
       pipeline.pushHead(active);
       resetCellSet();
     }
@@ -345,9 +342,7 @@ public class CompactingMemStore extends AbstractMemStore {
   private void pushTailToSnapshot() {
     ImmutableSegment tail = pipeline.pullTail();
     if (!tail.isEmpty()) {
-      setSnapshot(tail);
-      long size = getSegmentSize(tail);
-      setSnapshotSize(size);
+      this.snapshot = tail;
     }
   }
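For orientation, the reworked CompactingMemStore.size() above adds the memstore's own deep overhead to the active segment's data size, then adds data plus per-segment overhead for each pipeline segment, deliberately leaving the snapshot out. A runnable model of that arithmetic with illustrative numbers (stand-in names, not HBase code):

    // Toy model of the new CompactingMemStore.size(); values are assumed.
    public class CompactingSizeSketch {
      public static void main(String[] args) {
        long deepOverhead = 400;          // stand-in for CompactingMemStore.DEEP_OVERHEAD
        long segmentDeepOverhead = 150;   // stand-in for Segment.DEEP_OVERHEAD
        long activeDataSize = 1000;       // Segment.getSize(): cell data only, post-patch
        long[] pipelineDataSizes = {700, 300};

        long res = activeDataSize + deepOverhead;
        for (long s : pipelineDataSizes) {
          res += s + segmentDeepOverhead;  // each pipeline segment adds its own overhead
        }
        System.out.println("size() = " + res);  // snapshot deliberately not included
      }
    }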
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 3f3bf8d..26fc3e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -24,7 +24,10 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@@ -39,13 +42,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class CompactionPipeline {
   private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
 
+  private final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+      + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
+
   private final RegionServicesForStores region;
   private LinkedList<ImmutableSegment> pipeline;
   private long version;
 
   private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
-      .createImmutableSegment(null,
-          CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM);
+      .createImmutableSegment((CellComparator)null);
 
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
@@ -105,18 +111,26 @@ public class CompactionPipeline {
     }
     if (region != null) {
       // update the global memstore size counter
-      long suffixSize = CompactingMemStore.getSegmentsSize(suffix);
-      long newSize = CompactingMemStore.getSegmentSize(segment);
+      long suffixSize = getSegmentsSize(suffix);
+      long newSize = segment.getSize();
       long delta = suffixSize - newSize;
       long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize
+        LOG.debug("Suffix heap size: " + suffixSize + " compacted item heap size: " + newSize
             + " globalMemstoreSize: " + globalMemstoreSize);
       }
     }
     return true;
   }
 
+  private static long getSegmentsSize(List<? extends Segment> list) {
+    long res = 0;
+    for (Segment segment : list) {
+      res += segment.getSize();
+    }
+    return res;
+  }
+
   public boolean isEmpty() {
     return pipeline.isEmpty();
   }
@@ -142,7 +156,7 @@ public class CompactionPipeline {
 
   public long getTailSize() {
     if(isEmpty()) return 0;
-    return CompactingMemStore.getSegmentSize(pipeline.peekLast());
+    return pipeline.peekLast().getSize();
   }
 
   private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index c21dbb5..1c9d804 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -91,8 +91,7 @@ public class DefaultMemStore extends AbstractMemStore {
     if (!getActive().isEmpty()) {
       ImmutableSegment immutableSegment = SegmentFactory.instance().
           createImmutableSegment(getActive());
-      setSnapshot(immutableSegment);
-      setSnapshotSize(keySize());
+      this.snapshot = immutableSegment;
       resetCellSet();
     }
   }
@@ -106,7 +105,7 @@ public class DefaultMemStore extends AbstractMemStore {
    */
   @Override
   public long getFlushableSize() {
-    long snapshotSize = getSnapshot().getSize();
+    long snapshotSize = this.snapshot.getSize();
     return snapshotSize > 0 ? snapshotSize : keySize();
   }
 
@@ -117,7 +116,7 @@ public class DefaultMemStore extends AbstractMemStore {
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
     list.add(getActive().getSegmentScanner(readPt, 1));
-    list.add(getSnapshot().getSegmentScanner(readPt, 0));
+    list.add(this.snapshot.getSegmentScanner(readPt, 0));
     return Collections.<KeyValueScanner> singletonList(
         new MemStoreScanner((AbstractMemStore) this, list, readPt));
   }
@@ -126,7 +125,7 @@ public class DefaultMemStore extends AbstractMemStore {
   protected List<Segment> getSegments() throws IOException {
     List<Segment> list = new ArrayList<Segment>(2);
     list.add(getActive());
-    list.add(getSnapshot());
+    list.add(this.snapshot);
     return list;
   }
@@ -144,12 +143,9 @@ public class DefaultMemStore extends AbstractMemStore {
   @Override
   public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
   }
 
-  /**
-   * @return Total memory occupied by this MemStore.
-   */
   @Override
   public long size() {
-    return heapSize();
+    return keySize() + DEEP_OVERHEAD;
   }
 
   /**
@@ -170,6 +166,15 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   /**
+   * Get the entire heap usage for this MemStore not including keys in the
+   * snapshot.
+   */
+  @Override
+  public long heapSize() {
+    return DEEP_OVERHEAD + this.active.heapSize() + this.snapshot.heapSize();
+  }
+
+  /**
    * Code to help figure if our approximation of object heap sizes is close
    * enough. See hbase-900. Fills memstores then waits so user can heap
    * dump and bring up resultant hprof in something like jprofiler which
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 00d49d1..44b94b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -125,7 +125,9 @@ public interface MemStore extends HeapSize {
   List<KeyValueScanner> getScanners(long readPt) throws IOException;
 
   /**
-   * @return Total memory occupied by this MemStore.
+   * @return Total memory occupied by this MemStore. This includes active segment size and heap size
+   *         overhead of this memstore but won't include any size occupied by the snapshot. We
+   *         assume the snapshot will get cleared soon
    */
   long size();
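Worked example of the size()/heapSize() split the revised javadoc describes, with made-up numbers (stand-ins only; the formulas mirror DefaultMemStore above):

    // Hypothetical figures; only the structure of the formulas is real.
    public class DefaultMemStoreViewsSketch {
      public static void main(String[] args) {
        long deepOverhead = 250;         // stand-in for DefaultMemStore.DEEP_OVERHEAD
        long segmentDeepOverhead = 150;  // stand-in for Segment.DEEP_OVERHEAD
        long activeData = 1000;          // keySize(): cell data in the active segment
        long snapshotData = 600;

        long size = activeData + deepOverhead;  // snapshot excluded: it clears soon
        // Segment.heapSize() = data + Segment.DEEP_OVERHEAD, so both segments
        // contribute their own overhead here
        long heapSize = deepOverhead + (activeData + segmentDeepOverhead)
            + (snapshotData + segmentDeepOverhead);
        System.out.println("size()=" + size + " heapSize()=" + heapSize);
      }
    }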
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 042de0a..ab06af8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -43,6 +45,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 class MemStoreCompactor {
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
+
+  private static final long FIXED_OVERHEAD = ClassSize.align(5 * ClassSize.REFERENCE
+      + Bytes.SIZEOF_LONG);
+  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_BOOLEAN;
+
   private CompactingMemStore compactingMemStore;
   private MemStoreScanner scanner; // scanner for pipeline only
   // scanner on top of MemStoreScanner that uses ScanQueryMatcher
@@ -122,11 +129,8 @@ class MemStoreCompactor {
    * There is at most one thread per memstore instance.
    */
   private void doCompaction() {
-
-    ImmutableSegment result = SegmentFactory.instance()  // create the scanner
-        .createImmutableSegment(
-            compactingMemStore.getConfiguration(), compactingMemStore.getComparator(),
-            CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM);
+    MutableSegment result = SegmentFactory.instance().createMutableSegment(
+        compactingMemStore.getConfiguration(), compactingMemStore.getComparator());
 
     // the compaction processing
     try {
@@ -135,7 +139,8 @@ class MemStoreCompactor {
 
       // Phase II: swap the old compaction pipeline
       if (!isInterrupted.get()) {
-        if (compactingMemStore.swapCompactedSegments(versionedList, result)) {
+        if (compactingMemStore.swapCompactedSegments(versionedList, SegmentFactory.instance()
+            .createImmutableSegment(result))) {
           // update the wal so it can be truncated and not get too long
           compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
         } else {
@@ -178,7 +183,7 @@ class MemStoreCompactor {
    * Updates the given single Segment using the internal store scanner,
    * who in turn uses ScanQueryMatcher
    */
-  private void compactSegments(Segment result) throws IOException {
+  private void compactSegments(MutableSegment result) throws IOException {
 
     List<Cell> kvs = new ArrayList<Cell>();
     // get the limit to the size of the groups to be returned by compactingScanner
@@ -198,8 +203,7 @@ class MemStoreCompactor {
           // now we just copy it to the new segment
           Cell newKV = result.maybeCloneWithAllocator(c);
           boolean mslabUsed = (newKV != c);
-          result.internalAdd(newKV, mslabUsed);
-
+          result.add(newKV, mslabUsed);
         }
         kvs.clear();
       }
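The doCompaction() rework above compacts into a mutable target and only freezes it at swap time, so cell accounting goes through MutableSegment.add() instead of the removed Segment.internalAdd(). A toy, self-contained rendering of that flow (stand-in types, not the HBase classes):

    // Condensed model of the new control flow; classes here are stand-ins.
    import java.util.ArrayList;
    import java.util.List;

    public class CompactionFlowSketch {
      static class MutableSeg {
        final List<String> cells = new ArrayList<String>();
        void add(String c) { cells.add(c); }  // accounting happens on add
      }
      static class ImmutableSeg {
        final List<String> cells;
        ImmutableSeg(MutableSeg m) { cells = m.cells; }  // freeze at swap time
      }

      public static void main(String[] args) {
        MutableSeg result = new MutableSeg();            // createMutableSegment(...)
        for (String c : new String[] {"a", "b"}) {       // compactSegments(result)
          result.add(c);
        }
        ImmutableSeg swapped = new ImmutableSeg(result); // createImmutableSegment(result)
        System.out.println("swap in segment with " + swapped.cells.size() + " cells");
      }
    }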
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index e62249a..6f128b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -23,14 +23,16 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A mutable segment in memstore, specifically the active segment.
 */
 @InterfaceAudience.Private
 public class MutableSegment extends Segment {
-  protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB,
-      long size) {
-    super(cellSet, comparator, memStoreLAB, size);
+
+  protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
+    super(cellSet, comparator, memStoreLAB);
   }
 
   /**
@@ -40,30 +42,35 @@ public class MutableSegment extends Segment {
    * @return the change in the heap size
    */
   public long add(Cell cell, boolean mslabUsed) {
-    return internalAdd(cell, mslabUsed);
+    boolean succ = getCellSet().add(cell);
+    long s = AbstractMemStore.heapSizeChange(cell, succ);
+    // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
+    // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
+    // than the counted number)
+    if (!succ && mslabUsed) {
+      s += getCellLength(cell);
+    }
+    updateMetaInfo(cell, s);
+    return s;
   }
 
-  /**
-   * Removes the given cell from the segment
-   * @return the change in the heap size
-   */
-  public long rollback(Cell cell) {
-    Cell found = getCellSet().get(cell);
-    if (found != null && found.getSequenceId() == cell.getSequenceId()) {
-      long sz = AbstractMemStore.heapSizeChange(cell, true);
-      getCellSet().remove(cell);
-      incSize(-sz);
-      return sz;
+  protected void updateMetaInfo(Cell toAdd, long s) {
+    incSize(s);
+    getTimeRangeTracker().includeTimestamp(toAdd);
+    minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId());
+    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
+    // When we use ACL CP or Visibility CP which deals with Tags during
+    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
+    // parse the byte[] to identify the tags length.
+    if (toAdd.getTagsLength() > 0) {
+      tagsPresent = true;
     }
-    return 0;
   }
-
-  //methods for test
-
+
   /**
-   * Returns the first cell in the segment
    * @return the first cell in the segment
    */
+  @VisibleForTesting
   Cell first() {
     return this.getCellSet().first();
   }
@@ -78,17 +85,4 @@ public class MutableSegment extends Segment {
   public long getMinTimestamp() {
     return getTimeRangeTracker().getMin();
   }
-
-  @Override
-  protected void updateMetaInfo(Cell toAdd, long s) {
-    getTimeRangeTracker().includeTimestamp(toAdd);
-    size.addAndGet(s);
-    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
-    // When we use ACL CP or Visibility CP which deals with Tags during
-    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
-    // parse the byte[] to identify the tags length.
-    if(toAdd.getTagsLength() > 0) {
-      tagsPresent = true;
-    }
-  }
 }
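The accounting rule moved into MutableSegment.add() matters when a duplicate cell was copied onto an MSLAB chunk: the set add returns false, yet the MSLAB bytes are still occupied and must be counted, or the tracked size drifts below real heap usage. A minimal, self-contained model of that rule (a HashSet stands in for CellSet; the per-cell constant is assumed):

    // Stand-in demonstration; PER_CELL_OVERHEAD is a made-up constant.
    import java.util.HashSet;
    import java.util.Set;

    public class MslabAccountingSketch {
      static final long PER_CELL_OVERHEAD = 48;

      static long heapSizeChange(long cellLen, boolean succeeded) {
        return succeeded ? PER_CELL_OVERHEAD + cellLen : 0;
      }

      public static void main(String[] args) {
        Set<String> cellSet = new HashSet<String>();
        boolean mslabUsed = true;
        String cell = "row1/cf:q1";
        long cellLen = cell.length();
        long size = 0;

        boolean succ = cellSet.add(cell);        // first add: true
        size += heapSizeChange(cellLen, succ);
        succ = cellSet.add(cell);                // duplicate add: false
        long s = heapSizeChange(cellLen, succ);  // 0 so far ...
        if (!succ && mslabUsed) {
          s += cellLen;                          // ... but the MSLAB copy still occupies heap
        }
        size += s;
        System.out.println("tracked size = " + size);
      }
    }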
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 33c3bfb..0b8d8ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -43,21 +45,27 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public abstract class Segment {
 
-  private volatile CellSet cellSet;
-  private final CellComparator comparator;
-  private long minSequenceId;
-  private volatile MemStoreLAB memStoreLAB;
+  public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + 5
+      * ClassSize.REFERENCE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
+  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_LONG
+      + CellSet.DEEP_OVERHEAD + ClassSize.TIMERANGE_TRACKER;
+
+  protected volatile CellSet cellSet;
+  protected final CellComparator comparator;
+  protected long minSequenceId;
+  protected volatile MemStoreLAB memStoreLAB;
+  // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not
+  // including the heap overhead of this class.
   protected final AtomicLong size;
   protected volatile boolean tagsPresent;
-  private final TimeRangeTracker timeRangeTracker;
+  protected final TimeRangeTracker timeRangeTracker;
 
-  protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB,
-      long size) {
+  protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     this.cellSet = cellSet;
     this.comparator = comparator;
     this.minSequenceId = Long.MAX_VALUE;
     this.memStoreLAB = memStoreLAB;
-    this.size = new AtomicLong(size);
+    this.size = new AtomicLong(0);
     this.tagsPresent = false;
     this.timeRangeTracker = new TimeRangeTracker();
   }
@@ -89,7 +97,6 @@ public abstract class Segment {
   }
 
   /**
-   * Returns whether the segment has any cells
    * @return whether the segment has any cells
    */
   public boolean isEmpty() {
@@ -97,7 +104,6 @@ public abstract class Segment {
   }
 
   /**
-   * Returns number of cells in segment
    * @return number of cells in segment
    */
   public int getCellsCount() {
@@ -105,7 +111,6 @@ public abstract class Segment {
   }
 
   /**
-   * Returns the first cell in the segment that has equal or greater key than the given cell
    * @return the first cell in the segment that has equal or greater key than the given cell
    */
   public Cell getFirstAfter(Cell cell) {
@@ -199,6 +204,13 @@ public abstract class Segment {
   }
 
   /**
+   * @return The heap size of this Segment
+   */
+  public long heapSize() {
+    return size.get() + DEEP_OVERHEAD;
+  }
+
+  /**
    * Increases the heap size counter of the segment by the given delta
    */
   public void incSize(long delta) {
@@ -250,32 +262,6 @@ public abstract class Segment {
     return comparator;
   }
 
-  protected long internalAdd(Cell cell, boolean mslabUsed) {
-    boolean succ = getCellSet().add(cell);
-    long s = AbstractMemStore.heapSizeChange(cell, succ);
-    // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
-    // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
-    // than the counted number)
-    if (!succ && mslabUsed) {
-      s += getCellLength(cell);
-    }
-    updateMetaInfo(cell, s);
-    return s;
-  }
-
-  protected void updateMetaInfo(Cell toAdd, long s) {
-    getTimeRangeTracker().includeTimestamp(toAdd);
-    size.addAndGet(s);
-    minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId());
-    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
-    // When we use ACL CP or Visibility CP which deals with Tags during
-    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
-    // parse the byte[] to identify the tags length.
-    if(toAdd.getTagsLength() > 0) {
-      tagsPresent = true;
-    }
-  }
-
   /**
    * Returns a subset of the segment cell set, which starts with the given cell
    * @param firstCell a cell in the segment
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 7ac80ae..9b789e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -34,41 +34,36 @@ public final class SegmentFactory {
   static final boolean USEMSLAB_DEFAULT = true;
   static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
 
-  private SegmentFactory() {}
   private static SegmentFactory instance = new SegmentFactory();
-  public static SegmentFactory instance() {
-    return instance;
+
+  private SegmentFactory() {
   }
 
-  public ImmutableSegment createImmutableSegment(final Configuration conf,
-      final CellComparator comparator, long size) {
-    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
-    MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size);
-    return createImmutableSegment(segment);
+  public static SegmentFactory instance() {
+    return instance;
   }
 
-  public ImmutableSegment createImmutableSegment(CellComparator comparator,
-      long size) {
-    MutableSegment segment = generateMutableSegment(null, comparator, null, size);
+  public ImmutableSegment createImmutableSegment(CellComparator comparator) {
+    MutableSegment segment = generateMutableSegment(null, comparator, null);
     return createImmutableSegment(segment);
   }
 
   public ImmutableSegment createImmutableSegment(MutableSegment segment) {
     return new ImmutableSegment(segment);
   }
-  public MutableSegment createMutableSegment(final Configuration conf,
-      CellComparator comparator, long size) {
+
+  public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
     MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
-    return generateMutableSegment(conf, comparator, memStoreLAB, size);
+    return generateMutableSegment(conf, comparator, memStoreLAB);
   }
 
   //****** private methods to instantiate concrete store segments **********//
 
-  private MutableSegment generateMutableSegment(
-      final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) {
+  private MutableSegment generateMutableSegment(final Configuration conf,
+      CellComparator comparator, MemStoreLAB memStoreLAB) {
     // TBD use configuration to set type of segment
     CellSet set = new CellSet(comparator);
-    return new MutableSegment(set, comparator, memStoreLAB, size);
+    return new MutableSegment(set, comparator, memStoreLAB);
   }
 
   private MemStoreLAB getMemStoreLAB(Configuration conf) {
@@ -80,5 +75,4 @@ public final class SegmentFactory {
     }
     return memStoreLAB;
   }
-
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 09e2271..b4f2b64 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.CellSet;
 import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Segment;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -240,7 +241,7 @@ public class TestHeapSize {
     // CellSet
     cl = CellSet.class;
     expected = ClassSize.estimateBase(cl, false);
-    actual = ClassSize.CELL_SKIPLIST_SET;
+    actual = ClassSize.CELL_SET;
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);
@@ -295,21 +296,29 @@ public class TestHeapSize {
 
     // DefaultMemStore Overhead
     cl = DefaultMemStore.class;
-    actual = DefaultMemStore.FIXED_OVERHEAD;
+    actual = DefaultMemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);
     }
 
-    // DefaultMemStore Deep Overhead
+    // DefaultMemStore deep Overhead
     actual = DefaultMemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
+    if(expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+
+    // Segment Deep overhead
+    cl = Segment.class;
+    actual = Segment.DEEP_OVERHEAD;
     expected += ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(CellSet.class, false);
     expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
     expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
-    if(expected != actual) {
+    if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(CellSet.class, true);
@@ -317,7 +326,6 @@ public class TestHeapSize {
       ClassSize.estimateBase(TimeRangeTracker.class, true);
       assertEquals(expected, actual);
     }
-
     // Store Overhead
     cl = HStore.class;
     actual = HStore.FIXED_OVERHEAD;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
index be604af..75d2280 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
@@ -19,15 +19,11 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence;
-import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -44,7 +40,6 @@ import org.junit.rules.TestRule;
 public class TestHRegionWithInMemoryFlush extends TestHRegion{
   // Do not spin up clusters in here. If you need to spin up a cluster, do it
   // over in TestHRegionOnCluster.
-  private static final Log LOG = LogFactory.getLog(TestHRegionWithInMemoryFlush.class);
 
   @ClassRule
   public static final TestRule timeout =
       CategoryBasedTimeout.forClass(TestHRegionWithInMemoryFlush.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 2acfd12..c8263fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -59,7 +56,6 @@ import static org.junit.Assert.assertTrue;
 @Category({ RegionServerTests.class, LargeTests.class })
 public class TestWalAndCompactingMemStoreFlush {
 
-  private static final Log LOG = LogFactory.getLog(TestWalAndCompactingMemStoreFlush.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
   public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
@@ -201,12 +197,12 @@ public class TestWalAndCompactingMemStoreFlush {
     // memstores of CF1, CF2 and CF3.
     String msg = "totalMemstoreSize="+totalMemstoreSize +
         " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
-        " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM +
         " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
         " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
         " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
-    assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD,
-        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+    assertEquals(msg, totalMemstoreSize + 2 * CompactingMemStore.DEEP_OVERHEAD
+        + DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI
+        + cf3MemstoreSizePhaseI);
 
     // Flush!!!!!!!!!!!!!!!!!!!!!!
     // We have big compacting memstore CF1 and two small memstores:
@@ -219,11 +215,6 @@ public class TestWalAndCompactingMemStoreFlush {
     ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
     region.flush(false);
 
-    // CF3 should be compacted so wait here to be sure the compaction is done
-    while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
-
     // Recalculate everything
     long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
     long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@@ -238,24 +229,20 @@ public class TestWalAndCompactingMemStoreFlush {
     s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
         + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
-        + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_ITEM is:" + CompactingMemStore
-        .DEEP_OVERHEAD_PER_PIPELINE_ITEM
         + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
         + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
         + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
 
     // CF1 was flushed to memory, but there is nothing to compact, should
     // remain the same size plus renewed empty skip-list
-    assertEquals(s, cf1MemstoreSizePhaseII,
-        cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM);
+    assertEquals(s, cf1MemstoreSizePhaseI + Segment.DEEP_OVERHEAD, cf1MemstoreSizePhaseII);
 
     // CF2 should become empty
     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
 
     // verify that CF3 was flushed to memory and was compacted (this is approximation check)
-    assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD +
-        CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM >
-        cf3MemstoreSizePhaseII);
+    assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD
+        > cf3MemstoreSizePhaseII);
     assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII);
 
@@ -311,7 +298,7 @@ public class TestWalAndCompactingMemStoreFlush {
 
     // CF1's pipeline component (inserted before first flush) should be flushed to disk
     // CF2 should be flushed to disk
-    assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + DefaultMemStore.DEEP_OVERHEAD,
+    assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD,
         cf1MemstoreSizePhaseIV);
     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV);
 
@@ -337,9 +324,9 @@ public class TestWalAndCompactingMemStoreFlush {
     long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
        .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
 
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseV);
+    assertEquals(CompactingMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseV);
     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV);
-    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV);
+    assertEquals(CompactingMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV);
 
     // Because there is nothing in any memstore the WAL's LSN should be -1
     assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM);
@@ -406,9 +393,9 @@ public class TestWalAndCompactingMemStoreFlush {
     long totalMemstoreSize = region.getMemstoreSize();
 
     // Find the sizes of the memstores of each CF.
-    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
-    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
-    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
+    long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); // Compacting Memstore
+    long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); // Normal Memstore
+    long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); // Compacting Memstore
 
     // Some other sanity checks.
     assertTrue(cf1MemstoreSizePhaseI > 0);
@@ -419,23 +406,16 @@ public class TestWalAndCompactingMemStoreFlush {
     // memstores of CF1, CF2 and CF3.
     String msg = "totalMemstoreSize="+totalMemstoreSize +
         " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
-        " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM +
         " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
         " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
         " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
-    assertEquals(msg, totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD,
-        cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
+    assertEquals(msg, totalMemstoreSize + 2 * CompactingMemStore.DEEP_OVERHEAD
+        + DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI
+        + cf3MemstoreSizePhaseI);
 
     // Flush!
     ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
     ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
-    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
-    while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
-    while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
     region.flush(false);
 
     long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@@ -485,13 +465,6 @@ public class TestWalAndCompactingMemStoreFlush {
     // Flush!
     ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
     ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
-    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
-    while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
-    while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
     region.flush(false);
 
     long smallestSeqInRegionCurrentMemstorePhaseIV =