diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index 41c93ea..c1e48ac 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -45,6 +45,9 @@ public class ClassSize { /** Overhead for ArrayList(0) */ public static final int ARRAYLIST; + /** Overhead for LinkedList(0) */ + public static final int LINKEDLIST; + /** Overhead for ByteBuffer */ public static final int BYTE_BUFFER; @@ -105,8 +108,8 @@ public class ClassSize { /** Overhead for TimeRangeTracker */ public static final int TIMERANGE_TRACKER; - /** Overhead for CellSkipListSet */ - public static final int CELL_SKIPLIST_SET; + /** Overhead for CellSet */ + public static final int CELL_SET; public static final int STORE_SERVICES; @@ -233,6 +236,8 @@ public class ClassSize { ARRAYLIST = align(OBJECT + REFERENCE + (2 * Bytes.SIZEOF_INT)) + align(ARRAY); + LINKEDLIST = align(OBJECT + (2 * Bytes.SIZEOF_INT) + (2 * REFERENCE)); + //noinspection PointlessArithmeticExpression BYTE_BUFFER = align(OBJECT + REFERENCE + (5 * Bytes.SIZEOF_INT) + @@ -282,7 +287,7 @@ public class ClassSize { TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); - CELL_SKIPLIST_SET = align(OBJECT + REFERENCE); + CELL_SET = align(OBJECT + REFERENCE); STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 2b9910f..527a9ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -50,34 +50,28 @@ public abstract class AbstractMemStore implements MemStore { private final CellComparator comparator; // active segment absorbs write operations - private volatile MutableSegment active; + protected volatile MutableSegment active; // Snapshot of memstore. Made for flusher. - private volatile ImmutableSegment snapshot; + protected volatile ImmutableSegment snapshot; protected volatile long snapshotId; // Used to track when to flush private volatile long timeOfOldestEdit; - public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + - (4 * ClassSize.REFERENCE) + - (2 * Bytes.SIZEOF_LONG)); - - public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER + - ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP)); - + public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)); + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; protected AbstractMemStore(final Configuration conf, final CellComparator c) { this.conf = conf; this.comparator = c; resetCellSet(); - this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0); + this.snapshot = SegmentFactory.instance().createImmutableSegment(c); this.snapshotId = NO_SNAPSHOT_ID; } protected void resetCellSet() { // Reset heap to not include any keys - this.active = SegmentFactory.instance().createMutableSegment(conf, comparator, DEEP_OVERHEAD); + this.active = SegmentFactory.instance().createMutableSegment(conf, comparator); this.timeOfOldestEdit = Long.MAX_VALUE; } @@ -177,25 +171,15 @@ public abstract class AbstractMemStore implements MemStore { // create a new snapshot and let the old one go. Segment oldSnapshot = this.snapshot; if (!this.snapshot.isEmpty()) { - this.snapshot = SegmentFactory.instance().createImmutableSegment( - getComparator(), 0); + this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator); } this.snapshotId = NO_SNAPSHOT_ID; oldSnapshot.close(); } - /** - * Get the entire heap usage for this MemStore not including keys in the - * snapshot. - */ - @Override - public long heapSize() { - return getActive().getSize(); - } - @Override public long getSnapshotSize() { - return getSnapshot().getSize(); + return this.snapshot.keySize(); } @Override @@ -265,7 +249,7 @@ public abstract class AbstractMemStore implements MemStore { continue; } // check that this is the row and column we are interested in, otherwise bail - if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { + if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { // only remove Puts that concurrent scanners cannot possibly see if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) { @@ -346,7 +330,7 @@ public abstract class AbstractMemStore implements MemStore { Cell snc = snapshot.getFirstAfter(firstCell); if(snc != null) { // is there a matching Cell in the snapshot? - if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { + if (CellUtil.matchingRows(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { if (snc.getTimestamp() == now) { now += 1; } @@ -362,7 +346,7 @@ public abstract class AbstractMemStore implements MemStore { for (Cell cell : ss) { // if this isnt the row we are interested in, then bail: if (!CellUtil.matchingColumn(cell, family, qualifier) - || !CellUtil.matchingRow(cell, firstCell)) { + || !CellUtil.matchingRows(cell, firstCell)) { break; // rows dont match, bail. } @@ -406,8 +390,11 @@ public abstract class AbstractMemStore implements MemStore { } } + /** + * @return The size of the active segment. Means sum of all cell's size. + */ protected long keySize() { - return heapSize() - DEEP_OVERHEAD; + return this.active.keySize(); } protected CellComparator getComparator() { @@ -422,13 +409,13 @@ public abstract class AbstractMemStore implements MemStore { return snapshot; } - protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) { - this.snapshot = snapshot; - return this; - } - - protected void setSnapshotSize(long snapshotSize) { - getSnapshot().setSize(snapshotSize); + /** + * Get the entire heap usage for this MemStore not including keys in the + * snapshot. + */ + @Override + public long heapSize() { + return size(); } /** @@ -437,7 +424,6 @@ public abstract class AbstractMemStore implements MemStore { protected abstract void checkActiveSize(); /** - * Returns an ordered list of segments from most recent to oldest in memstore * @return an ordered list of segments from most recent to oldest in memstore */ protected abstract List getSegments() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java index 4433302..802609f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ClassSize; /** * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if already @@ -39,6 +40,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public class CellSet implements NavigableSet { + + private final static long FIXED_OVERHEAD = ClassSize.CELL_SET; + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP; + // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap} // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it // is not already present.", this implementation "Adds the specified element to this set EVEN diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e27acce..bcc43ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,9 +50,6 @@ import org.apache.hadoop.hbase.wal.WAL; */ @InterfaceAudience.Private public class CompactingMemStore extends AbstractMemStore { - public final static long DEEP_OVERHEAD_PER_PIPELINE_ITEM = ClassSize.align( - ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE + - ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP); // Default fraction of in-memory-flush size w.r.t. flush-to-disk size public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; @@ -69,6 +65,10 @@ public class CompactingMemStore extends AbstractMemStore { private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); private final AtomicBoolean allowCompaction = new AtomicBoolean(true); + public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE + + Bytes.SIZEOF_LONG + 2 * ClassSize.ATOMIC_BOOLEAN + CompactionPipeline.DEEP_OVERHEAD + + MemStoreCompactor.DEEP_OVERHEAD; + public CompactingMemStore(Configuration conf, CellComparator c, HStore store, RegionServicesForStores regionServices) throws IOException { super(conf, c); @@ -94,18 +94,6 @@ public class CompactingMemStore extends AbstractMemStore { LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize); } - public static long getSegmentSize(Segment segment) { - return segment.getSize() - DEEP_OVERHEAD_PER_PIPELINE_ITEM; - } - - public static long getSegmentsSize(List list) { - long res = 0; - for (Segment segment : list) { - res += getSegmentSize(segment); - } - return res; - } - /** * @return Total memory occupied by this MemStore. * This is not thread safe and the memstore may be changed while computing its size. @@ -113,9 +101,9 @@ public class CompactingMemStore extends AbstractMemStore { */ @Override public long size() { - long res = 0; - for (Segment item : getSegments()) { - res += item.getSize(); + long res = DEEP_OVERHEAD + this.active.size(); + for (Segment item : pipeline.getSegments()) { + res += item.size(); } return res; } @@ -145,7 +133,7 @@ public class CompactingMemStore extends AbstractMemStore { MutableSegment active = getActive(); // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. - if (!getSnapshot().isEmpty()) { + if (!this.snapshot.isEmpty()) { LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { @@ -166,10 +154,11 @@ public class CompactingMemStore extends AbstractMemStore { * On flush, how much memory we will clear. * @return size of data that is going to be flushed */ - @Override public long getFlushableSize() { - long snapshotSize = getSnapshot().getSize(); - if(snapshotSize == 0) { - //if snapshot is empty the tail of the pipeline is flushed + @Override + public long getFlushableSize() { + long snapshotSize = getSnapshotSize(); + if (snapshotSize == 0) { + // if snapshot is empty the tail of the pipeline is flushed snapshotSize = pipeline.getTailSize(); } return snapshotSize > 0 ? snapshotSize : keySize(); @@ -191,10 +180,10 @@ public class CompactingMemStore extends AbstractMemStore { @Override public List getSegments() { List pipelineList = pipeline.getSegments(); - List list = new LinkedList(); + List list = new ArrayList(pipelineList.size() + 2); list.add(getActive()); list.addAll(pipelineList); - list.add(getSnapshot()); + list.add(this.snapshot); return list; } @@ -238,7 +227,7 @@ public class CompactingMemStore extends AbstractMemStore { list.add(item.getSegmentScanner(readPt, order)); order--; } - list.add(getSnapshot().getSegmentScanner(readPt, order)); + list.add(this.snapshot.getSegmentScanner(readPt, order)); return Collections. singletonList( new MemStoreScanner((AbstractMemStore) this, list, readPt)); } @@ -314,7 +303,7 @@ public class CompactingMemStore extends AbstractMemStore { } private boolean shouldFlushInMemory() { - if (getActive().getSize() > inmemoryFlushSize) { + if (getActive().size() > inmemoryFlushSize) { // size above flush threshold return inMemoryFlushInProgress.compareAndSet(false, true); } @@ -335,8 +324,6 @@ public class CompactingMemStore extends AbstractMemStore { private void pushActiveToPipeline(MutableSegment active) { if (!active.isEmpty()) { - long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD; - active.setSize(active.getSize() + delta); pipeline.pushHead(active); resetCellSet(); } @@ -345,9 +332,7 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { ImmutableSegment tail = pipeline.pullTail(); if (!tail.isEmpty()) { - setSnapshot(tail); - long size = getSegmentSize(tail); - setSnapshotSize(size); + this.snapshot = tail; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 3f3bf8d..30ffcab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -24,7 +24,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; /** * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. @@ -39,13 +42,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public class CompactionPipeline { private static final Log LOG = LogFactory.getLog(CompactionPipeline.class); + private final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST; + private final RegionServicesForStores region; private LinkedList pipeline; private long version; private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() - .createImmutableSegment(null, - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + .createImmutableSegment((CellComparator)null); public CompactionPipeline(RegionServicesForStores region) { this.region = region; @@ -105,18 +111,26 @@ public class CompactionPipeline { } if (region != null) { // update the global memstore size counter - long suffixSize = CompactingMemStore.getSegmentsSize(suffix); - long newSize = CompactingMemStore.getSegmentSize(segment); + long suffixSize = getSegmentsSize(suffix); + long newSize = segment.keySize(); long delta = suffixSize - newSize; long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); if (LOG.isDebugEnabled()) { - LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize + LOG.debug("Suffix heap size: " + suffixSize + " compacted item heap size: " + newSize + " globalMemstoreSize: " + globalMemstoreSize); } } return true; } + private static long getSegmentsSize(List list) { + long res = 0; + for (Segment segment : list) { + res += segment.keySize(); + } + return res; + } + public boolean isEmpty() { return pipeline.isEmpty(); } @@ -142,7 +156,7 @@ public class CompactionPipeline { public long getTailSize() { if(isEmpty()) return 0; - return CompactingMemStore.getSegmentSize(pipeline.peekLast()); + return pipeline.peekLast().keySize(); } private void swapSuffix(LinkedList suffix, ImmutableSegment segment) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index c21dbb5..dbeadce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -91,8 +91,7 @@ public class DefaultMemStore extends AbstractMemStore { if (!getActive().isEmpty()) { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(getActive()); - setSnapshot(immutableSegment); - setSnapshotSize(keySize()); + this.snapshot = immutableSegment; resetCellSet(); } } @@ -106,7 +105,7 @@ public class DefaultMemStore extends AbstractMemStore { */ @Override public long getFlushableSize() { - long snapshotSize = getSnapshot().getSize(); + long snapshotSize = getSnapshotSize(); return snapshotSize > 0 ? snapshotSize : keySize(); } @@ -117,7 +116,7 @@ public class DefaultMemStore extends AbstractMemStore { public List getScanners(long readPt) throws IOException { List list = new ArrayList(2); list.add(getActive().getSegmentScanner(readPt, 1)); - list.add(getSnapshot().getSegmentScanner(readPt, 0)); + list.add(this.snapshot.getSegmentScanner(readPt, 0)); return Collections. singletonList( new MemStoreScanner((AbstractMemStore) this, list, readPt)); } @@ -126,7 +125,7 @@ public class DefaultMemStore extends AbstractMemStore { protected List getSegments() throws IOException { List list = new ArrayList(2); list.add(getActive()); - list.add(getSnapshot()); + list.add(this.snapshot); return list; } @@ -144,12 +143,9 @@ public class DefaultMemStore extends AbstractMemStore { @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { } - /** - * @return Total memory occupied by this MemStore. - */ @Override public long size() { - return heapSize(); + return this.active.size() + DEEP_OVERHEAD; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 13d9fbf..a87ca9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -18,10 +18,10 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CollectionBackedScanner; /** @@ -32,6 +32,10 @@ import org.apache.hadoop.hbase.util.CollectionBackedScanner; */ @InterfaceAudience.Private public class ImmutableSegment extends Segment { + + public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD + ClassSize.REFERENCE + + ClassSize.TIMERANGE; + /** * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight * TimeRangeTracker with all its synchronization when doing time range stuff. @@ -64,4 +68,10 @@ public class ImmutableSegment extends Segment { return this.timeRange.getMin(); } + /** + * @return the heap size of the segment + */ + public long size() { + return cellSize.get() + DEEP_OVERHEAD; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 00d49d1..44b94b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -125,7 +125,9 @@ public interface MemStore extends HeapSize { List getScanners(long readPt) throws IOException; /** - * @return Total memory occupied by this MemStore. + * @return Total memory occupied by this MemStore. This includes active segment size and heap size + * overhead of this memstore but won't include any size occupied by the snapshot. We + * assume the snapshot will get cleared soon */ long size(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 042de0a..ab06af8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import java.io.IOException; import java.util.ArrayList; @@ -43,6 +45,11 @@ import java.util.concurrent.atomic.AtomicBoolean; class MemStoreCompactor { private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); + + private static final long FIXED_OVERHEAD = ClassSize.align(5 * ClassSize.REFERENCE + + Bytes.SIZEOF_LONG); + public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_BOOLEAN; + private CompactingMemStore compactingMemStore; private MemStoreScanner scanner; // scanner for pipeline only // scanner on top of MemStoreScanner that uses ScanQueryMatcher @@ -122,11 +129,8 @@ class MemStoreCompactor { * There is at most one thread per memstore instance. */ private void doCompaction() { - - ImmutableSegment result = SegmentFactory.instance() // create the scanner - .createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + MutableSegment result = SegmentFactory.instance().createMutableSegment( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator()); // the compaction processing try { @@ -135,7 +139,8 @@ class MemStoreCompactor { // Phase II: swap the old compaction pipeline if (!isInterrupted.get()) { - if (compactingMemStore.swapCompactedSegments(versionedList, result)) { + if (compactingMemStore.swapCompactedSegments(versionedList, SegmentFactory.instance() + .createImmutableSegment(result))) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } else { @@ -178,7 +183,7 @@ class MemStoreCompactor { * Updates the given single Segment using the internal store scanner, * who in turn uses ScanQueryMatcher */ - private void compactSegments(Segment result) throws IOException { + private void compactSegments(MutableSegment result) throws IOException { List kvs = new ArrayList(); // get the limit to the size of the groups to be returned by compactingScanner @@ -198,8 +203,7 @@ class MemStoreCompactor { // now we just copy it to the new segment Cell newKV = result.maybeCloneWithAllocator(c); boolean mslabUsed = (newKV != c); - result.internalAdd(newKV, mslabUsed); - + result.add(newKV, mslabUsed); } kvs.clear(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index f64979f..1bb4511 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -36,7 +36,7 @@ public class MemStoreSnapshot { public MemStoreSnapshot(long id, ImmutableSegment snapshot) { this.id = id; this.cellsCount = snapshot.getCellsCount(); - this.size = snapshot.getSize(); + this.size = snapshot.keySize(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); this.scanner = snapshot.getKeyValueScanner(); this.tagsPresent = snapshot.isTagsPresent(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java index e62249a..77db4a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -23,14 +23,16 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; +import com.google.common.annotations.VisibleForTesting; + /** * A mutable segment in memstore, specifically the active segment. */ @InterfaceAudience.Private public class MutableSegment extends Segment { - protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, - long size) { - super(cellSet, comparator, memStoreLAB, size); + + protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { + super(cellSet, comparator, memStoreLAB); } /** @@ -40,55 +42,47 @@ public class MutableSegment extends Segment { * @return the change in the heap size */ public long add(Cell cell, boolean mslabUsed) { - return internalAdd(cell, mslabUsed); + boolean succ = getCellSet().add(cell); + long s = AbstractMemStore.heapSizeChange(cell, succ); + // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the + // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger + // than the counted number) + if (!succ && mslabUsed) { + s += getCellLength(cell); + } + updateMetaInfo(cell, s); + return s; } - /** - * Removes the given cell from the segment - * @return the change in the heap size - */ - public long rollback(Cell cell) { - Cell found = getCellSet().get(cell); - if (found != null && found.getSequenceId() == cell.getSequenceId()) { - long sz = AbstractMemStore.heapSizeChange(cell, true); - getCellSet().remove(cell); - incSize(-sz); - return sz; + protected void updateMetaInfo(Cell toAdd, long s) { + incSize(s); + this.timeRangeTracker.includeTimestamp(toAdd); + minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId()); + // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. + // When we use ACL CP or Visibility CP which deals with Tags during + // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not + // parse the byte[] to identify the tags length. + if (toAdd.getTagsLength() > 0) { + tagsPresent = true; } - return 0; } - //methods for test - /** - * Returns the first cell in the segment * @return the first cell in the segment */ + @VisibleForTesting Cell first() { return this.getCellSet().first(); } @Override public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { - return (getTimeRangeTracker().includesTimeRange(scan.getTimeRange()) - && (getTimeRangeTracker().getMax() >= oldestUnexpiredTS)); + return (this.timeRangeTracker.includesTimeRange(scan.getTimeRange()) + && (this.timeRangeTracker.getMax() >= oldestUnexpiredTS)); } @Override public long getMinTimestamp() { - return getTimeRangeTracker().getMin(); - } - - @Override - protected void updateMetaInfo(Cell toAdd, long s) { - getTimeRangeTracker().includeTimestamp(toAdd); - size.addAndGet(s); - // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. - // When we use ACL CP or Visibility CP which deals with Tags during - // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not - // parse the byte[] to identify the tags length. - if(toAdd.getTagsLength() > 0) { - tagsPresent = true; - } + return this.timeRangeTracker.getMin(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 33c3bfb..8db9453 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import com.google.common.annotations.VisibleForTesting; @@ -43,21 +45,27 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public abstract class Segment { - private volatile CellSet cellSet; - private final CellComparator comparator; - private long minSequenceId; - private volatile MemStoreLAB memStoreLAB; - protected final AtomicLong size; + public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + 5 + * ClassSize.REFERENCE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN); + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_LONG + + CellSet.DEEP_OVERHEAD + ClassSize.TIMERANGE_TRACKER; + + protected volatile CellSet cellSet; + protected final CellComparator comparator; + protected long minSequenceId; + protected volatile MemStoreLAB memStoreLAB; + // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not + // including the heap overhead of this class. + protected final AtomicLong cellSize; protected volatile boolean tagsPresent; - private final TimeRangeTracker timeRangeTracker; + protected final TimeRangeTracker timeRangeTracker; - protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, - long size) { + protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) { this.cellSet = cellSet; this.comparator = comparator; this.minSequenceId = Long.MAX_VALUE; this.memStoreLAB = memStoreLAB; - this.size = new AtomicLong(size); + this.cellSize = new AtomicLong(0); this.tagsPresent = false; this.timeRangeTracker = new TimeRangeTracker(); } @@ -67,7 +75,7 @@ public abstract class Segment { this.comparator = segment.getComparator(); this.minSequenceId = segment.getMinSequenceId(); this.memStoreLAB = segment.getMemStoreLAB(); - this.size = new AtomicLong(segment.getSize()); + this.cellSize = new AtomicLong(segment.keySize()); this.tagsPresent = segment.isTagsPresent(); this.timeRangeTracker = segment.getTimeRangeTracker(); } @@ -89,7 +97,6 @@ public abstract class Segment { } /** - * Returns whether the segment has any cells * @return whether the segment has any cells */ public boolean isEmpty() { @@ -97,7 +104,6 @@ public abstract class Segment { } /** - * Returns number of cells in segment * @return number of cells in segment */ public int getCellsCount() { @@ -105,7 +111,6 @@ public abstract class Segment { } /** - * Returns the first cell in the segment that has equal or greater key than the given cell * @return the first cell in the segment that has equal or greater key than the given cell */ public Cell getFirstAfter(Cell cell) { @@ -120,9 +125,8 @@ public abstract class Segment { * Closing a segment before it is being discarded */ public void close() { - MemStoreLAB mslab = getMemStoreLAB(); - if(mslab != null) { - mslab.close(); + if (this.memStoreLAB != null) { + this.memStoreLAB.close(); } // do not set MSLab to null as scanners may still be reading the data here and need to decrease // the counter when they finish @@ -134,12 +138,12 @@ public abstract class Segment { * @return either the given cell or its clone */ public Cell maybeCloneWithAllocator(Cell cell) { - if (getMemStoreLAB() == null) { + if (this.memStoreLAB == null) { return cell; } int len = getCellLength(cell); - ByteRange alloc = getMemStoreLAB().allocateBytes(len); + ByteRange alloc = this.memStoreLAB.allocateBytes(len); if (alloc == null) { // The allocation was too large, allocator decided // not to do anything with it. @@ -169,40 +173,36 @@ public abstract class Segment { } public void incScannerCount() { - if(getMemStoreLAB() != null) { - getMemStoreLAB().incScannerCount(); + if (this.memStoreLAB != null) { + this.memStoreLAB.incScannerCount(); } } public void decScannerCount() { - if(getMemStoreLAB() != null) { - getMemStoreLAB().decScannerCount(); + if (this.memStoreLAB != null) { + this.memStoreLAB.decScannerCount(); } } /** - * Setting the heap size of the segment - used to account for different class overheads - * @return this object + * @return Sum of all cell's size. */ - - public Segment setSize(long size) { - this.size.set(size); - return this; + public long keySize() { + return cellSize.get(); } - + /** - * Returns the heap size of the segment * @return the heap size of the segment */ - public long getSize() { - return size.get(); + public long size() { + return keySize() + DEEP_OVERHEAD; } /** * Increases the heap size counter of the segment by the given delta */ public void incSize(long delta) { - size.addAndGet(delta); + cellSize.addAndGet(delta); } public long getMinSequenceId() { @@ -250,32 +250,6 @@ public abstract class Segment { return comparator; } - protected long internalAdd(Cell cell, boolean mslabUsed) { - boolean succ = getCellSet().add(cell); - long s = AbstractMemStore.heapSizeChange(cell, succ); - // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the - // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger - // than the counted number) - if (!succ && mslabUsed) { - s += getCellLength(cell); - } - updateMetaInfo(cell, s); - return s; - } - - protected void updateMetaInfo(Cell toAdd, long s) { - getTimeRangeTracker().includeTimestamp(toAdd); - size.addAndGet(s); - minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId()); - // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. - // When we use ACL CP or Visibility CP which deals with Tags during - // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not - // parse the byte[] to identify the tags length. - if(toAdd.getTagsLength() > 0) { - tagsPresent = true; - } - } - /** * Returns a subset of the segment cell set, which starts with the given cell * @param firstCell a cell in the segment @@ -286,7 +260,7 @@ public abstract class Segment { } @VisibleForTesting - public MemStoreLAB getMemStoreLAB() { + MemStoreLAB getMemStoreLAB() { return memStoreLAB; } @@ -305,7 +279,8 @@ public abstract class Segment { String res = "Store segment of type "+this.getClass().getName()+"; "; res += "isEmpty "+(isEmpty()?"yes":"no")+"; "; res += "cellCount "+getCellsCount()+"; "; - res += "size "+getSize()+"; "; + res += "cellsSize "+keySize()+"; "; + res += "heapSize "+size()+"; "; res += "Min ts "+getMinTimestamp()+"; "; return res; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 7ac80ae..9b789e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -34,41 +34,36 @@ public final class SegmentFactory { static final boolean USEMSLAB_DEFAULT = true; static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; - private SegmentFactory() {} private static SegmentFactory instance = new SegmentFactory(); - public static SegmentFactory instance() { - return instance; + + private SegmentFactory() { } - public ImmutableSegment createImmutableSegment(final Configuration conf, - final CellComparator comparator, long size) { - MemStoreLAB memStoreLAB = getMemStoreLAB(conf); - MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size); - return createImmutableSegment(segment); + public static SegmentFactory instance() { + return instance; } - public ImmutableSegment createImmutableSegment(CellComparator comparator, - long size) { - MutableSegment segment = generateMutableSegment(null, comparator, null, size); + public ImmutableSegment createImmutableSegment(CellComparator comparator) { + MutableSegment segment = generateMutableSegment(null, comparator, null); return createImmutableSegment(segment); } public ImmutableSegment createImmutableSegment(MutableSegment segment) { return new ImmutableSegment(segment); } - public MutableSegment createMutableSegment(final Configuration conf, - CellComparator comparator, long size) { + + public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) { MemStoreLAB memStoreLAB = getMemStoreLAB(conf); - return generateMutableSegment(conf, comparator, memStoreLAB, size); + return generateMutableSegment(conf, comparator, memStoreLAB); } //****** private methods to instantiate concrete store segments **********// - private MutableSegment generateMutableSegment( - final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) { + private MutableSegment generateMutableSegment(final Configuration conf, + CellComparator comparator, MemStoreLAB memStoreLAB) { // TBD use configuration to set type of segment CellSet set = new CellSet(comparator); - return new MutableSegment(set, comparator, memStoreLAB, size); + return new MutableSegment(set, comparator, memStoreLAB); } private MemStoreLAB getMemStoreLAB(Configuration conf) { @@ -80,5 +75,4 @@ public final class SegmentFactory { } return memStoreLAB; } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 09e2271..6d82f3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.CellSet; import org.apache.hadoop.hbase.regionserver.DefaultMemStore; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Segment; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -240,7 +241,7 @@ public class TestHeapSize { // CellSet cl = CellSet.class; expected = ClassSize.estimateBase(cl, false); - actual = ClassSize.CELL_SKIPLIST_SET; + actual = ClassSize.CELL_SET; if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); @@ -305,11 +306,20 @@ public class TestHeapSize { // DefaultMemStore Deep Overhead actual = DefaultMemStore.DEEP_OVERHEAD; expected = ClassSize.estimateBase(cl, false); + if(expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + + // Segment Deep overhead + cl = Segment.class; + actual = Segment.DEEP_OVERHEAD; + expected = ClassSize.estimateBase(cl, false); expected += ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(CellSet.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); expected += ClassSize.estimateBase(TimeRangeTracker.class, false); - if(expected != actual) { + if (expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(CellSet.class, true); @@ -317,7 +327,6 @@ public class TestHeapSize { ClassSize.estimateBase(TimeRangeTracker.class, true); assertEquals(expected, actual); } - // Store Overhead cl = HStore.class; actual = HStore.FIXED_OVERHEAD; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index c5aae00..c74e67a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -373,7 +373,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { // Save off old state. - long oldHistorySize = hmc.getSnapshot().getSize(); + long oldHistorySize = hmc.getSnapshot().keySize(); long prevTimeStamp = hmc.timeOfOldestEdit(); hmc.snapshot(); @@ -543,10 +543,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf1, 3, val)); assertEquals(3, memstore.getActive().getCellsCount()); - while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } - assertTrue(chunkPool.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; @@ -593,9 +589,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore { long size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact - while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); @@ -621,9 +614,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore { long size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact - while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(1000); - } assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); @@ -632,9 +622,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore { size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact - while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); @@ -663,9 +650,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore { String tstStr = "\n\nFlushable size after first flush in memory:" + size + ". Is MemmStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory(); - while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize()); @@ -689,9 +673,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore { ((CompactingMemStore)memstore).enableCompaction(); size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact - while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { - Threads.sleep(10); - } assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize()); @@ -710,7 +691,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); - long size = hmc.getActive().getSize(); + long size = hmc.getActive().keySize(); for (int i = 0; i < keys.length; i++) { long timestamp = System.currentTimeMillis(); Threads.sleep(1); // to make sure each kv gets a different ts @@ -720,7 +701,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { hmc.add(kv); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); } - regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size); + regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().keySize() - size); } private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index be604af..75d2280 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -19,15 +19,11 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence; -import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.wal.WAL; @@ -44,7 +40,6 @@ import org.junit.rules.TestRule; public class TestHRegionWithInMemoryFlush extends TestHRegion{ // Do not spin up clusters in here. If you need to spin up a cluster, do it // over in TestHRegionOnCluster. - private static final Log LOG = LogFactory.getLog(TestHRegionWithInMemoryFlush.class); @ClassRule public static final TestRule timeout = CategoryBasedTimeout.forClass(TestHRegionWithInMemoryFlush.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 2acfd12..2a6df9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -43,7 +41,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -59,7 +56,6 @@ import static org.junit.Assert.assertTrue; @Category({ RegionServerTests.class, LargeTests.class }) public class TestWalAndCompactingMemStoreFlush { - private static final Log LOG = LogFactory.getLog(TestWalAndCompactingMemStoreFlush.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush", @@ -201,12 +197,13 @@ public class TestWalAndCompactingMemStoreFlush { // memstores of CF1, CF2 and CF3. String msg = "totalMemstoreSize="+totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; - assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, - cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); + assertEquals(msg, totalMemstoreSize + 2 + * (CompactingMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD) + + (DefaultMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD), cf1MemstoreSizePhaseI + + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); // Flush!!!!!!!!!!!!!!!!!!!!!! // We have big compacting memstore CF1 and two small memstores: @@ -219,11 +216,6 @@ public class TestWalAndCompactingMemStoreFlush { ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); region.flush(false); - // CF3 should be compacted so wait here to be sure the compaction is done - while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) - .isMemStoreFlushingInMemory()) - Threads.sleep(10); - // Recalculate everything long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); @@ -238,24 +230,20 @@ public class TestWalAndCompactingMemStoreFlush { s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD - + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_ITEM is:" + CompactingMemStore - .DEEP_OVERHEAD_PER_PIPELINE_ITEM + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; // CF1 was flushed to memory, but there is nothing to compact, should // remain the same size plus renewed empty skip-list - assertEquals(s, cf1MemstoreSizePhaseII, - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM); + assertEquals(s, cf1MemstoreSizePhaseI + ImmutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseII); // CF2 should become empty - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); + assertEquals(DefaultMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); // verify that CF3 was flushed to memory and was compacted (this is approximation check) - assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD + - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM > - cf3MemstoreSizePhaseII); + assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD + ImmutableSegment.DEEP_OVERHEAD + > cf3MemstoreSizePhaseII); assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII); @@ -311,9 +299,9 @@ public class TestWalAndCompactingMemStoreFlush { // CF1's pipeline component (inserted before first flush) should be flushed to disk // CF2 should be flushed to disk - assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + DefaultMemStore.DEEP_OVERHEAD, + assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV); - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); + assertEquals(DefaultMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); // CF3 shouldn't have been touched. assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); @@ -337,9 +325,9 @@ public class TestWalAndCompactingMemStoreFlush { long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseV); - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); + assertEquals(CompactingMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV); + assertEquals(DefaultMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); + assertEquals(CompactingMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); // Because there is nothing in any memstore the WAL's LSN should be -1 assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM); @@ -406,9 +394,9 @@ public class TestWalAndCompactingMemStoreFlush { long totalMemstoreSize = region.getMemstoreSize(); // Find the sizes of the memstores of each CF. - long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); - long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); - long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); + long cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize(); // Compacting Memstore + long cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize(); // Normal Memstore + long cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize(); // Compacting Memstore // Some other sanity checks. assertTrue(cf1MemstoreSizePhaseI > 0); @@ -419,23 +407,17 @@ public class TestWalAndCompactingMemStoreFlush { // memstores of CF1, CF2 and CF3. String msg = "totalMemstoreSize="+totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + - " DEEP_OVERHEAD_PER_PIPELINE_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; - assertEquals(msg, totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, - cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); + assertEquals(msg, totalMemstoreSize + 2 + * (CompactingMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD) + + (DefaultMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD), cf1MemstoreSizePhaseI + + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); // Flush! ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); - // CF1 and CF3 should be compacted so wait here to be sure the compaction is done - while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore()) - .isMemStoreFlushingInMemory()) - Threads.sleep(10); - while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) - .isMemStoreFlushingInMemory()) - Threads.sleep(10); region.flush(false); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); @@ -447,7 +429,7 @@ public class TestWalAndCompactingMemStoreFlush { long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); // CF2 should have been cleared - assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); + assertEquals(DefaultMemStore.DEEP_OVERHEAD + Segment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); String s = "\n\n----------------------------------\n" + "Upon initial insert and flush, LSN of CF1 is:" @@ -485,13 +467,6 @@ public class TestWalAndCompactingMemStoreFlush { // Flush! ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); - // CF1 and CF3 should be compacted so wait here to be sure the compaction is done - while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore()) - .isMemStoreFlushingInMemory()) - Threads.sleep(10); - while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore()) - .isMemStoreFlushingInMemory()) - Threads.sleep(10); region.flush(false); long smallestSeqInRegionCurrentMemstorePhaseIV =