set,
+ final GetClosestRowBeforeTracker state) {
+ KeyValue firstOnRow = state.getTargetKey();
+ for (Member p = memberOfPreviousRow(set, state, firstOnRow);
+ p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
+ // Make sure we don't fall out of our table.
+ if (!state.isTargetTable(p.kv)) break;
+ // Stop looking if we've exited the better candidate range.
+ if (!state.isBetterCandidate(p.kv)) break;
+ // Make into firstOnRow
+ firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
+ HConstants.LATEST_TIMESTAMP);
+ // If we find something, break;
+ if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
+ }
+ }
+
+ /**
+ * Update or insert the specified KeyValues.
+ *
+ * For each KeyValue, insert into MemStore. This will atomically upsert the
+ * value for that row/family/qualifier. If a KeyValue did already exist,
+ * it will then be removed.
+ *
+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+ * be immediately visible. May want to change this so it is atomic across
+ * all KeyValues.
+ *
+ * This is called under row lock, so Get operations will still see updates
+ * atomically. Scans will only see each KeyValue update as atomic.
+ *
+ * @param cells
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @return change in memstore size
+ */
+ @Override
+ public long upsert(Iterable| cells, long readpoint) {
+ long size = 0;
+ for (Cell cell : cells) {
+ size += upsert(cell, readpoint);
+ }
+ return size;
+ }
+
+ /**
+ * Inserts the specified KeyValue into MemStore and deletes any existing
+ * versions of the same row/family/qualifier as the specified KeyValue.
+ *
+ * First, the specified KeyValue is inserted into the Memstore.
+ *
+ * If there are any existing KeyValues in this MemStore with the same row,
+ * family, and qualifier, they are removed.
+ *
+ * Callers must hold the read lock.
+ *
+ * @param cell
+ * @return change in size of MemStore
+ */
+ private long upsert(Cell cell, long readpoint) {
+ // Add the KeyValue to the MemStore
+ // Use the internalAdd method here since we (a) already have a lock
+ // and (b) cannot safely use the MSLAB here without potentially
+ // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+ // test that triggers the pathological case if we don't avoid MSLAB
+ // here.
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ long addedSize = internalAdd(kv);
+
+ // Get the KeyValues for the row/family/qualifier regardless of timestamp.
+ // For this case we want to clean up any other puts
+ KeyValue firstKv = KeyValue.createFirstOnRow(
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+ kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
+ kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
+ SortedSet ss = kvset.tailSet(firstKv);
+ Iterator it = ss.iterator();
+ // versions visible to oldest scanner
+ int versionsVisible = 0;
+ while ( it.hasNext() ) {
+ KeyValue cur = it.next();
+
+ if (kv == cur) {
+ // ignore the one just put in
+ continue;
+ }
+ // check that this is the row and column we are interested in, otherwise bail
+ if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+ // only remove Puts that concurrent scanners cannot possibly see
+ if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
+ cur.getMvccVersion() <= readpoint) {
+ if (versionsVisible > 1) {
+ // if we get here we have seen at least one version visible to the oldest scanner,
+ // which means we can prove that no scanner will see this version
+
+ // false means there was a change, so give us the size.
+ long delta = heapSizeChange(cur, true);
+ addedSize -= delta;
+ this.size.addAndGet(-delta);
+ it.remove();
+ setOldestEditTimeToNow();
+ } else {
+ versionsVisible++;
+ }
+ }
+ } else {
+ // past the row or column, done
+ break;
+ }
+ }
+ return addedSize;
+ }
+
+ /*
+ * Immutable data structure to hold member found in set and the set it was
+ * found in. Include set because it is carrying context.
+ */
+ private static class Member {
+ final KeyValue kv;
+ final NavigableSet set;
+ Member(final NavigableSet s, final KeyValue kv) {
+ this.kv = kv;
+ this.set = s;
+ }
+ }
+
+ /*
+ * @param set Set to walk back in. Pass a first in row or we'll return
+ * same row (loop).
+ * @param state Utility and context.
+ * @param firstOnRow First item on the row after the one we want to find a
+ * member in.
+ * @return Null or member of row previous to firstOnRow
+ */
+ private Member memberOfPreviousRow(NavigableSet set,
+ final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
+ NavigableSet head = set.headSet(firstOnRow, false);
+ if (head.isEmpty()) return null;
+ for (Iterator i = head.descendingIterator(); i.hasNext();) {
+ KeyValue found = i.next();
+ if (state.isExpired(found)) {
+ i.remove();
+ continue;
+ }
+ return new Member(head, found);
+ }
+ return null;
+ }
+
+ /**
+ * @return scanner on memstore and snapshot in this order.
+ */
+ @Override
+ public List getScanners(long readPt) {
+ return Collections. singletonList(new MemStoreScanner(readPt));
+ }
+
+ /**
+ * Check if this memstore may contain the required keys
+ * @param scan
+ * @return False if the key definitely does not exist in this Memstore
+ */
+ public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
+ return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
+ snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
+ && (Math.max(timeRangeTracker.getMaximumTimestamp(),
+ snapshotTimeRangeTracker.getMaximumTimestamp()) >=
+ oldestUnexpiredTS);
+ }
+
+ public TimeRangeTracker getSnapshotTimeRangeTracker() {
+ return this.snapshotTimeRangeTracker;
+ }
+
+ /*
+ * MemStoreScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * This behaves as if it were a real scanner but does not maintain position.
+ */
+ protected class MemStoreScanner extends NonLazyKeyValueScanner {
+ // Next row information for either kvset or snapshot
+ private KeyValue kvsetNextRow = null;
+ private KeyValue snapshotNextRow = null;
+
+ // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
+ private KeyValue kvsetItRow = null;
+ private KeyValue snapshotItRow = null;
+
+ // iterator based scanning.
+ private Iterator kvsetIt;
+ private Iterator snapshotIt;
+
+ // The kvset and snapshot at the time of creating this scanner
+ private KeyValueSkipListSet kvsetAtCreation;
+ private KeyValueSkipListSet snapshotAtCreation;
+
+ // the pre-calculated KeyValue to be returned by peek() or next()
+ private KeyValue theNext;
+
+ // The allocator and snapshot allocator at the time of creating this scanner
+ volatile MemStoreLAB allocatorAtCreation;
+ volatile MemStoreLAB snapshotAllocatorAtCreation;
+
+ // A flag represents whether could stop skipping KeyValues for MVCC
+ // if have encountered the next row. Only used for reversed scan
+ private boolean stopSkippingKVsIfNextRow = false;
+
+ private long readPoint;
+
+ /*
+ Some notes...
+
+ So memstorescanner is fixed at creation time. this includes pointers/iterators into
+ existing kvset/snapshot. during a snapshot creation, the kvset is null, and the
+ snapshot is moved. since kvset is null there is no point on reseeking on both,
+ we can save us the trouble. During the snapshot->hfile transition, the memstore
+ scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
+ potentially do something smarter by adjusting the existing memstore scanner.
+
+ But there is a greater problem here, that being once a scanner has progressed
+ during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
+ if a scan lasts a little while, there is a chance for new entries in kvset to
+ become available but we will never see them. This needs to be handled at the
+ StoreScanner level with coordination with MemStoreScanner.
+
+ Currently, this problem is only partly managed: during the small amount of time
+ when the StoreScanner has not yet created a new MemStoreScanner, we will miss
+ the adds to kvset in the MemStoreScanner.
+ */
+
+ MemStoreScanner(long readPoint) {
+ super();
+
+ this.readPoint = readPoint;
+ kvsetAtCreation = kvset;
+ snapshotAtCreation = snapshot;
+ if (allocator != null) {
+ this.allocatorAtCreation = allocator;
+ this.allocatorAtCreation.incScannerCount();
+ }
+ if (snapshotAllocator != null) {
+ this.snapshotAllocatorAtCreation = snapshotAllocator;
+ this.snapshotAllocatorAtCreation.incScannerCount();
+ }
+ }
+
+ private KeyValue getNext(Iterator it) {
+ KeyValue startKV = theNext;
+ KeyValue v = null;
+ try {
+ while (it.hasNext()) {
+ v = it.next();
+ if (v.getMvccVersion() <= this.readPoint) {
+ return v;
+ }
+ if (stopSkippingKVsIfNextRow && startKV != null
+ && comparator.compareRows(v, startKV) > 0) {
+ return null;
+ }
+ }
+
+ return null;
+ } finally {
+ if (v != null) {
+ // in all cases, remember the last KV iterated to
+ if (it == snapshotIt) {
+ snapshotItRow = v;
+ } else {
+ kvsetItRow = v;
+ }
+ }
+ }
+ }
+
+ /**
+ * Set the scanner at the seek key.
+ * Must be called only once: there is no thread safety between the scanner
+ * and the memStore.
+ * @param key seek value
+ * @return false if the key is null or if there is no data
+ */
+ @Override
+ public synchronized boolean seek(KeyValue key) {
+ if (key == null) {
+ close();
+ return false;
+ }
+
+ // kvset and snapshot will never be null.
+ // if tailSet can't find anything, SortedSet is empty (not null).
+ kvsetIt = kvsetAtCreation.tailSet(key).iterator();
+ snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+ kvsetItRow = null;
+ snapshotItRow = null;
+
+ return seekInSubLists(key);
+ }
+
+
+ /**
+ * (Re)initialize the iterators after a seek or a reseek.
+ */
+ private synchronized boolean seekInSubLists(KeyValue key){
+ kvsetNextRow = getNext(kvsetIt);
+ snapshotNextRow = getNext(snapshotIt);
+
+ // Calculate the next value
+ theNext = getLowest(kvsetNextRow, snapshotNextRow);
+
+ // has data
+ return (theNext != null);
+ }
+
+
+ /**
+ * Move forward on the sub-lists set previously by seek.
+ * @param key seek value (should be non-null)
+ * @return true if there is at least one KV to read, false otherwise
+ */
+ @Override
+ public synchronized boolean reseek(KeyValue key) {
+ /*
+ See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
+ This code is executed concurrently with flush and puts, without locks.
+ Two points must be known when working on this code:
+ 1) It's not possible to use the 'kvTail' and 'snapshot'
+ variables, as they are modified during a flush.
+ 2) The ideal implementation for performance would use the sub skip list
+ implicitly pointed by the iterators 'kvsetIt' and
+ 'snapshotIt'. Unfortunately the Java API does not offer a method to
+ get it. So we remember the last keys we iterated to and restore
+ the reseeked set to at least that point.
+ */
+
+ kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
+ snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
+
+ return seekInSubLists(key);
+ }
+
+
+ @Override
+ public synchronized KeyValue peek() {
+ //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+ return theNext;
+ }
+
+ @Override
+ public synchronized KeyValue next() {
+ if (theNext == null) {
+ return null;
+ }
+
+ final KeyValue ret = theNext;
+
+ // Advance one of the iterators
+ if (theNext == kvsetNextRow) {
+ kvsetNextRow = getNext(kvsetIt);
+ } else {
+ snapshotNextRow = getNext(snapshotIt);
+ }
+
+ // Calculate the next value
+ theNext = getLowest(kvsetNextRow, snapshotNextRow);
+
+ //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+ // getLowest() + " threadpoint=" + readpoint);
+ return ret;
+ }
+
+ /*
+ * Returns the lower of the two key values, or null if they are both null.
+ * This uses comparator.compare() to compare the KeyValue using the memstore
+ * comparator.
+ */
+ private KeyValue getLowest(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare <= 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
+ /*
+ * Returns the higher of the two key values, or null if they are both null.
+ * This uses comparator.compare() to compare the KeyValue using the memstore
+ * comparator.
+ */
+ private KeyValue getHighest(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare > 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
+ public synchronized void close() {
+ this.kvsetNextRow = null;
+ this.snapshotNextRow = null;
+
+ this.kvsetIt = null;
+ this.snapshotIt = null;
+
+ if (allocatorAtCreation != null) {
+ this.allocatorAtCreation.decScannerCount();
+ this.allocatorAtCreation = null;
+ }
+ if (snapshotAllocatorAtCreation != null) {
+ this.snapshotAllocatorAtCreation.decScannerCount();
+ this.snapshotAllocatorAtCreation = null;
+ }
+
+ this.kvsetItRow = null;
+ this.snapshotItRow = null;
+ }
+
+ /**
+ * MemStoreScanner returns max value as sequence id because it will
+ * always have the latest data among all files.
+ */
+ @Override
+ public long getSequenceID() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean shouldUseScanner(Scan scan, SortedSet columns,
+ long oldestUnexpiredTS) {
+ return shouldSeek(scan, oldestUnexpiredTS);
+ }
+
+ /**
+ * Seek scanner to the given key first. If it returns false(means
+ * peek()==null) or scanner's peek row is bigger than row of given key, seek
+ * the scanner to the previous row of given key
+ */
+ @Override
+ public synchronized boolean backwardSeek(KeyValue key) {
+ seek(key);
+ if (peek() == null || comparator.compareRows(peek(), key) > 0) {
+ return seekToPreviousRow(key);
+ }
+ return true;
+ }
+
+ /**
+ * Separately get the KeyValue before the specified key from kvset and
+ * snapshotset, and use the row of higher one as the previous row of
+ * specified key, then seek to the first KeyValue of previous row
+ */
+ @Override
+ public synchronized boolean seekToPreviousRow(KeyValue key) {
+ KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
+ SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
+ KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
+ SortedSet snapshotHead = snapshotAtCreation
+ .headSet(firstKeyOnRow);
+ KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
+ .last();
+ KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
+ if (lastKVBeforeRow == null) {
+ theNext = null;
+ return false;
+ }
+ KeyValue firstKeyOnPreviousRow = KeyValue
+ .createFirstOnRow(lastKVBeforeRow.getRow());
+ this.stopSkippingKVsIfNextRow = true;
+ seek(firstKeyOnPreviousRow);
+ this.stopSkippingKVsIfNextRow = false;
+ if (peek() == null
+ || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
+ return seekToPreviousRow(lastKVBeforeRow);
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized boolean seekToLastRow() {
+ KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
+ .last();
+ KeyValue second = snapshotAtCreation.isEmpty() ? null
+ : snapshotAtCreation.last();
+ KeyValue higherKv = getHighest(first, second);
+ if (higherKv == null) {
+ return false;
+ }
+ KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
+ if (seek(firstKvOnLastRow)) {
+ return true;
+ } else {
+ return seekToPreviousRow(higherKv);
+ }
+
+ }
+ }
+
+ public final static long FIXED_OVERHEAD = ClassSize.align(
+ ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+
+ public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
+ ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
+ (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
+
+ /*
+ * Calculate how the MemStore size has changed. Includes overhead of the
+ * backing Map.
+ * @param kv
+ * @param notpresent True if the kv was NOT present in the set.
+ * @return Size
+ */
+ private static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+ return notpresent ?
+ ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
+ 0;
+ }
+
+ /**
+ * Get the entire heap usage for this MemStore not including keys in the
+ * snapshot.
+ */
+ @Override
+ public long heapSize() {
+ long heapSize = size.get();
+ if (this.snapshotInfo != null) {
+ heapSize += this.snapshotInfo.heapSize();
+ }
+ return heapSize;
+ }
+
+ @Override
+ public long size() {
+ return heapSize();
+ }
+
+ /**
+ * Code to help figure if our approximation of object heap sizes is close
+ * enough. See hbase-900. Fills memstores then waits so user can heap
+ * dump and bring up resultant hprof in something like jprofiler which
+ * allows you get 'deep size' on objects.
+ * @param args main args
+ */
+ public static void main(String [] args) {
+ RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+ LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
+ runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
+ LOG.info("vmInputArguments=" + runtime.getInputArguments());
+ DefaultMemStore memstore1 = new DefaultMemStore();
+ // TODO: x32 vs x64
+ long size = 0;
+ final int count = 10000;
+ byte [] fam = Bytes.toBytes("col");
+ byte [] qf = Bytes.toBytes("umn");
+ byte [] empty = new byte[0];
+ for (int i = 0; i < count; i++) {
+ // Give each its own ts
+ size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ }
+ LOG.info("memstore1 estimated size=" + size);
+ for (int i = 0; i < count; i++) {
+ size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+ }
+ LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
+ // Make a variably sized memstore.
+ DefaultMemStore memstore2 = new DefaultMemStore();
+ for (int i = 0; i < count; i++) {
+ size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
+ new byte[i]));
+ }
+ LOG.info("memstore2 estimated size=" + size);
+ final int seconds = 30;
+ LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
+ for (int i = 0; i < seconds; i++) {
+ // Thread.sleep(1000);
+ }
+ LOG.info("Exiting.");
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a5837c2..7d30b18 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.util.StringUtils;
/**
@@ -45,21 +42,21 @@ public class DefaultStoreFlusher extends StoreFlusher {
}
@Override
- public List flushSnapshot(SortedSet snapshot, long cacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
- MonitoredTask status) throws IOException {
+ public List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo,
+ long cacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status)
+ throws IOException {
ArrayList result = new ArrayList();
- if (snapshot.size() == 0) return result; // don't flush if there are no entries
+ int cellsCount = snapshotInfo.getCellsCount();
+ if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshotScanner, smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
StoreFile.Writer writer;
- long flushed = 0;
try {
// TODO: We can fail in the below block before we complete adding this flush to
// list of store files. Add cleanup of anything put on filesystem if we fail.
@@ -67,20 +64,19 @@ public class DefaultStoreFlusher extends StoreFlusher {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(
- snapshot.size(), store.getFamily().getCompression(), false, true, true);
+ cellsCount, store.getFamily().getCompression(), false, true, true);
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
try {
- flushed = performFlush(scanner, writer, smallestReadPoint);
+ performFlush(scanner, writer, smallestReadPoint);
} finally {
finalizeWriter(writer, cacheFlushId, status);
}
}
} finally {
- flushedSize.set(flushed);
scanner.close();
}
LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
- + StringUtils.humanReadableInt(flushed) +
+ + StringUtils.humanReadableInt(snapshotInfo.getSize()) +
", hasBloomFilter=" + writer.hasGeneralBloom() +
", into tmp file " + writer.getPath());
result.add(writer.getPath());
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index bd9d791..85a7c82 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -116,6 +118,7 @@ import com.google.common.collect.Lists;
*/
@InterfaceAudience.Private
public class HStore implements Store {
+ private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
"hbase.server.compactchecker.interval.multiplier";
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@@ -224,7 +227,9 @@ public class HStore implements Store {
// Why not just pass a HColumnDescriptor in here altogether? Even if have
// to clone it?
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
- this.memstore = new MemStore(conf, this.comparator);
+ String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
+ this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+ Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family
@@ -762,18 +767,16 @@ public class HStore implements Store {
* Write out current snapshot. Presumes {@link #snapshot()} has been called
* previously.
* @param logCacheFlushId flush sequence number
- * @param snapshot
+ * @param snapshotScanner
+ * @param snapshotInfo
* @param snapshotTimeRangeTracker
- * @param flushedSize The number of bytes flushed
* @param status
* @return The path name of the tmp file to which the store was flushed
* @throws IOException
*/
- protected List flushCache(final long logCacheFlushId,
- SortedSet snapshot,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
- MonitoredTask status) throws IOException {
+ protected List flushCache(final long logCacheFlushId, KeyValueScanner snapshotScanner,
+ SnapshotInfo snapshotInfo, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status)
+ throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@@ -783,8 +786,8 @@ public class HStore implements Store {
IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) {
try {
- List pathNames = flusher.flushSnapshot(
- snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+ List pathNames = flusher.flushSnapshot(snapshotScanner, snapshotInfo,
+ logCacheFlushId, snapshotTimeRangeTracker, status);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
@@ -826,7 +829,6 @@ public class HStore implements Store {
private StoreFile commitFile(final Path path,
final long logCacheFlushId,
TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
MonitoredTask status)
throws IOException {
// Write-out finished successfully, move into the right spot
@@ -915,11 +917,11 @@ public class HStore implements Store {
* @return Whether compaction is required.
*/
private boolean updateStorefiles(
- final List sfs, final SortedSet set) throws IOException {
+ final List sfs, final long snapshotId) throws IOException {
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
- this.memstore.clearSnapshot(set);
+ this.memstore.clearSnapshot(snapshotId);
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -1821,7 +1823,7 @@ public class HStore implements Store {
@Override
public long getMemStoreSize() {
- return this.memstore.heapSize();
+ return this.memstore.size();
}
@Override
@@ -1862,38 +1864,6 @@ public class HStore implements Store {
return this.region.getSmallestReadPoint();
}
- /**
- * Used in tests. TODO: Remove
- *
- * Updates the value for the given row/family/qualifier. This function will always be seen as
- * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
- * control necessary.
- * @param row row to update
- * @param f family to update
- * @param qualifier qualifier to update
- * @param newValue the new value to set into memstore
- * @return memstore size delta
- * @throws IOException
- */
- public long updateColumnValue(byte [] row, byte [] f,
- byte [] qualifier, long newValue)
- throws IOException {
-
- this.lock.readLock().lock();
- try {
- long now = EnvironmentEdgeManager.currentTimeMillis();
-
- return this.memstore.updateColumnValue(row,
- f,
- qualifier,
- newValue,
- now);
-
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
@Override
public long upsert(Iterable cells, long readpoint) throws IOException {
this.lock.readLock().lock();
@@ -1912,10 +1882,10 @@ public class HStore implements Store {
private class StoreFlusherImpl implements StoreFlushContext {
private long cacheFlushSeqNum;
- private SortedSet snapshot;
private List tempFiles;
private TimeRangeTracker snapshotTimeRangeTracker;
- private final AtomicLong flushedSize = new AtomicLong();
+ private KeyValueScanner snapshotScanner;
+ private SnapshotInfo snapshotInfo;
private StoreFlusherImpl(long cacheFlushSeqNum) {
this.cacheFlushSeqNum = cacheFlushSeqNum;
@@ -1927,15 +1897,15 @@ public class HStore implements Store {
*/
@Override
public void prepare() {
- memstore.snapshot();
- this.snapshot = memstore.getSnapshot();
+ this.snapshotInfo = memstore.snapshot();
+ this.snapshotScanner = memstore.getSnapshotScanner();
this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
}
@Override
public void flushCache(MonitoredTask status) throws IOException {
- tempFiles = HStore.this.flushCache(
- cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
+ tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshotScanner,
+ snapshotInfo, snapshotTimeRangeTracker, status);
}
@Override
@@ -1947,7 +1917,7 @@ public class HStore implements Store {
for (Path storeFilePath : tempFiles) {
try {
storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
- snapshotTimeRangeTracker, flushedSize, status));
+ snapshotTimeRangeTracker, status));
} catch (IOException ex) {
LOG.error("Failed to commit store file " + storeFilePath, ex);
// Try to delete the files we have committed before.
@@ -1970,7 +1940,7 @@ public class HStore implements Store {
}
}
// Add new file to store files. Clear snapshot too while we have the Store write lock.
- return HStore.this.updateStorefiles(storeFiles, snapshot);
+ return HStore.this.updateStorefiles(storeFiles, snapshotInfo.getId());
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 8da61fd..203ea3b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,1048 +15,140 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.regionserver;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
- * The MemStore holds in-memory modifications to the Store. Modifications
- * are {@link KeyValue}s. When asked to flush, current memstore is moved
- * to snapshot and is cleared. We continue to serve edits out of new memstore
- * and backing snapshot until flusher reports in that the flush succeeded. At
- * this point we let the snapshot go.
- *
- * The MemStore functions should not be called in parallel. Callers should hold
- * write and read locks. This is done in {@link HStore}.
- *
- *
- * TODO: Adjust size of the memstore when we remove items because they have
- * been deleted.
- * TODO: With new KVSLS, need to make sure we update HeapSize with difference
- * in KV size.
+ * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
+ *
+ * The MemStore functions should not be called in parallel. Callers should hold write and read
+ * locks. This is done in {@link HStore}.
+ *
*/
@InterfaceAudience.Private
-public class MemStore implements HeapSize {
- private static final Log LOG = LogFactory.getLog(MemStore.class);
-
- static final String USEMSLAB_KEY =
- "hbase.hregion.memstore.mslab.enabled";
- private static final boolean USEMSLAB_DEFAULT = true;
-
- private Configuration conf;
-
- // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
- // better semantics. The Map will overwrite if passed a key it already had
- // whereas the Set will not add new KV if key is same though value might be
- // different. Value is not important -- just make sure always same
- // reference passed.
- volatile KeyValueSkipListSet kvset;
-
- // Snapshot of memstore. Made for flusher.
- volatile KeyValueSkipListSet snapshot;
-
- final KeyValue.KVComparator comparator;
-
- // Used to track own heapSize
- final AtomicLong size;
-
- // Used to track when to flush
- volatile long timeOfOldestEdit = Long.MAX_VALUE;
-
- TimeRangeTracker timeRangeTracker;
- TimeRangeTracker snapshotTimeRangeTracker;
-
- MemStoreChunkPool chunkPool;
- volatile MemStoreLAB allocator;
- volatile MemStoreLAB snapshotAllocator;
+public interface MemStore extends HeapSize {
/**
- * Default constructor. Used for tests.
+ * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
+ * {@link #clearSnapshot(long)}. Use {@link KeyValueScanner} from {@link #getSnapshotScanner()}
+ * to iterate over the snapshot.
+ *
+ * @return {@link SnapshotInfo}
*/
- public MemStore() {
- this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
- }
-
- /**
- * Constructor.
- * @param c Comparator
- */
- public MemStore(final Configuration conf,
- final KeyValue.KVComparator c) {
- this.conf = conf;
- this.comparator = c;
- this.kvset = new KeyValueSkipListSet(c);
- this.snapshot = new KeyValueSkipListSet(c);
- timeRangeTracker = new TimeRangeTracker();
- snapshotTimeRangeTracker = new TimeRangeTracker();
- this.size = new AtomicLong(DEEP_OVERHEAD);
- if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
- this.chunkPool = MemStoreChunkPool.getPool(conf);
- this.allocator = new MemStoreLAB(conf, chunkPool);
- } else {
- this.allocator = null;
- this.chunkPool = null;
- }
- }
-
- void dump() {
- for (KeyValue kv: this.kvset) {
- LOG.info(kv);
- }
- for (KeyValue kv: this.snapshot) {
- LOG.info(kv);
- }
- }
+ SnapshotInfo snapshot();
/**
- * Creates a snapshot of the current memstore.
- * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
- * To get the snapshot made by this method, use {@link #getSnapshot()}
+ * @return {@link KeyValueScanner} for iterating over the snapshot
*/
- void snapshot() {
- // If snapshot currently has entries, then flusher failed or didn't call
- // cleanup. Log a warning.
- if (!this.snapshot.isEmpty()) {
- LOG.warn("Snapshot called again without clearing previous. " +
- "Doing nothing. Another ongoing flush or did we fail last attempt?");
- } else {
- if (!this.kvset.isEmpty()) {
- this.snapshot = this.kvset;
- this.kvset = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = this.timeRangeTracker;
- this.timeRangeTracker = new TimeRangeTracker();
- // Reset heap to not include any keys
- this.size.set(DEEP_OVERHEAD);
- this.snapshotAllocator = this.allocator;
- // Reset allocator so we get a fresh buffer for the new memstore
- if (allocator != null) {
- this.allocator = new MemStoreLAB(conf, chunkPool);
- }
- timeOfOldestEdit = Long.MAX_VALUE;
- }
- }
- }
+ KeyValueScanner getSnapshotScanner();
/**
- * Return the current snapshot.
- * Called by flusher to get current snapshot made by a previous
- * call to {@link #snapshot()}
- * @return Return snapshot.
+ * Clears the current snapshot of the Memstore.
+ * @param id the id of the snapshot to be cleared, as returned by {@link #snapshot()}
* @see #snapshot()
- * @see #clearSnapshot(SortedSet)
*/
- KeyValueSkipListSet getSnapshot() {
- return this.snapshot;
- }
-
- /**
- * The passed snapshot was successfully persisted; it can be let go.
- * @param ss The snapshot to clean out.
- * @throws UnexpectedException
- * @see #snapshot()
- */
- void clearSnapshot(final SortedSet ss)
- throws UnexpectedException {
- MemStoreLAB tmpAllocator = null;
- if (this.snapshot != ss) {
- throw new UnexpectedException("Current snapshot is " +
- this.snapshot + ", was passed " + ss);
- }
- // OK. Passed in snapshot is same as current snapshot. If not-empty,
- // create a new snapshot and let the old one go.
- if (!ss.isEmpty()) {
- this.snapshot = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = new TimeRangeTracker();
- }
- if (this.snapshotAllocator != null) {
- tmpAllocator = this.snapshotAllocator;
- this.snapshotAllocator = null;
- }
- if (tmpAllocator != null) {
- tmpAllocator.close();
- }
- }
+ void clearSnapshot(long id) throws UnexpectedException;
/**
* Write an update
- * @param kv
+ * @param cell
* @return approximate size of the passed key and value.
*/
- long add(final KeyValue kv) {
- KeyValue toAdd = maybeCloneWithAllocator(kv);
- return internalAdd(toAdd);
- }
-
- long timeOfOldestEdit() {
- return timeOfOldestEdit;
- }
-
- private boolean addToKVSet(KeyValue e) {
- boolean b = this.kvset.add(e);
- setOldestEditTimeToNow();
- return b;
- }
-
- private boolean removeFromKVSet(KeyValue e) {
- boolean b = this.kvset.remove(e);
- setOldestEditTimeToNow();
- return b;
- }
-
- void setOldestEditTimeToNow() {
- if (timeOfOldestEdit == Long.MAX_VALUE) {
- timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
- }
- }
+ long add(final Cell cell);
/**
- * Internal version of add() that doesn't clone KVs with the
- * allocator, and doesn't take the lock.
- *
- * Callers should ensure they already have the read lock taken
+ * @return Time at which the oldest edit was added to the MemStore (wall-clock edit time, not a Cell timestamp)
*/
- private long internalAdd(final KeyValue toAdd) {
- long s = heapSizeChange(toAdd, addToKVSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
- return s;
- }
-
- private KeyValue maybeCloneWithAllocator(KeyValue kv) {
- if (allocator == null) {
- return kv;
- }
-
- int len = kv.getLength();
- Allocation alloc = allocator.allocateBytes(len);
- if (alloc == null) {
- // The allocation was too large, allocator decided
- // not to do anything with it.
- return kv;
- }
- assert alloc.getData() != null;
- System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
- KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
- newKv.setMvccVersion(kv.getMvccVersion());
- return newKv;
- }
+ long timeOfOldestEdit();
/**
- * Remove n key from the memstore. Only kvs that have the same key and the
- * same memstoreTS are removed. It is ok to not update timeRangeTracker
- * in this call. It is possible that we can optimize this method by using
- * tailMap/iterator, but since this method is called rarely (only for
- * error recovery), we can leave those optimization for the future.
- * @param kv
+ * Remove a key from the memstore. Only Cells that have the same key and the same memstoreTS are
+ * removed. It is ok to not update timeRangeTracker in this call.
+ * @param cell
*/
- void rollback(final KeyValue kv) {
- // If the key is in the snapshot, delete it. We should not update
- // this.size, because that tracks the size of only the memstore and
- // not the snapshot. The flush of this snapshot to disk has not
- // yet started because Store.flush() waits for all rwcc transactions to
- // commit before starting the flush to disk.
- KeyValue found = this.snapshot.get(kv);
- if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
- this.snapshot.remove(kv);
- }
- // If the key is in the memstore, delete it. Update this.size.
- found = this.kvset.get(kv);
- if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
- removeFromKVSet(kv);
- long s = heapSizeChange(kv, true);
- this.size.addAndGet(-s);
- }
- }
+ void rollback(final Cell cell);
/**
* Write a delete
- * @param delete
+ * @param deleteCell
* @return approximate size of the passed key and value.
*/
- long delete(final KeyValue delete) {
- long s = 0;
- KeyValue toAdd = maybeCloneWithAllocator(delete);
- s += heapSizeChange(toAdd, addToKVSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- this.size.addAndGet(s);
- return s;
- }
-
- /**
- * @param kv Find the row that comes after this one. If null, we return the
- * first.
- * @return Next row or null if none found.
- */
- KeyValue getNextRow(final KeyValue kv) {
- return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
- }
-
- /*
- * @param a
- * @param b
- * @return Return lowest of a or b or null if both a and b are null
- */
- private KeyValue getLowest(final KeyValue a, final KeyValue b) {
- if (a == null) {
- return b;
- }
- if (b == null) {
- return a;
- }
- return comparator.compareRows(a, b) <= 0? a: b;
- }
-
- /*
- * @param key Find row that follows this one. If null, return first.
- * @param map Set to look in for a row beyond row.
- * @return Next row or null if none found. If one found, will be a new
- * KeyValue -- can be destroyed by subsequent calls to this method.
- */
- private KeyValue getNextRow(final KeyValue key,
- final NavigableSet set) {
- KeyValue result = null;
- SortedSet tail = key == null? set: set.tailSet(key);
- // Iterate until we fall into the next row; i.e. move off current row
- for (KeyValue kv: tail) {
- if (comparator.compareRows(kv, key) <= 0)
- continue;
- // Note: Not suppressing deletes or expired cells. Needs to be handled
- // by higher up functions.
- result = kv;
- break;
- }
- return result;
- }
+ long delete(final Cell deleteCell);
/**
+ * Find the key that matches row exactly, or the one that immediately precedes it. The
+ * target row key is set in state.
* @param state column/delete tracking state
*/
- void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
- getRowKeyAtOrBefore(kvset, state);
- getRowKeyAtOrBefore(snapshot, state);
- }
-
- /*
- * @param set
- * @param state Accumulates deletes and candidates.
- */
- private void getRowKeyAtOrBefore(final NavigableSet set,
- final GetClosestRowBeforeTracker state) {
- if (set.isEmpty()) {
- return;
- }
- if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
- // Found nothing in row. Try backing up.
- getRowKeyBefore(set, state);
- }
- }
-
- /*
- * Walk forward in a row from firstOnRow. Presumption is that
- * we have been passed the first possible key on a row. As we walk forward
- * we accumulate deletes until we hit a candidate on the row at which point
- * we return.
- * @param set
- * @param firstOnRow First possible key on this row.
- * @param state
- * @return True if we found a candidate walking this row.
- */
- private boolean walkForwardInSingleRow(final SortedSet set,
- final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
- boolean foundCandidate = false;
- SortedSet tail = set.tailSet(firstOnRow);
- if (tail.isEmpty()) return foundCandidate;
- for (Iterator i = tail.iterator(); i.hasNext();) {
- KeyValue kv = i.next();
- // Did we go beyond the target row? If so break.
- if (state.isTooFar(kv, firstOnRow)) break;
- if (state.isExpired(kv)) {
- i.remove();
- continue;
- }
- // If we added something, this row is a contender. break.
- if (state.handle(kv)) {
- foundCandidate = true;
- break;
- }
- }
- return foundCandidate;
- }
-
- /*
- * Walk backwards through the passed set a row at a time until we run out of
- * set or until we get a candidate.
- * @param set
- * @param state
- */
- private void getRowKeyBefore(NavigableSet set,
- final GetClosestRowBeforeTracker state) {
- KeyValue firstOnRow = state.getTargetKey();
- for (Member p = memberOfPreviousRow(set, state, firstOnRow);
- p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
- // Make sure we don't fall out of our table.
- if (!state.isTargetTable(p.kv)) break;
- // Stop looking if we've exited the better candidate range.
- if (!state.isBetterCandidate(p.kv)) break;
- // Make into firstOnRow
- firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
- HConstants.LATEST_TIMESTAMP);
- // If we find something, break;
- if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
- }
- }
-
- /**
- * Only used by tests. TODO: Remove
- *
- * Given the specs of a column, update it, first by inserting a new record,
- * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
- * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
- * store will ensure that the insert/delete each are atomic. A scanner/reader will either
- * get the new value, or the old value and all readers will eventually only see the new
- * value after the old was removed.
- *
- * @param row
- * @param family
- * @param qualifier
- * @param newValue
- * @param now
- * @return Timestamp
- */
- long updateColumnValue(byte[] row,
- byte[] family,
- byte[] qualifier,
- long newValue,
- long now) {
- KeyValue firstKv = KeyValue.createFirstOnRow(
- row, family, qualifier);
- // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet snSs = snapshot.tailSet(firstKv);
- if (!snSs.isEmpty()) {
- KeyValue snKv = snSs.first();
- // is there a matching KV in the snapshot?
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
- if (snKv.getTimestamp() == now) {
- // poop,
- now += 1;
- }
- }
- }
-
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
- // so we cant add the new KV w/o knowing what's there already, but we also
- // want to take this chance to delete some kvs. So two loops (sad)
-
- SortedSet ss = kvset.tailSet(firstKv);
- for (KeyValue kv : ss) {
- // if this isnt the row we are interested in, then bail:
- if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
- break; // rows dont match, bail.
- }
-
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
- kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
- now = kv.getTimestamp();
- }
- }
-
- // create or update (upsert) a new KeyValue with
- // 'now' and a 0 memstoreTS == immediately visible
- List cells = new ArrayList| (1);
- cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
- return upsert(cells, 1L);
- }
+ void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state);
/**
- * Update or insert the specified KeyValues.
+ * Update or insert the specified cells.
*
- * For each KeyValue, insert into MemStore. This will atomically upsert the
- * value for that row/family/qualifier. If a KeyValue did already exist,
- * it will then be removed.
+ * For each Cell, insert into MemStore. This will atomically upsert the value for that
+ * row/family/qualifier. If a Cell did already exist, it will then be removed.
*
- * Currently the memstoreTS is kept at 0 so as each insert happens, it will
- * be immediately visible. May want to change this so it is atomic across
- * all KeyValues.
+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will be immediately
+ * visible. May want to change this so it is atomic across all KeyValues.
*
- * This is called under row lock, so Get operations will still see updates
- * atomically. Scans will only see each KeyValue update as atomic.
- *
+ * This is called under row lock, so Get operations will still see updates atomically. Scans will
+ * only see each KeyValue update as atomic.
* @param cells
- * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param readpoint readpoint below which we can safely remove duplicate Cells.
* @return change in memstore size
*/
- public long upsert(Iterable cells, long readpoint) {
- long size = 0;
- for (Cell cell : cells) {
- size += upsert(cell, readpoint);
- }
- return size;
- }
+ long upsert(Iterable<Cell> cells, long readpoint);
/**
- * Inserts the specified KeyValue into MemStore and deletes any existing
- * versions of the same row/family/qualifier as the specified KeyValue.
- *
- * First, the specified KeyValue is inserted into the Memstore.
- *
- * If there are any existing KeyValues in this MemStore with the same row,
- * family, and qualifier, they are removed.
- *
- * Callers must hold the read lock.
- *
- * @param cell
- * @return change in size of MemStore
- */
- private long upsert(Cell cell, long readpoint) {
- // Add the KeyValue to the MemStore
- // Use the internalAdd method here since we (a) already have a lock
- // and (b) cannot safely use the MSLAB here without potentially
- // hitting OOME - see TestMemStore.testUpsertMSLAB for a
- // test that triggers the pathological case if we don't avoid MSLAB
- // here.
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- long addedSize = internalAdd(kv);
-
- // Get the KeyValues for the row/family/qualifier regardless of timestamp.
- // For this case we want to clean up any other puts
- KeyValue firstKv = KeyValue.createFirstOnRow(
- kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
- kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
- SortedSet ss = kvset.tailSet(firstKv);
- Iterator it = ss.iterator();
- // versions visible to oldest scanner
- int versionsVisible = 0;
- while ( it.hasNext() ) {
- KeyValue cur = it.next();
-
- if (kv == cur) {
- // ignore the one just put in
- continue;
- }
- // check that this is the row and column we are interested in, otherwise bail
- if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
- // only remove Puts that concurrent scanners cannot possibly see
- if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
- cur.getMvccVersion() <= readpoint) {
- if (versionsVisible > 1) {
- // if we get here we have seen at least one version visible to the oldest scanner,
- // which means we can prove that no scanner will see this version
-
- // false means there was a change, so give us the size.
- long delta = heapSizeChange(cur, true);
- addedSize -= delta;
- this.size.addAndGet(-delta);
- it.remove();
- setOldestEditTimeToNow();
- } else {
- versionsVisible++;
- }
- }
- } else {
- // past the row or column, done
- break;
- }
- }
- return addedSize;
- }
-
- /*
- * Immutable data structure to hold member found in set and the set it was
- * found in. Include set because it is carrying context.
- */
- private static class Member {
- final KeyValue kv;
- final NavigableSet set;
- Member(final NavigableSet s, final KeyValue kv) {
- this.kv = kv;
- this.set = s;
- }
- }
-
- /*
- * @param set Set to walk back in. Pass a first in row or we'll return
- * same row (loop).
- * @param state Utility and context.
- * @param firstOnRow First item on the row after the one we want to find a
- * member in.
- * @return Null or member of row previous to firstOnRow
+ * @return scanner over the memstore. This might include scanner over the snapshot when one is
+ * present.
*/
- private Member memberOfPreviousRow(NavigableSet set,
- final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
- NavigableSet head = set.headSet(firstOnRow, false);
- if (head.isEmpty()) return null;
- for (Iterator i = head.descendingIterator(); i.hasNext();) {
- KeyValue found = i.next();
- if (state.isExpired(found)) {
- i.remove();
- continue;
- }
- return new Member(head, found);
- }
- return null;
- }
+ List<KeyValueScanner> getScanners(long readPt);
/**
- * @return scanner on memstore and snapshot in this order.
+ * @return {@link TimeRangeTracker} for all the Cells in the snapshot.
*/
- List getScanners(long readPt) {
- return Collections.singletonList(
- new MemStoreScanner(readPt));
- }
+ TimeRangeTracker getSnapshotTimeRangeTracker();
- /**
- * Check if this memstore may contain the required keys
- * @param scan
- * @return False if the key definitely does not exist in this Memstore
- */
- public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
- return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
- snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
- && (Math.max(timeRangeTracker.getMaximumTimestamp(),
- snapshotTimeRangeTracker.getMaximumTimestamp()) >=
- oldestUnexpiredTS);
- }
+ long size();
- public TimeRangeTracker getSnapshotTimeRangeTracker() {
- return this.snapshotTimeRangeTracker;
- }
-
- /*
- * MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore -- both current
- * map and snapshot.
- * This behaves as if it were a real scanner but does not maintain position.
+ /**
+ * Holds details of the snapshot taken on this Memstore. Details include the snapshot's
+ * identifier, count of cells in it and total memory size occupied by all the cells.
*/
- protected class MemStoreScanner extends NonLazyKeyValueScanner {
- // Next row information for either kvset or snapshot
- private KeyValue kvsetNextRow = null;
- private KeyValue snapshotNextRow = null;
-
- // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
- private KeyValue kvsetItRow = null;
- private KeyValue snapshotItRow = null;
-
- // iterator based scanning.
- private Iterator kvsetIt;
- private Iterator snapshotIt;
-
- // The kvset and snapshot at the time of creating this scanner
- private KeyValueSkipListSet kvsetAtCreation;
- private KeyValueSkipListSet snapshotAtCreation;
-
- // the pre-calculated KeyValue to be returned by peek() or next()
- private KeyValue theNext;
-
- // The allocator and snapshot allocator at the time of creating this scanner
- volatile MemStoreLAB allocatorAtCreation;
- volatile MemStoreLAB snapshotAllocatorAtCreation;
-
- // A flag represents whether could stop skipping KeyValues for MVCC
- // if have encountered the next row. Only used for reversed scan
- private boolean stopSkippingKVsIfNextRow = false;
-
- private long readPoint;
-
- /*
- Some notes...
-
- So memstorescanner is fixed at creation time. this includes pointers/iterators into
- existing kvset/snapshot. during a snapshot creation, the kvset is null, and the
- snapshot is moved. since kvset is null there is no point on reseeking on both,
- we can save us the trouble. During the snapshot->hfile transition, the memstore
- scanner is re-created by StoreScanner#updateReaders(). StoreScanner should
- potentially do something smarter by adjusting the existing memstore scanner.
-
- But there is a greater problem here, that being once a scanner has progressed
- during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
- if a scan lasts a little while, there is a chance for new entries in kvset to
- become available but we will never see them. This needs to be handled at the
- StoreScanner level with coordination with MemStoreScanner.
-
- Currently, this problem is only partly managed: during the small amount of time
- when the StoreScanner has not yet created a new MemStoreScanner, we will miss
- the adds to kvset in the MemStoreScanner.
- */
-
- MemStoreScanner(long readPoint) {
- super();
-
- this.readPoint = readPoint;
- kvsetAtCreation = kvset;
- snapshotAtCreation = snapshot;
- if (allocator != null) {
- this.allocatorAtCreation = allocator;
- this.allocatorAtCreation.incScannerCount();
- }
- if (snapshotAllocator != null) {
- this.snapshotAllocatorAtCreation = snapshotAllocator;
- this.snapshotAllocatorAtCreation.incScannerCount();
- }
- }
-
- private KeyValue getNext(Iterator it) {
- KeyValue startKV = theNext;
- KeyValue v = null;
- try {
- while (it.hasNext()) {
- v = it.next();
- if (v.getMvccVersion() <= this.readPoint) {
- return v;
- }
- if (stopSkippingKVsIfNextRow && startKV != null
- && comparator.compareRows(v, startKV) > 0) {
- return null;
- }
- }
-
- return null;
- } finally {
- if (v != null) {
- // in all cases, remember the last KV iterated to
- if (it == snapshotIt) {
- snapshotItRow = v;
- } else {
- kvsetItRow = v;
- }
- }
- }
- }
-
- /**
- * Set the scanner at the seek key.
- * Must be called only once: there is no thread safety between the scanner
- * and the memStore.
- * @param key seek value
- * @return false if the key is null or if there is no data
- */
- @Override
- public synchronized boolean seek(KeyValue key) {
- if (key == null) {
- close();
- return false;
- }
-
- // kvset and snapshot will never be null.
- // if tailSet can't find anything, SortedSet is empty (not null).
- kvsetIt = kvsetAtCreation.tailSet(key).iterator();
- snapshotIt = snapshotAtCreation.tailSet(key).iterator();
- kvsetItRow = null;
- snapshotItRow = null;
-
- return seekInSubLists(key);
- }
-
-
- /**
- * (Re)initialize the iterators after a seek or a reseek.
- */
- private synchronized boolean seekInSubLists(KeyValue key){
- kvsetNextRow = getNext(kvsetIt);
- snapshotNextRow = getNext(snapshotIt);
+ static class SnapshotInfo {
+ private long id;
+ private int cellsCount;
+ private long size;
- // Calculate the next value
- theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
- // has data
- return (theNext != null);
- }
-
-
- /**
- * Move forward on the sub-lists set previously by seek.
- * @param key seek value (should be non-null)
- * @return true if there is at least one KV to read, false otherwise
- */
- @Override
- public synchronized boolean reseek(KeyValue key) {
- /*
- See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
- This code is executed concurrently with flush and puts, without locks.
- Two points must be known when working on this code:
- 1) It's not possible to use the 'kvTail' and 'snapshot'
- variables, as they are modified during a flush.
- 2) The ideal implementation for performance would use the sub skip list
- implicitly pointed by the iterators 'kvsetIt' and
- 'snapshotIt'. Unfortunately the Java API does not offer a method to
- get it. So we remember the last keys we iterated to and restore
- the reseeked set to at least that point.
- */
-
- kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
- snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
- return seekInSubLists(key);
- }
-
-
- @Override
- public synchronized KeyValue peek() {
- //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
- return theNext;
- }
-
- @Override
- public synchronized KeyValue next() {
- if (theNext == null) {
- return null;
- }
-
- final KeyValue ret = theNext;
-
- // Advance one of the iterators
- if (theNext == kvsetNextRow) {
- kvsetNextRow = getNext(kvsetIt);
- } else {
- snapshotNextRow = getNext(snapshotIt);
- }
-
- // Calculate the next value
- theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
- //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
- //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
- // getLowest() + " threadpoint=" + readpoint);
- return ret;
- }
-
- /*
- * Returns the lower of the two key values, or null if they are both null.
- * This uses comparator.compare() to compare the KeyValue using the memstore
- * comparator.
- */
- private KeyValue getLowest(KeyValue first, KeyValue second) {
- if (first == null && second == null) {
- return null;
- }
- if (first != null && second != null) {
- int compare = comparator.compare(first, second);
- return (compare <= 0 ? first : second);
- }
- return (first != null ? first : second);
- }
-
- /*
- * Returns the higher of the two key values, or null if they are both null.
- * This uses comparator.compare() to compare the KeyValue using the memstore
- * comparator.
- */
- private KeyValue getHighest(KeyValue first, KeyValue second) {
- if (first == null && second == null) {
- return null;
- }
- if (first != null && second != null) {
- int compare = comparator.compare(first, second);
- return (compare > 0 ? first : second);
- }
- return (first != null ? first : second);
- }
-
- public synchronized void close() {
- this.kvsetNextRow = null;
- this.snapshotNextRow = null;
-
- this.kvsetIt = null;
- this.snapshotIt = null;
-
- if (allocatorAtCreation != null) {
- this.allocatorAtCreation.decScannerCount();
- this.allocatorAtCreation = null;
- }
- if (snapshotAllocatorAtCreation != null) {
- this.snapshotAllocatorAtCreation.decScannerCount();
- this.snapshotAllocatorAtCreation = null;
- }
-
- this.kvsetItRow = null;
- this.snapshotItRow = null;
+ SnapshotInfo(long id, int cellsCount, long size) {
+ this.id = id;
+ this.cellsCount = cellsCount;
+ this.size = size;
}
- /**
- * MemStoreScanner returns max value as sequence id because it will
- * always have the latest data among all files.
- */
- @Override
- public long getSequenceID() {
- return Long.MAX_VALUE;
+ public long getId() {
+ return id;
}
- @Override
- public boolean shouldUseScanner(Scan scan, SortedSet columns,
- long oldestUnexpiredTS) {
- return shouldSeek(scan, oldestUnexpiredTS);
+ public int getCellsCount() {
+ return cellsCount;
}
- /**
- * Seek scanner to the given key first. If it returns false(means
- * peek()==null) or scanner's peek row is bigger than row of given key, seek
- * the scanner to the previous row of given key
- */
- @Override
- public synchronized boolean backwardSeek(KeyValue key) {
- seek(key);
- if (peek() == null || comparator.compareRows(peek(), key) > 0) {
- return seekToPreviousRow(key);
- }
- return true;
+ public long getSize() {
+ return size;
}
- /**
- * Separately get the KeyValue before the specified key from kvset and
- * snapshotset, and use the row of higher one as the previous row of
- * specified key, then seek to the first KeyValue of previous row
- */
- @Override
- public synchronized boolean seekToPreviousRow(KeyValue key) {
- KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
- SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
- KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
- SortedSet snapshotHead = snapshotAtCreation
- .headSet(firstKeyOnRow);
- KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
- .last();
- KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
- if (lastKVBeforeRow == null) {
- theNext = null;
- return false;
- }
- KeyValue firstKeyOnPreviousRow = KeyValue
- .createFirstOnRow(lastKVBeforeRow.getRow());
- this.stopSkippingKVsIfNextRow = true;
- seek(firstKeyOnPreviousRow);
- this.stopSkippingKVsIfNextRow = false;
- if (peek() == null
- || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
- return seekToPreviousRow(lastKVBeforeRow);
- }
- return true;
- }
-
- @Override
- public synchronized boolean seekToLastRow() {
- KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
- .last();
- KeyValue second = snapshotAtCreation.isEmpty() ? null
- : snapshotAtCreation.last();
- KeyValue higherKv = getHighest(first, second);
- if (higherKv == null) {
- return false;
- }
- KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
- if (seek(firstKvOnLastRow)) {
- return true;
- } else {
- return seekToPreviousRow(higherKv);
- }
-
- }
- }
-
- public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
-
- public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
- (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
-
- /*
- * Calculate how the MemStore size has changed. Includes overhead of the
- * backing Map.
- * @param kv
- * @param notpresent True if the kv was NOT present in the set.
- * @return Size
- */
- static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
- return notpresent ?
- ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
- 0;
- }
-
- /**
- * Get the entire heap usage for this MemStore not including keys in the
- * snapshot.
- */
- @Override
- public long heapSize() {
- return size.get();
- }
-
- /**
- * Get the heap usage of KVs in this MemStore.
- */
- public long keySize() {
- return heapSize() - DEEP_OVERHEAD;
- }
-
- /**
- * Code to help figure if our approximation of object heap sizes is close
- * enough. See hbase-900. Fills memstores then waits so user can heap
- * dump and bring up resultant hprof in something like jprofiler which
- * allows you get 'deep size' on objects.
- * @param args main args
- */
- public static void main(String [] args) {
- RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
- LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
- runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
- LOG.info("vmInputArguments=" + runtime.getInputArguments());
- MemStore memstore1 = new MemStore();
- // TODO: x32 vs x64
- long size = 0;
- final int count = 10000;
- byte [] fam = Bytes.toBytes("col");
- byte [] qf = Bytes.toBytes("umn");
- byte [] empty = new byte[0];
- for (int i = 0; i < count; i++) {
- // Give each its own ts
- size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
- }
- LOG.info("memstore1 estimated size=" + size);
- for (int i = 0; i < count; i++) {
- size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
- }
- LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
- // Make a variably sized memstore.
- MemStore memstore2 = new MemStore();
- for (int i = 0; i < count; i++) {
- size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
- new byte[i]));
- }
- LOG.info("memstore2 estimated size=" + size);
- final int seconds = 30;
- LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
- for (int i = 0; i < seconds; i++) {
- // Thread.sleep(1000);
+ public long heapSize() {
+ return ClassSize.align(ClassSize.OBJECT + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_INT);
}
- LOG.info("Exiting.");
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index efd250b..c6384fc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -22,11 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -36,8 +32,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@@ -55,16 +51,16 @@ abstract class StoreFlusher {
/**
* Turns a snapshot of memstore into a set of store files.
- * @param snapshot Memstore snapshot.
+ * @param snapshotScanner
+ * @param snapshotInfo
* @param cacheFlushSeqNum Log cache flush sequence number.
* @param snapshotTimeRangeTracker Time range tracker from the memstore
* pertaining to the snapshot.
- * @param flushedSize Out parameter for the size of the KVs flushed.
* @param status Task that represents the flush operation and may be updated with status.
* @return List of files written. Can be empty; must not be null.
*/
- public abstract List flushSnapshot(SortedSet snapshot, long cacheFlushSeqNum,
- TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+ public abstract List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo,
+ long cacheFlushSeqNum, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status)
throws IOException;
protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
@@ -81,12 +77,12 @@ abstract class StoreFlusher {
/**
* Creates the scanner for flushing snapshot. Also calls coprocessors.
+ * @param memstoreScanner
+ * @param smallestReadPoint
* @return The scanner; null if coprocessor is canceling the flush.
*/
- protected InternalScanner createScanner(SortedSet snapshot,
+ protected InternalScanner createScanner(KeyValueScanner memstoreScanner,
long smallestReadPoint) throws IOException {
- KeyValueScanner memstoreScanner =
- new CollectionBackedScanner(snapshot, store.getComparator());
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
@@ -115,15 +111,13 @@ abstract class StoreFlusher {
* @param scanner Scanner to get data from.
* @param sink Sink to write data to. Could be StoreFile.Writer.
* @param smallestReadPoint Smallest read point used for the flush.
- * @return Bytes flushed.
*/
- protected long performFlush(InternalScanner scanner,
+ protected void performFlush(InternalScanner scanner,
Compactor.CellSink sink, long smallestReadPoint) throws IOException {
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
List kvs = new ArrayList();
boolean hasMore;
- long flushed = 0;
do {
hasMore = scanner.next(kvs, compactionKVMax);
if (!kvs.isEmpty()) {
@@ -139,11 +133,10 @@ abstract class StoreFlusher {
kv.setMvccVersion(0);
}
sink.append(kv);
- flushed += MemStore.heapSizeChange(kv, true);
}
kvs.clear();
}
} while (hasMore);
- return flushed;
}
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index a2ece5d..4d231f0 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -21,21 +21,17 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import com.google.common.annotations.VisibleForTesting;
@@ -57,33 +53,32 @@ public class StripeStoreFlusher extends StoreFlusher {
}
@Override
- public List flushSnapshot(SortedSet snapshot, long cacheFlushSeqNum,
- final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
- throws IOException {
+ public List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo,
+ long cacheFlushSeqNum, final TimeRangeTracker tracker, MonitoredTask status)
+ throws IOException {
List result = null;
- int kvCount = snapshot.size();
- if (kvCount == 0) return result; // don't flush if there are no entries
+ int cellsCount = snapshotInfo.getCellsCount();
+ if (cellsCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshotScanner, smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
// Let policy select flush method.
- StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
+ StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
- long flushedBytes = 0;
boolean success = false;
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
- StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
+ StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, cellsCount);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
synchronized (flushLock) {
- flushedBytes = performFlush(scanner, mw, smallestReadPoint);
+ performFlush(scanner, mw, smallestReadPoint);
result = mw.commitWriters(cacheFlushSeqNum, false);
success = true;
}
@@ -100,7 +95,6 @@ public class StripeStoreFlusher extends StoreFlusher {
}
}
}
- flushedSize.set(flushedBytes);
try {
scanner.close();
} catch (IOException ex) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 5f8458a..d6e010e 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
-import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
@@ -292,8 +292,8 @@ public class TestHeapSize {
}
// MemStore Overhead
- cl = MemStore.class;
- actual = MemStore.FIXED_OVERHEAD;
+ cl = DefaultMemStore.class;
+ actual = DefaultMemStore.FIXED_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
if(expected != actual) {
ClassSize.estimateBase(cl, true);
@@ -301,7 +301,7 @@ public class TestHeapSize {
}
// MemStore Deep Overhead
- actual = MemStore.DEEP_OVERHEAD;
+ actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index e518074..68ac35d 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1154,13 +1154,9 @@ public class TestHRegion {
put.add(kv);
// checkAndPut with wrong value
- HStore store = (HStore) region.getStore(fam1);
- store.memstore.kvset.size();
-
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
val1), put, true);
assertEquals(true, res);
- store.memstore.kvset.size();
Get get = new Get(row1);
get.addColumn(fam2, qf1);
@@ -1673,10 +1669,11 @@ public class TestHRegion {
// extract the key values out the memstore:
// This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis();
- KeyValue firstKv = ((HStore) region.getStore(fam1)).memstore.kvset.first();
+ DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
+ KeyValue firstKv = memstore.kvset.first();
assertTrue(firstKv.getTimestamp() <= now);
now = firstKv.getTimestamp();
- for (KeyValue kv : ((HStore) region.getStore(fam1)).memstore.kvset) {
+ for (KeyValue kv : memstore.kvset) {
assertTrue(kv.getTimestamp() <= now);
now = kv.getTimestamp();
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index 3794ab6..51f62dc 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -56,20 +55,17 @@ import com.google.common.collect.Lists;
@Category(MediumTests.class)
public class TestMemStore extends TestCase {
private final Log LOG = LogFactory.getLog(this.getClass());
- private MemStore memstore;
+ private DefaultMemStore memstore;
private static final int ROW_COUNT = 10;
private static final int QUALIFIER_COUNT = ROW_COUNT;
private static final byte [] FAMILY = Bytes.toBytes("column");
- private static final byte [] CONTENTS = Bytes.toBytes("contents");
- private static final byte [] BASIC = Bytes.toBytes("basic");
- private static final String CONTENTSTR = "contentstr";
private MultiVersionConsistencyControl mvcc;
@Override
public void setUp() throws Exception {
super.setUp();
this.mvcc = new MultiVersionConsistencyControl();
- this.memstore = new MemStore();
+ this.memstore = new DefaultMemStore();
}
public void testPutSameKey() {
@@ -154,8 +150,8 @@ public class TestMemStore extends TestCase {
assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
count++;
if (count == snapshotIndex) {
- this.memstore.snapshot();
- this.memstore.clearSnapshot(this.memstore.getSnapshot());
+ SnapshotInfo snapshotInfo = this.memstore.snapshot();
+ this.memstore.clearSnapshot(snapshotInfo.getId());
// Added more rows into kvset. But the scanner wont see these rows.
addRows(this.memstore, ts);
LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
@@ -195,7 +191,7 @@ public class TestMemStore extends TestCase {
verifyScanAcrossSnapshot2(kv1, kv2);
// use case 3: first in snapshot second in kvset
- this.memstore = new MemStore();
+ this.memstore = new DefaultMemStore();
this.memstore.add(kv1.clone());
this.memstore.snapshot();
this.memstore.add(kv2.clone());
@@ -379,13 +375,13 @@ public class TestMemStore extends TestCase {
final byte[] q1 = Bytes.toBytes("q1");
final MultiVersionConsistencyControl mvcc;
- final MemStore memstore;
+ final DefaultMemStore memstore;
AtomicReference caughtException;
public ReadOwnWritesTester(int id,
- MemStore memstore,
+ DefaultMemStore memstore,
MultiVersionConsistencyControl mvcc,
AtomicReference caughtException)
{
@@ -459,13 +455,12 @@ public class TestMemStore extends TestCase {
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memstore);
runSnapshot(this.memstore);
- KeyValueSkipListSet ss = this.memstore.getSnapshot();
- assertEquals("History not being cleared", 0, ss.size());
+ assertEquals("History not being cleared", 0, this.memstore.snapshot.size());
}
}
public void testMultipleVersionsSimple() throws Exception {
- MemStore m = new MemStore(new Configuration(), KeyValue.COMPARATOR);
+ DefaultMemStore m = new DefaultMemStore(new Configuration(), KeyValue.COMPARATOR);
byte [] row = Bytes.toBytes("testRow");
byte [] family = Bytes.toBytes("testFamily");
byte [] qf = Bytes.toBytes("testQualifier");
@@ -755,52 +750,6 @@ public class TestMemStore extends TestCase {
//assertTrue(!memstore.shouldSeek(scan));
}
- ////////////////////////////////////
- //Test for upsert with MSLAB
- ////////////////////////////////////
-
- /**
- * Test a pathological pattern that shows why we can't currently
- * use the MSLAB for upsert workloads. This test inserts data
- * in the following pattern:
- *
- * - row0001 through row1000 (fills up one 2M Chunk)
- * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
- * to the first chunk
- * - row0003 through row1002 (another chunk, another dangling reference)
- *
- * This causes OOME pretty quickly if we use MSLAB for upsert
- * since each 2M chunk is held onto by a single reference.
- */
- public void testUpsertMSLAB() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- conf.setBoolean(MemStore.USEMSLAB_KEY, true);
- memstore = new MemStore(conf, KeyValue.COMPARATOR);
-
- int ROW_SIZE = 2048;
- byte[] qualifier = new byte[ROW_SIZE - 4];
-
- MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
- for (int i = 0; i < 3; i++) { System.gc(); }
- long usageBefore = bean.getHeapMemoryUsage().getUsed();
-
- long size = 0;
- long ts=0;
-
- for (int newValue = 0; newValue < 1000; newValue++) {
- for (int row = newValue; row < newValue + 1000; row++) {
- byte[] rowBytes = Bytes.toBytes(row);
- size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
- }
- }
- System.out.println("Wrote " + ts + " vals");
- for (int i = 0; i < 3; i++) { System.gc(); }
- long usageAfter = bean.getHeapMemoryUsage().getUsed();
- System.out.println("Memory used: " + (usageAfter - usageBefore)
- + " (heapsize: " + memstore.heapSize() +
- " size: " + size + ")");
- }
-
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
@@ -816,7 +765,7 @@ public class TestMemStore extends TestCase {
*/
public void testUpsertMemstoreSize() throws Exception {
Configuration conf = HBaseConfiguration.create();
- memstore = new MemStore(conf, KeyValue.COMPARATOR);
+ memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR);
long oldSize = memstore.size.get();
List l = new ArrayList();
@@ -853,7 +802,7 @@ public class TestMemStore extends TestCase {
try {
EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
EnvironmentEdgeManager.injectEdge(edge);
- MemStore memstore = new MemStore();
+ DefaultMemStore memstore = new DefaultMemStore();
long t = memstore.timeOfOldestEdit();
assertEquals(t, Long.MAX_VALUE);
@@ -939,7 +888,7 @@ public class TestMemStore extends TestCase {
* @return How many rows we added.
* @throws IOException
*/
- private int addRows(final MemStore hmc) {
+ private int addRows(final DefaultMemStore hmc) {
return addRows(hmc, HConstants.LATEST_TIMESTAMP);
}
@@ -949,7 +898,7 @@ public class TestMemStore extends TestCase {
* @return How many rows we added.
* @throws IOException
*/
- private int addRows(final MemStore hmc, final long ts) {
+ private int addRows(final DefaultMemStore hmc, final long ts) {
for (int i = 0; i < ROW_COUNT; i++) {
long timestamp = ts == HConstants.LATEST_TIMESTAMP?
System.currentTimeMillis(): ts;
@@ -962,16 +911,15 @@ public class TestMemStore extends TestCase {
return ROW_COUNT;
}
- private long runSnapshot(final MemStore hmc) throws UnexpectedException {
+ private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedException {
// Save off old state.
- int oldHistorySize = hmc.getSnapshot().size();
- hmc.snapshot();
- KeyValueSkipListSet ss = hmc.getSnapshot();
+ int oldHistorySize = hmc.snapshot.size();
+ SnapshotInfo snapshotInfo = hmc.snapshot();
// Make some assertions about what just happened.
- assertTrue("History size has not increased", oldHistorySize < ss.size());
+ assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size());
long t = memstore.timeOfOldestEdit();
assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
- hmc.clearSnapshot(ss);
+ hmc.clearSnapshot(snapshotInfo.getId());
return t;
}
@@ -998,7 +946,7 @@ public class TestMemStore extends TestCase {
return new KeyValue(row, Bytes.toBytes("test_col"), null,
HConstants.LATEST_TIMESTAMP, value);
}
- private static void addRows(int count, final MemStore mem) {
+ private static void addRows(int count, final DefaultMemStore mem) {
long nanos = System.nanoTime();
for (int i = 0 ; i < count ; i++) {
@@ -1018,7 +966,7 @@ public class TestMemStore extends TestCase {
}
- static void doScan(MemStore ms, int iteration) throws IOException {
+ static void doScan(DefaultMemStore ms, int iteration) throws IOException {
long nanos = System.nanoTime();
KeyValueScanner s = ms.getScanners(0).get(0);
s.seek(KeyValue.createFirstOnRow(new byte[]{}));
@@ -1033,7 +981,7 @@ public class TestMemStore extends TestCase {
public static void main(String [] args) throws IOException {
MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
- MemStore ms = new MemStore();
+ DefaultMemStore ms = new DefaultMemStore();
long n1 = System.nanoTime();
addRows(25000, ms);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 6936c5b..e13d581 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -28,6 +28,7 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
@@ -47,7 +48,7 @@ public class TestMemStoreChunkPool {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf.setBoolean(MemStore.USEMSLAB_KEY, true);
+ conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true);
conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
MemStoreChunkPool.chunkPoolDisabled = false;
@@ -107,7 +108,7 @@ public class TestMemStoreChunkPool {
byte[] qf5 = Bytes.toBytes("testqualifier5");
byte[] val = Bytes.toBytes("testval");
- MemStore memstore = new MemStore();
+ DefaultMemStore memstore = new DefaultMemStore();
// Setting up memstore
memstore.add(new KeyValue(row, fam, qf1, val));
@@ -115,8 +116,7 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf3, val));
// Creating a snapshot
- memstore.snapshot();
- KeyValueSkipListSet snapshot = memstore.getSnapshot();
+ SnapshotInfo snapshotInfo = memstore.snapshot();
assertEquals(3, memstore.snapshot.size());
// Adding value to "new" memstore
@@ -124,7 +124,7 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val));
assertEquals(2, memstore.kvset.size());
- memstore.clearSnapshot(snapshot);
+ memstore.clearSnapshot(snapshotInfo.getId());
int chunkCount = chunkPool.getPoolSize();
assertTrue(chunkCount > 0);
@@ -145,7 +145,7 @@ public class TestMemStoreChunkPool {
byte[] qf7 = Bytes.toBytes("testqualifier7");
byte[] val = Bytes.toBytes("testval");
- MemStore memstore = new MemStore();
+ DefaultMemStore memstore = new DefaultMemStore();
// Setting up memstore
memstore.add(new KeyValue(row, fam, qf1, val));
@@ -153,8 +153,7 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf3, val));
// Creating a snapshot
- memstore.snapshot();
- KeyValueSkipListSet snapshot = memstore.getSnapshot();
+ SnapshotInfo snapshotInfo = memstore.snapshot();
assertEquals(3, memstore.snapshot.size());
// Adding value to "new" memstore
@@ -167,7 +166,7 @@ public class TestMemStoreChunkPool {
List scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
- memstore.clearSnapshot(snapshot);
+ memstore.clearSnapshot(snapshotInfo.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@@ -181,8 +180,7 @@ public class TestMemStoreChunkPool {
chunkPool.clearChunks();
// Creating another snapshot
- memstore.snapshot();
- snapshot = memstore.getSnapshot();
+ snapshotInfo = memstore.snapshot();
// Adding more value
memstore.add(new KeyValue(row, fam, qf6, val));
memstore.add(new KeyValue(row, fam, qf7, val));
@@ -194,7 +192,7 @@ public class TestMemStoreChunkPool {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
- memstore.clearSnapshot(snapshot);
+ memstore.clearSnapshot(snapshotInfo.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 8842b90..ad9f524 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -114,7 +114,7 @@ public class TestReversibleScanners {
@Test
public void testReversibleMemstoreScanner() throws IOException {
- MemStore memstore = new MemStore();
+ DefaultMemStore memstore = new DefaultMemStore();
writeMemstore(memstore);
List scanners = memstore.getScanners(Long.MAX_VALUE);
seekTestOfReversibleKeyValueScanner(scanners.get(0));
@@ -144,7 +144,7 @@ public class TestReversibleScanners {
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
- MemStore memstore = new MemStore();
+ DefaultMemStore memstore = new DefaultMemStore();
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
writer2 });
@@ -234,7 +234,7 @@ public class TestReversibleScanners {
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
- MemStore memstore = new MemStore();
+ DefaultMemStore memstore = new DefaultMemStore();
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
writer2 });
@@ -403,7 +403,7 @@ public class TestReversibleScanners {
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
}
- private StoreScanner getReversibleStoreScanner(MemStore memstore,
+ private StoreScanner getReversibleStoreScanner(DefaultMemStore memstore,
StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
ScanInfo scanInfo, int readPoint) throws IOException {
List scanners = getScanners(memstore, sf1, sf2, null,
@@ -472,7 +472,7 @@ public class TestReversibleScanners {
assertEquals(null, kvHeap.peek());
}
- private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
+ private ReversedKeyValueHeap getReversibleKeyValueHeap(DefaultMemStore memstore,
StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
throws IOException {
List scanners = getScanners(memstore, sf1, sf2, startRow,
@@ -482,7 +482,7 @@ public class TestReversibleScanners {
return kvHeap;
}
- private List getScanners(MemStore memstore, StoreFile sf1,
+ private List getScanners(DefaultMemStore memstore, StoreFile sf1,
StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
throws IOException {
List fileScanners = StoreFileScanner
@@ -625,7 +625,7 @@ public class TestReversibleScanners {
}
}
- private static void writeMemstoreAndStoreFiles(MemStore memstore,
+ private static void writeMemstoreAndStoreFiles(DefaultMemStore memstore,
final StoreFile.Writer[] writers) throws IOException {
Random rand = new Random();
try {
@@ -658,7 +658,7 @@ public class TestReversibleScanners {
}
}
- private static void writeMemstore(MemStore memstore) throws IOException {
+ private static void writeMemstore(DefaultMemStore memstore) throws IOException {
// Add half of the keyvalues to memstore
for (int i = 0; i < ROWSIZE; i++) {
for (int j = 0; j < QUALSIZE; j++) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 793b839..0e35393 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -73,7 +72,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.util.Progressable;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -442,7 +440,7 @@ public class TestStore extends TestCase {
this.store.snapshot();
flushStore(store, id++);
assertEquals(storeFilessize, this.store.getStorefiles().size());
- assertEquals(0, this.store.memstore.kvset.size());
+ assertEquals(0, ((DefaultMemStore)this.store.memstore).kvset.size());
}
private void assertCheck() {
@@ -452,189 +450,12 @@ public class TestStore extends TestCase {
}
}
- //////////////////////////////////////////////////////////////////////////////
- // IncrementColumnValue tests
- //////////////////////////////////////////////////////////////////////////////
- /*
- * test the internal details of how ICV works, especially during a flush scenario.
- */
- public void testIncrementColumnValue_ICVDuringFlush()
- throws IOException, InterruptedException {
- init(this.getName());
-
- long oldValue = 1L;
- long newValue = 3L;
- this.store.add(new KeyValue(row, family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(oldValue)));
-
- // snapshot the store.
- this.store.snapshot();
-
- // add other things:
- this.store.add(new KeyValue(row, family, qf2,
- System.currentTimeMillis(),
- Bytes.toBytes(oldValue)));
-
- // update during the snapshot.
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-
- // memstore should have grown by some amount.
- assertTrue(ret > 0);
-
- // then flush.
- flushStore(store, id++);
- assertEquals(1, this.store.getStorefiles().size());
- // from the one we inserted up there, and a new one
- assertEquals(2, this.store.memstore.kvset.size());
-
- // how many key/values for this row are there?
- Get get = new Get(row);
- get.addColumn(family, qf1);
- get.setMaxVersions(); // all versions.
- List results = new ArrayList();
-
- results = HBaseTestingUtility.getFromStoreFile(store, get);
- assertEquals(2, results.size());
-
- long ts1 = results.get(0).getTimestamp();
- long ts2 = results.get(1).getTimestamp();
-
- assertTrue(ts1 > ts2);
-
- assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
- assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
- }
-
@Override
protected void tearDown() throws Exception {
super.tearDown();
EnvironmentEdgeManagerTestHelper.reset();
}
- public void testICV_negMemstoreSize() throws IOException {
- init(this.getName());
-
- long time = 100;
- ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
- ee.setValue(time);
- EnvironmentEdgeManagerTestHelper.injectEdge(ee);
- long newValue = 3L;
- long size = 0;
-
-
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
-
-
- for ( int i = 0 ; i < 10000 ; ++i) {
- newValue++;
-
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
- long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
-
- if (ret != 0) System.out.println("ret: " + ret);
- if (ret2 != 0) System.out.println("ret2: " + ret2);
-
- assertTrue("ret: " + ret, ret >= 0);
- size += ret;
- assertTrue("ret2: " + ret2, ret2 >= 0);
- size += ret2;
-
-
- if (i % 1000 == 0)
- ee.setValue(++time);
- }
-
- long computedSize=0;
- for (KeyValue kv : this.store.memstore.kvset) {
- long kvsize = MemStore.heapSizeChange(kv, true);
- //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
- computedSize += kvsize;
- }
- assertEquals(computedSize, size);
- }
-
- public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
- ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
- EnvironmentEdgeManagerTestHelper.injectEdge(mee);
- init(this.getName());
-
- long oldValue = 1L;
- long newValue = 3L;
- this.store.add(new KeyValue(row, family, qf1,
- EnvironmentEdgeManager.currentTimeMillis(),
- Bytes.toBytes(oldValue)));
-
- // snapshot the store.
- this.store.snapshot();
-
- // update during the snapshot, the exact same TS as the Put (lololol)
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-
- // memstore should have grown by some amount.
- assertTrue(ret > 0);
-
- // then flush.
- flushStore(store, id++);
- assertEquals(1, this.store.getStorefiles().size());
- assertEquals(1, this.store.memstore.kvset.size());
-
- // now increment again:
- newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
-
- // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
- newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
-
- // the second TS should be TS=2 or higher., even though 'time=1' right now.
-
-
- // how many key/values for this row are there?
- Get get = new Get(row);
- get.addColumn(family, qf1);
- get.setMaxVersions(); // all versions.
- List results = new ArrayList| ();
-
- results = HBaseTestingUtility.getFromStoreFile(store, get);
- assertEquals(2, results.size());
-
- long ts1 = results.get(0).getTimestamp();
- long ts2 = results.get(1).getTimestamp();
-
- assertTrue(ts1 > ts2);
- assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
- assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
-
- mee.setValue(2); // time goes up slightly
- newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
-
- results = HBaseTestingUtility.getFromStoreFile(store, get);
- assertEquals(2, results.size());
-
- ts1 = results.get(0).getTimestamp();
- ts2 = results.get(1).getTimestamp();
-
- assertTrue(ts1 > ts2);
- assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
- assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
- }
-
public void testHandleErrorsInFlush() throws Exception {
LOG.info("Setting up a faulty file system that cannot write");
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 5aeb31b..5bed885 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -69,10 +68,12 @@ import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -560,14 +561,14 @@ public class TestWALReplay {
super(conf, store);
}
@Override
- public List flushSnapshot(SortedSet snapshot, long cacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
- throws IOException {
+ public List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo,
+ long cacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status)
+ throws IOException {
if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests");
}
- return super.flushSnapshot(snapshot, cacheFlushId, snapshotTimeRangeTracker,
- flushedSize, status);
+ return super.flushSnapshot(snapshotScanner, snapshotInfo, cacheFlushId,
+ snapshotTimeRangeTracker, status);
}
};
| | | | | | | | | | | | | |