.../hadoop/hbase/mob/DefaultMobStoreFlusher.java | 8 +- .../hbase/mob/mapreduce/MemStoreWrapper.java | 14 ++- .../hbase/regionserver/AbstractMemStore.java | 67 +++++------ .../hbase/regionserver/CompactingMemStore.java | 109 +++++++++++------- .../hbase/regionserver/CompactionPipeline.java | 17 +++ .../hadoop/hbase/regionserver/DefaultMemStore.java | 37 +++++-- .../hbase/regionserver/DefaultStoreFlusher.java | 6 +- .../apache/hadoop/hbase/regionserver/HStore.java | 4 +- .../hbase/regionserver/MemStoreCompactor.java | 1 - .../hbase/regionserver/MemStoreSnapshot.java | 35 ++++-- .../hbase/regionserver/PipelineMemstore.java | 123 +++++++++++++++++++++ .../hbase/regionserver/StripeStoreFlusher.java | 8 +- .../hbase/regionserver/TestCompactingMemStore.java | 91 +++++++++++---- .../hbase/regionserver/TestDefaultMemStore.java | 9 +- .../hbase/regionserver/TestMemStoreChunkPool.java | 4 +- 15 files changed, 404 insertions(+), 129 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index eb6c739..32d73d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -35,13 +35,16 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.AbstractMemStore; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MemStoreScanner; import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.MemStoreScanner.Type; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; @@ -104,7 +107,10 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + MemStoreScanner msScanner = + new MemStoreScanner(((AbstractMemStore) store.getMemStore()), + snapshot.getScanner(), Long.MAX_VALUE, Type.COMPACT_FORWARD); + InternalScanner scanner = createScanner(msScanner, smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java index 5955cc2..ece8b6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mob.mapreduce; import java.io.IOException; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -140,12 +141,13 @@ public class MemStoreWrapper { LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString()); byte[] referenceValue = Bytes.toBytes(relativePath); - KeyValueScanner scanner = snapshot.getScanner(); + // TODO : To be changed + List scanner = snapshot.getScanner(); Cell cell = null; - while (null != (cell = scanner.next())) { + while (null != (cell = scanner.get(0).next())) { mobFileWriter.append(cell); } - scanner.close(); + scanner.get(0).close(); // Write out the log sequence number that corresponds to this output // hfile. The hfile is current up to and including logCacheFlushId. mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount()); @@ -155,12 +157,12 @@ public class MemStoreWrapper { context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1); // write reference/fileName back to the store files of HBase. scanner = snapshot.getScanner(); - scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + scanner.get(0).seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); cell = null; Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName().toString())); long updatedCount = 0; - while (null != (cell = scanner.next())) { + while (null != (cell = scanner.get(0).next())) { KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag); Put put = new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()); @@ -170,7 +172,7 @@ public class MemStoreWrapper { } table.flush(); context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount); - scanner.close(); + scanner.get(0).close(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 6769b5e..b886328 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; @@ -44,7 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public abstract class AbstractMemStore implements MemStore { - private static final long NO_SNAPSHOT_ID = -1; + protected static final long NO_SNAPSHOT_ID = -1; private final Configuration conf; private final CellComparator comparator; @@ -52,7 +53,7 @@ public abstract class AbstractMemStore implements MemStore { // active segment absorbs write operations private volatile MutableSegment active; // Snapshot of memstore. Made for flusher. - private volatile ImmutableSegment snapshot; + private volatile List snapshot; protected volatile long snapshotId; // Used to track when to flush private volatile long timeOfOldestEdit; @@ -71,7 +72,8 @@ public abstract class AbstractMemStore implements MemStore { this.conf = conf; this.comparator = c; resetCellSet(); - this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0); + this.snapshot = + Collections.singletonList(SegmentFactory.instance().createImmutableSegment(c, 0)); this.snapshotId = NO_SNAPSHOT_ID; } @@ -162,28 +164,6 @@ public abstract class AbstractMemStore implements MemStore { } /** - * The passed snapshot was successfully persisted; it can be let go. - * @param id Id of the snapshot to clean out. - * @see MemStore#snapshot() - */ - @Override - public void clearSnapshot(long id) throws UnexpectedStateException { - if (this.snapshotId != id) { - throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " - + id); - } - // OK. Passed in snapshot is same as current snapshot. If not-empty, - // create a new snapshot and let the old one go. - Segment oldSnapshot = this.snapshot; - if (!this.snapshot.isEmpty()) { - this.snapshot = SegmentFactory.instance().createImmutableSegment( - getComparator(), 0); - } - this.snapshotId = NO_SNAPSHOT_ID; - oldSnapshot.close(); - } - - /** * Get the entire heap usage for this MemStore not including keys in the * snapshot. */ @@ -194,7 +174,11 @@ public abstract class AbstractMemStore implements MemStore { @Override public long getSnapshotSize() { - return getSnapshot().getSize(); + long size = 0l; + for (ImmutableSegment segment : getSnapshot()) { + size += segment.getSize(); + } + return size; } @Override @@ -218,7 +202,9 @@ public abstract class AbstractMemStore implements MemStore { protected void dump(Log log) { active.dump(log); - snapshot.dump(log); + for (ImmutableSegment segment : snapshot) { + segment.dump(log); + } } @@ -342,7 +328,7 @@ public abstract class AbstractMemStore implements MemStore { long newValue, long now) { Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - Cell snc = snapshot.getFirstAfter(firstCell); + Cell snc = getFirstAfterInSnapshot(firstCell); if(snc != null) { // is there a matching Cell in the snapshot? if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { @@ -379,6 +365,22 @@ public abstract class AbstractMemStore implements MemStore { return upsert(cells, 1L); } + // TODO : verify this + protected Cell getFirstAfterInSnapshot(Cell firstCell) { + Cell firstCellToReturn = null; + for (ImmutableSegment segment : getSnapshot()) { + if (firstCellToReturn == null) { + firstCellToReturn = segment.getFirstAfter(firstCell); + } else { + Cell cell = segment.getFirstAfter(firstCell); + if (this.comparator.compare(cell, firstCellToReturn) < 0) { + firstCellToReturn = cell; + } + } + } + return firstCellToReturn; + } + private Cell maybeCloneWithAllocator(Cell cell) { return active.maybeCloneWithAllocator(cell); } @@ -414,19 +416,18 @@ public abstract class AbstractMemStore implements MemStore { return active; } - protected ImmutableSegment getSnapshot() { + protected List getSnapshot() { return snapshot; } - protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) { + protected AbstractMemStore setSnapshot(List snapshot) { this.snapshot = snapshot; return this; } - protected void setSnapshotSize(long snapshotSize) { - getSnapshot().setSize(snapshotSize); + protected void setSnapshotSize(ImmutableSegment segment, long size) { + segment.setSize(size); } - /** * Check whether anything need to be done based on the current active set size */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index d4bbfa6..21b9fc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.util.Iterator; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -63,11 +62,11 @@ public class CompactingMemStore extends AbstractMemStore { private static final Log LOG = LogFactory.getLog(CompactingMemStore.class); private Store store; private RegionServicesForStores regionServices; - private CompactionPipeline pipeline; + protected CompactionPipeline pipeline; private MemStoreCompactor compactor; // the threshold on active size for in-memory flush private long inmemoryFlushSize; - private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); + protected final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); @@ -162,11 +161,38 @@ public class CompactingMemStore extends AbstractMemStore { } /** + * The passed snapshot was successfully persisted; it can be let go. + * @param id Id of the snapshot to clean out. + * @see MemStore#snapshot() + */ + @Override + public void clearSnapshot(long id) throws UnexpectedStateException { + if (this.snapshotId != id) { + throw new UnexpectedStateException( + "Current snapshot id is " + this.snapshotId + ",passed " + id); + } + // OK. Passed in snapshot is same as current snapshot. If not-empty, + // create a new snapshot and let the old one go. + List oldSnapshot = this.getSnapshot(); + for (ImmutableSegment segment : oldSnapshot) { + if (!segment.isEmpty()) { + // clear + this.setSnapshot(Collections + .singletonList(SegmentFactory.instance().createImmutableSegment(getComparator(), 0))); + break; + } + } + this.snapshotId = NO_SNAPSHOT_ID; + for (ImmutableSegment segment : oldSnapshot) { + segment.close(); + } + } + /** * On flush, how much memory we will clear. * @return size of data that is going to be flushed */ @Override public long getFlushableSize() { - long snapshotSize = getSnapshot().getSize(); + long snapshotSize = getSnapshotSize(); if(snapshotSize == 0) { //if snapshot is empty the tail of the pipeline is flushed snapshotSize = pipeline.getTailSize(); @@ -193,7 +219,7 @@ public class CompactingMemStore extends AbstractMemStore { List list = new LinkedList(); list.add(getActive()); list.addAll(pipelineList); - list.add(getSnapshot()); + list.addAll(getSnapshot()); return list; } @@ -240,7 +266,9 @@ public class CompactingMemStore extends AbstractMemStore { list.add(item.getSegmentScanner(readPt, order)); order--; } - list.add(getSnapshot().getSegmentScanner(readPt, order)); + for(ImmutableSegment segment : getSnapshot()) { + list.add(segment.getSegmentScanner(readPt, order)); + } return Collections. singletonList( new MemStoreScanner((AbstractMemStore) this, list, readPt)); } @@ -272,35 +300,38 @@ public class CompactingMemStore extends AbstractMemStore { // otherwise there is a deadlock @VisibleForTesting void flushInMemory() throws IOException { - // Phase I: Update the pipeline - getRegionServices().blockUpdates(); - try { - MutableSegment active = getActive(); - if (LOG.isTraceEnabled()) { - LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " - + "and initiating compaction."); - } - pushActiveToPipeline(active); - } finally { - getRegionServices().unblockUpdates(); - } - // Phase II: Compact the pipeline + // setting the inMemoryFlushInProgress flag again for the case this method is invoked + // directly (only in tests) in the common path setting from true to true is idempotent + // Speculative compaction execution, may be interrupted if flush is forced while + // compaction is in progress + inMemoryFlushInProgress.set(true); try { - if (allowCompaction.get() && inMemoryFlushInProgress.compareAndSet(false, true)) { - // setting the inMemoryFlushInProgress flag again for the case this method is invoked - // directly (only in tests) in the common path setting from true to true is idempotent - // Speculative compaction execution, may be interrupted if flush is forced while - // compaction is in progress - compactor.startCompaction(); - } else { + // Phase I: Update the pipeline + getRegionServices().blockUpdates(); + try { + MutableSegment active = getActive(); if (LOG.isTraceEnabled()) { - LOG.trace("Already inmemory compaction is progress"); + LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " + + "and initiating compaction."); } + pushActiveToPipeline(active); + } finally { + getRegionServices().unblockUpdates(); + } + if (!allowCompaction.get()) { + return; } - } catch (IOException e) { - LOG.warn("Unable to run memstore compaction. region " - + getRegionServices().getRegionInfo().getRegionNameAsString() - + "store: "+ getFamilyName(), e); + // Phase II: Compact the pipeline + try { + compactor.startCompaction(); + } catch (IOException e) { + LOG.warn("Unable to run memstore compaction. region " + + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: " + + getFamilyName(), + e); + } + } finally { + inMemoryFlushInProgress.set(false); } } @@ -312,10 +343,10 @@ public class CompactingMemStore extends AbstractMemStore { return getRegionServices().getInMemoryCompactionPool(); } - private boolean shouldFlushInMemory() { + protected boolean shouldFlushInMemory() { if(getActive().getSize() > inmemoryFlushSize) { // size above flush threshold - return (allowCompaction.get() && !inMemoryFlushInProgress.get()); + return inMemoryFlushInProgress.compareAndSet(false, true); } return false; } @@ -325,14 +356,14 @@ public class CompactingMemStore extends AbstractMemStore { * The compaction may still happen if the request was sent too late * Non-blocking request */ - private void stopCompaction() { + protected void stopCompaction() { if (inMemoryFlushInProgress.get()) { compactor.stopCompact(); inMemoryFlushInProgress.set(false); } } - private void pushActiveToPipeline(MutableSegment active) { + protected void pushActiveToPipeline(MutableSegment active) { if (!active.isEmpty()) { long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD; active.setSize(active.getSize() + delta); @@ -344,13 +375,13 @@ public class CompactingMemStore extends AbstractMemStore { private void pushTailToSnapshot() { ImmutableSegment tail = pipeline.pullTail(); if (!tail.isEmpty()) { - setSnapshot(tail); + setSnapshot(Collections.singletonList(tail)); long size = getSegmentSize(tail); - setSnapshotSize(size); + setSnapshotSize(tail, size); } } - private RegionServicesForStores getRegionServices() { + protected RegionServicesForStores getRegionServices() { return regionServices; } @@ -407,7 +438,7 @@ public class CompactingMemStore extends AbstractMemStore { } // debug method - private void debug() { + protected void debug() { String msg = "active size="+getActive().getSize(); msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize; msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index e33ceae..fd8ca4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -77,6 +77,23 @@ public class CompactionPipeline { return res; } } + + // TODO : Unify many similar APIs + public List getSegmentsAndIncrementVersion() { + synchronized (pipeline) { + List res = new LinkedList(pipeline); + // Increment the version also + version++; + return res; + } + } + + public void removeCurrentPipeline(List segmentsInPipeLine) { + synchronized (pipeline) { + // do not close the segments. + pipeline.removeAll(segmentsInPipeLine); + } + } /** * Swaps the versioned list at the tail of the pipeline with the new compacted segment. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 8904f46..78d6a45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -84,7 +84,7 @@ public class DefaultMemStore extends AbstractMemStore { public MemStoreSnapshot snapshot() { // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. - if (!getSnapshot().isEmpty()) { + if (!getSnapshot().get(0).isEmpty()) { LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { @@ -92,8 +92,8 @@ public class DefaultMemStore extends AbstractMemStore { if (!getActive().isEmpty()) { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(getActive()); - setSnapshot(immutableSegment); - setSnapshotSize(keySize()); + setSnapshot(Collections.singletonList(immutableSegment)); + getSnapshot().get(0).setSize(keySize()); resetCellSet(); } } @@ -102,13 +102,36 @@ public class DefaultMemStore extends AbstractMemStore { } /** + * The passed snapshot was successfully persisted; it can be let go. + * @param id Id of the snapshot to clean out. + * @see MemStore#snapshot() + */ + @Override + public void clearSnapshot(long id) throws UnexpectedStateException { + if (this.snapshotId != id) { + throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " + + id); + } + // OK. Passed in snapshot is same as current snapshot. If not-empty, + // create a new snapshot and let the old one go. + List oldSnapshot = this.getSnapshot(); + if (!this.getSnapshot().get(0).isEmpty()) { + this.setSnapshot(Collections.singletonList(SegmentFactory.instance().createImmutableSegment( + getComparator(), 0))); + } + this.snapshotId = NO_SNAPSHOT_ID; + for (ImmutableSegment segment : oldSnapshot) { + segment.close(); + } + } + /** * On flush, how much memory we will clear from the active cell set. * * @return size of data that is going to be flushed from active set */ @Override public long getFlushableSize() { - long snapshotSize = getSnapshot().getSize(); + long snapshotSize = getSnapshot().get(0).getSize(); return snapshotSize > 0 ? snapshotSize : keySize(); } @@ -119,7 +142,7 @@ public class DefaultMemStore extends AbstractMemStore { public List getScanners(long readPt) throws IOException { List list = new ArrayList(2); list.add(getActive().getSegmentScanner(readPt, 1)); - list.add(getSnapshot().getSegmentScanner(readPt, 0)); + list.add(getSnapshot().get(0).getSegmentScanner(readPt, 0)); return Collections. singletonList( new MemStoreScanner((AbstractMemStore) this, list, readPt)); } @@ -128,7 +151,7 @@ public class DefaultMemStore extends AbstractMemStore { protected List getSegments() throws IOException { List list = new ArrayList(2); list.add(getActive()); - list.add(getSnapshot()); + list.add(getSnapshot().get(0)); return list; } @@ -140,7 +163,7 @@ public class DefaultMemStore extends AbstractMemStore { Cell getNextRow(final Cell cell) { return getLowest( getNextRow(cell, getActive().getCellSet()), - getNextRow(cell, getSnapshot().getCellSet())); + getNextRow(cell, getSnapshot().get(0).getCellSet())); } @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 711b987..69e331f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.MemStoreScanner.Type; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.util.StringUtils; @@ -52,7 +53,10 @@ public class DefaultStoreFlusher extends StoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + MemStoreScanner msScanner = + new MemStoreScanner(((AbstractMemStore) store.getMemStore()), + snapshot.getScanner(), Long.MAX_VALUE, Type.COMPACT_FORWARD); + InternalScanner scanner = createScanner(msScanner, smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f536c26..65b9121 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -231,8 +231,8 @@ public class HStore implements Store { scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator); String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName()); if (family.isInMemoryCompaction()) { - className = CompactingMemStore.class.getName(); - this.memstore = new CompactingMemStore(conf, this.comparator, this, + className = PipelineMemstore.class.getName(); + this.memstore = new PipelineMemstore(conf, this.comparator, this, this.getHRegion().getRegionServicesForStores()); } else { this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index abe8531..fff9a21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -142,7 +142,6 @@ class MemStoreCompactor { return; } finally { releaseResources(); - compactingMemStore.setInMemoryFlushInProgress(false); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 28ab693..c477fac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.LinkedList; +import java.util.List; + import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -28,19 +31,27 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public class MemStoreSnapshot { private final long id; - private final int cellsCount; - private final long size; - private final TimeRangeTracker timeRangeTracker; - private final KeyValueScanner scanner; + private int cellsCount; + private long size; + private TimeRangeTracker timeRangeTracker; + private final List scanner; private final boolean tagsPresent; - - public MemStoreSnapshot(long id, ImmutableSegment snapshot) { + public MemStoreSnapshot(long id, List snapshot) { + this.scanner = new LinkedList(); this.id = id; - this.cellsCount = snapshot.getCellsCount(); - this.size = snapshot.getSize(); - this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getKeyValueScanner(); - this.tagsPresent = snapshot.isTagsPresent(); + boolean tags = false; + for (ImmutableSegment segment : snapshot) { + this.cellsCount += segment.getCellsCount(); + this.size += segment.getSize(); + this.scanner.add(segment.getKeyValueScanner()); + if (!tags) { + tags = segment.isTagsPresent(); + } + // All are default only here. lets see if one timerange tracker is enough TODO + // just ensure we set this. + this.timeRangeTracker = segment.getTimeRangeTracker(); + } + this.tagsPresent = tags; } /** @@ -74,7 +85,7 @@ public class MemStoreSnapshot { /** * @return {@link KeyValueScanner} for iterating over the snapshot */ - public KeyValueScanner getScanner() { + public List getScanner() { return this.scanner; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PipelineMemstore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PipelineMemstore.java new file mode 100644 index 0000000..00c71f2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PipelineMemstore.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.annotations.VisibleForTesting; + +public class PipelineMemstore extends CompactingMemStore { + + private static final Log LOG = LogFactory.getLog(PipelineMemstore.class); + public PipelineMemstore(Configuration conf, CellComparator c, HStore store, + RegionServicesForStores regionServices) throws IOException { + super(conf, c, store, regionServices); + } + /** + * On flush, how much memory we will clear. + * @return size of data that is going to be flushed + */ + @Override public long getFlushableSize() { + long snapshotSize = getSnapshotSize(); + if(snapshotSize == 0) { + for(Segment segment : pipeline.getSegments()) { + snapshotSize += CompactingMemStore.getSegmentSize(segment); + } + // add the current active also to the flushable size + snapshotSize += keySize(); + } + return snapshotSize > 0 ? snapshotSize : keySize(); + } + + /** + * Push the current active memstore segment into the pipeline + * and create a snapshot of the tail of current compaction pipeline + * Snapshot must be cleared by call to {@link #clearSnapshot}. + * {@link #clearSnapshot(long)}. + * @return {@link MemStoreSnapshot} + */ + @Override + public MemStoreSnapshot snapshot() { + MutableSegment active = getActive(); + // If snapshot currently has entries, then flusher failed or didn't call + // cleanup. Log a warning. + for (ImmutableSegment segment : getSnapshot()) { + if (!segment.isEmpty()) { + LOG.warn("Snapshot called again without clearing previous. " + + "Doing nothing. Another ongoing flush or did we fail last attempt?"); + return new MemStoreSnapshot(snapshotId, getSnapshot()); + } + } + LOG.info( + "FLUSHING TO DISK: region " + getRegionServices().getRegionInfo().getRegionNameAsString() + + "store: " + getFamilyName()); + stopCompaction(); + pushActiveToPipeline(active); + // get the existing segments in the pipeline + List segmentsInPipeLine = getImmutableSegmentsForSnapshot(); + snapshotId = EnvironmentEdgeManager.currentTime(); + setSnapshotInternal(segmentsInPipeLine); + removeExistingPipeline(segmentsInPipeLine); + // remove the elements from the pipeline + return new MemStoreSnapshot(snapshotId, getSnapshot()); + } + + private void setSnapshotInternal(List segmentList) { + setSnapshot(segmentList); + for (ImmutableSegment segment : segmentList) { + setSnapshotSize(segment, getSegmentSize(segment)); + } + } + + public List getImmutableSegmentsForSnapshot() { + return pipeline.getSegmentsAndIncrementVersion(); + } + + private void removeExistingPipeline(List segmentsInPipeLine) { + pipeline.removeCurrentPipeline(segmentsInPipeLine); + } + + // internally used method, externally visible only for tests + // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, + // otherwise there is a deadlock + @VisibleForTesting + void flushInMemory() throws IOException { + // Phase I: Update the pipeline + getRegionServices().blockUpdates(); + try { + MutableSegment active = getActive(); + if (LOG.isTraceEnabled()) { + LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " + + "and initiating compaction."); + } + pushActiveToPipeline(active); + } finally { + getRegionServices().unblockUpdates(); + // reset it + inMemoryFlushInProgress.set(false); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 34e8497..266a583 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.MemStoreScanner.Type; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -63,9 +64,12 @@ public class StripeStoreFlusher extends StoreFlusher { int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries + // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); - if (scanner == null) { + MemStoreScanner msScanner = + new MemStoreScanner(((AbstractMemStore) store.getMemStore()), + snapshot.getScanner(), Long.MAX_VALUE, Type.COMPACT_FORWARD); + InternalScanner scanner = createScanner(msScanner, smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 4b76ffe..0791128 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -146,7 +146,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { for (int i = 0; i < snapshotCount; i++) { addRows(this.memstore); runSnapshot(this.memstore, true); - assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals("History not being cleared", 0, cellCount); } } @@ -221,11 +225,19 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf2, val)); memstore.add(new KeyValue(row, fam, qf3, val)); //Pushing to pipeline - ((CompactingMemStore)memstore).flushInMemory(); - assertEquals(0, memstore.getSnapshot().getCellsCount()); + ((CompactingMemStore) memstore).flushInMemory(); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); //Creating a snapshot memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(3, cellCount); //Adding value to "new" memstore assertEquals(0, memstore.getActive().getCellsCount()); memstore.add(new KeyValue(row, fam, qf4, val)); @@ -365,7 +377,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException { // Save off old state. - long oldHistorySize = hmc.getSnapshot().getSize(); + long oldHistorySize = hmc.getSnapshotSize(); long prevTimeStamp = hmc.timeOfOldestEdit(); hmc.snapshot(); @@ -415,7 +427,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(3, cellCount); // Adding value to "new" memstore assertEquals(0, memstore.getActive().getCellsCount()); @@ -450,7 +466,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(3, cellCount); // Adding value to "new" memstore assertEquals(0, memstore.getActive().getCellsCount()); @@ -588,14 +608,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(10); } - assertEquals(0, memstore.getSnapshot().getCellsCount()); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); assertEquals(528, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot region.addAndGetGlobalMemstoreSize(-size); // simulate flusher - ImmutableSegment s = memstore.getSnapshot(); - assertEquals(3, s.getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(3, cellCount); assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); memstore.clearSnapshot(snapshot.getId()); @@ -615,7 +642,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(1000); } - assertEquals(0, memstore.getSnapshot().getCellsCount()); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); assertEquals(528, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys2); @@ -626,14 +657,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(10); } - assertEquals(0, memstore.getSnapshot().getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); assertEquals(704, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot region.addAndGetGlobalMemstoreSize(-size); // simulate flusher - ImmutableSegment s = memstore.getSnapshot(); - assertEquals(4, s.getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(4, cellCount); assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); memstore.clearSnapshot(snapshot.getId()); @@ -657,7 +695,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(10); } - assertEquals(0, memstore.getSnapshot().getCellsCount()); + int cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); assertEquals(528, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys2); @@ -671,7 +713,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore { ((CompactingMemStore)memstore).disableCompaction(); size = memstore.getFlushableSize(); ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction - assertEquals(0, memstore.getSnapshot().getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); assertEquals(1056, regionServicesForStores.getGlobalMemstoreTotalSize()); addRowsByKeys(memstore, keys3); @@ -683,14 +729,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore { while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) { Threads.sleep(10); } - assertEquals(0, memstore.getSnapshot().getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(0, cellCount); assertEquals(704, regionServicesForStores.getGlobalMemstoreTotalSize()); size = memstore.getFlushableSize(); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot region.addAndGetGlobalMemstoreSize(-size); // simulate flusher - ImmutableSegment s = memstore.getSnapshot(); - assertEquals(4, s.getCellsCount()); + cellCount = 0; + for (ImmutableSegment segment : memstore.getSnapshot()) { + cellCount += segment.getCellsCount(); + } + assertEquals(4, cellCount); assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize()); memstore.clearSnapshot(snapshot.getId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 0c4029d..96cccad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -497,7 +497,7 @@ public class TestDefaultMemStore { for (int i = 0; i < snapshotCount; i++) { addRows(this.memstore); runSnapshot(this.memstore); - assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); + assertEquals("History not being cleared", 0, this.memstore.getSnapshot().get(0).getCellsCount()); } } @@ -595,7 +595,7 @@ public class TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf3, val)); //Creating a snapshot memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); + assertEquals(3, memstore.getSnapshot().get(0).getCellsCount()); //Adding value to "new" memstore assertEquals(0, memstore.getActive().getCellsCount()); memstore.add(new KeyValue(row, fam ,qf4, val)); @@ -1021,10 +1021,11 @@ public class TestDefaultMemStore { private long runSnapshot(final AbstractMemStore hmc) throws UnexpectedStateException { // Save off old state. - int oldHistorySize = hmc.getSnapshot().getCellsCount(); + int oldHistorySize = hmc.getSnapshot().get(0).getCellsCount(); MemStoreSnapshot snapshot = hmc.snapshot(); // Make some assertions about what just happened. - assertTrue("History size has not increased", oldHistorySize < hmc.getSnapshot().getCellsCount + assertTrue("History size has not increased", + oldHistorySize < hmc.getSnapshot().get(0).getCellsCount ()); long t = memstore.timeOfOldestEdit(); assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 6059fe0..5780642 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -124,7 +124,7 @@ public class TestMemStoreChunkPool { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); + assertEquals(3, memstore.getSnapshot().get(0).getCellsCount()); // Adding value to "new" memstore assertEquals(0, memstore.getActive().getCellsCount()); @@ -161,7 +161,7 @@ public class TestMemStoreChunkPool { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.getSnapshot().getCellsCount()); + assertEquals(3, memstore.getSnapshot().get(0).getCellsCount()); // Adding value to "new" memstore assertEquals(0, memstore.getActive().getCellsCount());