diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java new file mode 100644 index 0000000..99052d2 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -0,0 +1,1006 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.rmi.UnexpectedException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * The MemStore holds in-memory modifications to the Store. Modifications + * are {@link KeyValue}s. When asked to flush, current memstore is moved + * to snapshot and is cleared. We continue to serve edits out of new memstore + * and backing snapshot until flusher reports in that the flush succeeded. At + * this point we let the snapshot go. + *

+ * The MemStore functions should not be called in parallel. Callers should hold + * write and read locks. This is done in {@link HStore}. + *

+ * + * TODO: Adjust size of the memstore when we remove items because they have + * been deleted. + * TODO: With new KVSLS, need to make sure we update HeapSize with difference + * in KV size. + */ +@InterfaceAudience.Private +public class DefaultMemStore implements MemStore { + private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); + + static final String USEMSLAB_KEY = + "hbase.hregion.memstore.mslab.enabled"; + private static final boolean USEMSLAB_DEFAULT = true; + + private Configuration conf; + + // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the + // better semantics. The Map will overwrite if passed a key it already had + // whereas the Set will not add new KV if key is same though value might be + // different. Value is not important -- just make sure always same + // reference passed. + volatile KeyValueSkipListSet kvset; + + // Snapshot of memstore. Made for flusher. + volatile KeyValueSkipListSet snapshot; + + final KeyValue.KVComparator comparator; + + // Used to track own heapSize + final AtomicLong size; + + // Used to track when to flush + volatile long timeOfOldestEdit = Long.MAX_VALUE; + + TimeRangeTracker timeRangeTracker; + TimeRangeTracker snapshotTimeRangeTracker; + + MemStoreChunkPool chunkPool; + volatile MemStoreLAB allocator; + volatile MemStoreLAB snapshotAllocator; + volatile SnapshotInfo snapshotInfo; + + /** + * Default constructor. Used for tests. + */ + public DefaultMemStore() { + this(HBaseConfiguration.create(), KeyValue.COMPARATOR); + } + + /** + * Constructor. 
+ * @param c Comparator + */ + public DefaultMemStore(final Configuration conf, + final KeyValue.KVComparator c) { + this.conf = conf; + this.comparator = c; + this.kvset = new KeyValueSkipListSet(c); + this.snapshot = new KeyValueSkipListSet(c); + timeRangeTracker = new TimeRangeTracker(); + snapshotTimeRangeTracker = new TimeRangeTracker(); + this.size = new AtomicLong(DEEP_OVERHEAD); + if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { + this.chunkPool = MemStoreChunkPool.getPool(conf); + this.allocator = new MemStoreLAB(conf, chunkPool); + } else { + this.allocator = null; + this.chunkPool = null; + } + } + + void dump() { + for (KeyValue kv: this.kvset) { + LOG.info(kv); + } + for (KeyValue kv: this.snapshot) { + LOG.info(kv); + } + } + + /** + * Creates a snapshot of the current memstore. + */ + @Override + public SnapshotInfo snapshot() { + // If snapshot currently has entries, then flusher failed or didn't call + // cleanup. Log a warning. + if (!this.snapshot.isEmpty()) { + LOG.warn("Snapshot called again without clearing previous. " + + "Doing nothing. 
Another ongoing flush or did we fail last attempt?"); + } else { + long snapshotSize = 0; + if (!this.kvset.isEmpty()) { + this.snapshot = this.kvset; + this.kvset = new KeyValueSkipListSet(this.comparator); + this.snapshotTimeRangeTracker = this.timeRangeTracker; + this.timeRangeTracker = new TimeRangeTracker(); + snapshotSize = this.size.get(); + // Reset heap to not include any keys + this.size.set(DEEP_OVERHEAD); + this.snapshotAllocator = this.allocator; + // Reset allocator so we get a fresh buffer for the new memstore + if (allocator != null) { + this.allocator = new MemStoreLAB(conf, chunkPool); + } + timeOfOldestEdit = Long.MAX_VALUE; + } + this.snapshotInfo = new SnapshotInfo(EnvironmentEdgeManager.currentTimeMillis(), + snapshot.size(), snapshotSize); + } + return this.snapshotInfo; + } + + @Override + public KeyValueScanner getSnapshotScanner() { + return new CollectionBackedScanner(snapshot, this.comparator); + } + + /** + * The passed snapshot was successfully persisted; it can be let go. + * @param id Id of the snapshot to clean out. + * @throws UnexpectedException + * @see #snapshot() + */ + @Override + public void clearSnapshot(long id) throws UnexpectedException { + MemStoreLAB tmpAllocator = null; + if (this.snapshotInfo.getId() != id) { + throw new UnexpectedException("Current snapshot id is " + this.snapshotInfo.getId() + + ",passed " + id); + } + // OK. Passed in snapshot is same as current snapshot. If not-empty, + // create a new snapshot and let the old one go. + if (!this.snapshot.isEmpty()) { + this.snapshot = new KeyValueSkipListSet(this.comparator); + this.snapshotTimeRangeTracker = new TimeRangeTracker(); + } + if (this.snapshotAllocator != null) { + tmpAllocator = this.snapshotAllocator; + this.snapshotAllocator = null; + } + if (tmpAllocator != null) { + tmpAllocator.close(); + } + } + + /** + * Write an update + * @param cell + * @return approximate size of the passed key and value. 
+ */ + @Override + public long add(Cell cell) { + KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell)); + return internalAdd(toAdd); + } + + @Override + public long timeOfOldestEdit() { + return timeOfOldestEdit; + } + + private boolean addToKVSet(KeyValue e) { + boolean b = this.kvset.add(e); + setOldestEditTimeToNow(); + return b; + } + + private boolean removeFromKVSet(KeyValue e) { + boolean b = this.kvset.remove(e); + setOldestEditTimeToNow(); + return b; + } + + void setOldestEditTimeToNow() { + if (timeOfOldestEdit == Long.MAX_VALUE) { + timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis(); + } + } + + /** + * Internal version of add() that doesn't clone KVs with the + * allocator, and doesn't take the lock. + * + * Callers should ensure they already have the read lock taken + */ + private long internalAdd(final KeyValue toAdd) { + long s = heapSizeChange(toAdd, addToKVSet(toAdd)); + timeRangeTracker.includeTimestamp(toAdd); + this.size.addAndGet(s); + return s; + } + + private KeyValue maybeCloneWithAllocator(KeyValue kv) { + if (allocator == null) { + return kv; + } + + int len = kv.getLength(); + Allocation alloc = allocator.allocateBytes(len); + if (alloc == null) { + // The allocation was too large, allocator decided + // not to do anything with it. + return kv; + } + assert alloc.getData() != null; + System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len); + KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len); + newKv.setMvccVersion(kv.getMvccVersion()); + return newKv; + } + + /** + * Remove n key from the memstore. Only kvs that have the same key and the + * same memstoreTS are removed. It is ok to not update timeRangeTracker + * in this call. It is possible that we can optimize this method by using + * tailMap/iterator, but since this method is called rarely (only for + * error recovery), we can leave those optimization for the future. 
+ * @param cell + */ + @Override + public void rollback(Cell cell) { + // If the key is in the snapshot, delete it. We should not update + // this.size, because that tracks the size of only the memstore and + // not the snapshot. The flush of this snapshot to disk has not + // yet started because Store.flush() waits for all rwcc transactions to + // commit before starting the flush to disk. + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue found = this.snapshot.get(kv); + if (found != null && found.getMvccVersion() == kv.getMvccVersion()) { + this.snapshot.remove(kv); + } + // If the key is in the memstore, delete it. Update this.size. + found = this.kvset.get(kv); + if (found != null && found.getMvccVersion() == kv.getMvccVersion()) { + removeFromKVSet(kv); + long s = heapSizeChange(kv, true); + this.size.addAndGet(-s); + } + } + + /** + * Write a delete + * @param deleteCell + * @return approximate size of the passed key and value. + */ + @Override + public long delete(Cell deleteCell) { + long s = 0; + KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(deleteCell)); + s += heapSizeChange(toAdd, addToKVSet(toAdd)); + timeRangeTracker.includeTimestamp(toAdd); + this.size.addAndGet(s); + return s; + } + + /** + * @param kv Find the row that comes after this one. If null, we return the + * first. + * @return Next row or null if none found. + */ + KeyValue getNextRow(final KeyValue kv) { + return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot)); + } + + /* + * @param a + * @param b + * @return Return lowest of a or b or null if both a and b are null + */ + private KeyValue getLowest(final KeyValue a, final KeyValue b) { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return comparator.compareRows(a, b) <= 0? a: b; + } + + /* + * @param key Find row that follows this one. If null, return first. + * @param map Set to look in for a row beyond row. + * @return Next row or null if none found. 
If one found, will be a new + * KeyValue -- can be destroyed by subsequent calls to this method. + */ + private KeyValue getNextRow(final KeyValue key, + final NavigableSet set) { + KeyValue result = null; + SortedSet tail = key == null? set: set.tailSet(key); + // Iterate until we fall into the next row; i.e. move off current row + for (KeyValue kv: tail) { + if (comparator.compareRows(kv, key) <= 0) + continue; + // Note: Not suppressing deletes or expired cells. Needs to be handled + // by higher up functions. + result = kv; + break; + } + return result; + } + + /** + * @param state column/delete tracking state + */ + @Override + public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) { + getRowKeyAtOrBefore(kvset, state); + getRowKeyAtOrBefore(snapshot, state); + } + + /* + * @param set + * @param state Accumulates deletes and candidates. + */ + private void getRowKeyAtOrBefore(final NavigableSet set, + final GetClosestRowBeforeTracker state) { + if (set.isEmpty()) { + return; + } + if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) { + // Found nothing in row. Try backing up. + getRowKeyBefore(set, state); + } + } + + /* + * Walk forward in a row from firstOnRow. Presumption is that + * we have been passed the first possible key on a row. As we walk forward + * we accumulate deletes until we hit a candidate on the row at which point + * we return. + * @param set + * @param firstOnRow First possible key on this row. + * @param state + * @return True if we found a candidate walking this row. + */ + private boolean walkForwardInSingleRow(final SortedSet set, + final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) { + boolean foundCandidate = false; + SortedSet tail = set.tailSet(firstOnRow); + if (tail.isEmpty()) return foundCandidate; + for (Iterator i = tail.iterator(); i.hasNext();) { + KeyValue kv = i.next(); + // Did we go beyond the target row? If so break. 
+ if (state.isTooFar(kv, firstOnRow)) break; + if (state.isExpired(kv)) { + i.remove(); + continue; + } + // If we added something, this row is a contender. break. + if (state.handle(kv)) { + foundCandidate = true; + break; + } + } + return foundCandidate; + } + + /* + * Walk backwards through the passed set a row at a time until we run out of + * set or until we get a candidate. + * @param set + * @param state + */ + private void getRowKeyBefore(NavigableSet set, + final GetClosestRowBeforeTracker state) { + KeyValue firstOnRow = state.getTargetKey(); + for (Member p = memberOfPreviousRow(set, state, firstOnRow); + p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) { + // Make sure we don't fall out of our table. + if (!state.isTargetTable(p.kv)) break; + // Stop looking if we've exited the better candidate range. + if (!state.isBetterCandidate(p.kv)) break; + // Make into firstOnRow + firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(), + HConstants.LATEST_TIMESTAMP); + // If we find something, break; + if (walkForwardInSingleRow(p.set, firstOnRow, state)) break; + } + } + + /** + * Update or insert the specified KeyValues. + *

+ * For each KeyValue, insert into MemStore. This will atomically upsert the + * value for that row/family/qualifier. If a KeyValue did already exist, + * it will then be removed. + *

+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will + * be immediately visible. May want to change this so it is atomic across + * all KeyValues. + *

+ * This is called under row lock, so Get operations will still see updates + * atomically. Scans will only see each KeyValue update as atomic. + * + * @param cells + * @param readpoint readpoint below which we can safely remove duplicate KVs + * @return change in memstore size + */ + @Override + public long upsert(Iterable cells, long readpoint) { + long size = 0; + for (Cell cell : cells) { + size += upsert(cell, readpoint); + } + return size; + } + + /** + * Inserts the specified KeyValue into MemStore and deletes any existing + * versions of the same row/family/qualifier as the specified KeyValue. + *

+ * First, the specified KeyValue is inserted into the Memstore. + *

+ * If there are any existing KeyValues in this MemStore with the same row, + * family, and qualifier, they are removed. + *

+ * Callers must hold the read lock. + * + * @param cell + * @return change in size of MemStore + */ + private long upsert(Cell cell, long readpoint) { + // Add the KeyValue to the MemStore + // Use the internalAdd method here since we (a) already have a lock + // and (b) cannot safely use the MSLAB here without potentially + // hitting OOME - see TestMemStore.testUpsertMSLAB for a + // test that triggers the pathological case if we don't avoid MSLAB + // here. + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + long addedSize = internalAdd(kv); + + // Get the KeyValues for the row/family/qualifier regardless of timestamp. + // For this case we want to clean up any other puts + KeyValue firstKv = KeyValue.createFirstOnRow( + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), + kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), + kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); + SortedSet ss = kvset.tailSet(firstKv); + Iterator it = ss.iterator(); + // versions visible to oldest scanner + int versionsVisible = 0; + while ( it.hasNext() ) { + KeyValue cur = it.next(); + + if (kv == cur) { + // ignore the one just put in + continue; + } + // check that this is the row and column we are interested in, otherwise bail + if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) { + // only remove Puts that concurrent scanners cannot possibly see + if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && + cur.getMvccVersion() <= readpoint) { + if (versionsVisible > 1) { + // if we get here we have seen at least one version visible to the oldest scanner, + // which means we can prove that no scanner will see this version + + // false means there was a change, so give us the size. 
+ long delta = heapSizeChange(cur, true); + addedSize -= delta; + this.size.addAndGet(-delta); + it.remove(); + setOldestEditTimeToNow(); + } else { + versionsVisible++; + } + } + } else { + // past the row or column, done + break; + } + } + return addedSize; + } + + /* + * Immutable data structure to hold member found in set and the set it was + * found in. Include set because it is carrying context. + */ + private static class Member { + final KeyValue kv; + final NavigableSet set; + Member(final NavigableSet s, final KeyValue kv) { + this.kv = kv; + this.set = s; + } + } + + /* + * @param set Set to walk back in. Pass a first in row or we'll return + * same row (loop). + * @param state Utility and context. + * @param firstOnRow First item on the row after the one we want to find a + * member in. + * @return Null or member of row previous to firstOnRow + */ + private Member memberOfPreviousRow(NavigableSet set, + final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) { + NavigableSet head = set.headSet(firstOnRow, false); + if (head.isEmpty()) return null; + for (Iterator i = head.descendingIterator(); i.hasNext();) { + KeyValue found = i.next(); + if (state.isExpired(found)) { + i.remove(); + continue; + } + return new Member(head, found); + } + return null; + } + + /** + * @return scanner on memstore and snapshot in this order. + */ + @Override + public List getScanners(long readPt) { + return Collections. 
singletonList(new MemStoreScanner(readPt)); + } + + /** + * Check if this memstore may contain the required keys + * @param scan + * @return False if the key definitely does not exist in this Memstore + */ + public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { + return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) || + snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange())) + && (Math.max(timeRangeTracker.getMaximumTimestamp(), + snapshotTimeRangeTracker.getMaximumTimestamp()) >= + oldestUnexpiredTS); + } + + public TimeRangeTracker getSnapshotTimeRangeTracker() { + return this.snapshotTimeRangeTracker; + } + + /* + * MemStoreScanner implements the KeyValueScanner. + * It lets the caller scan the contents of a memstore -- both current + * map and snapshot. + * This behaves as if it were a real scanner but does not maintain position. + */ + protected class MemStoreScanner extends NonLazyKeyValueScanner { + // Next row information for either kvset or snapshot + private KeyValue kvsetNextRow = null; + private KeyValue snapshotNextRow = null; + + // last iterated KVs for kvset and snapshot (to restore iterator state after reseek) + private KeyValue kvsetItRow = null; + private KeyValue snapshotItRow = null; + + // iterator based scanning. + private Iterator kvsetIt; + private Iterator snapshotIt; + + // The kvset and snapshot at the time of creating this scanner + private KeyValueSkipListSet kvsetAtCreation; + private KeyValueSkipListSet snapshotAtCreation; + + // the pre-calculated KeyValue to be returned by peek() or next() + private KeyValue theNext; + + // The allocator and snapshot allocator at the time of creating this scanner + volatile MemStoreLAB allocatorAtCreation; + volatile MemStoreLAB snapshotAllocatorAtCreation; + + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. 
Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; + + private long readPoint; + + /* + Some notes... + + So memstorescanner is fixed at creation time. this includes pointers/iterators into + existing kvset/snapshot. during a snapshot creation, the kvset is null, and the + snapshot is moved. since kvset is null there is no point on reseeking on both, + we can save us the trouble. During the snapshot->hfile transition, the memstore + scanner is re-created by StoreScanner#updateReaders(). StoreScanner should + potentially do something smarter by adjusting the existing memstore scanner. + + But there is a greater problem here, that being once a scanner has progressed + during a snapshot scenario, we currently iterate past the kvset then 'finish' up. + if a scan lasts a little while, there is a chance for new entries in kvset to + become available but we will never see them. This needs to be handled at the + StoreScanner level with coordination with MemStoreScanner. + + Currently, this problem is only partly managed: during the small amount of time + when the StoreScanner has not yet created a new MemStoreScanner, we will miss + the adds to kvset in the MemStoreScanner. 
+ */ + + MemStoreScanner(long readPoint) { + super(); + + this.readPoint = readPoint; + kvsetAtCreation = kvset; + snapshotAtCreation = snapshot; + if (allocator != null) { + this.allocatorAtCreation = allocator; + this.allocatorAtCreation.incScannerCount(); + } + if (snapshotAllocator != null) { + this.snapshotAllocatorAtCreation = snapshotAllocator; + this.snapshotAllocatorAtCreation.incScannerCount(); + } + } + + private KeyValue getNext(Iterator it) { + KeyValue startKV = theNext; + KeyValue v = null; + try { + while (it.hasNext()) { + v = it.next(); + if (v.getMvccVersion() <= this.readPoint) { + return v; + } + if (stopSkippingKVsIfNextRow && startKV != null + && comparator.compareRows(v, startKV) > 0) { + return null; + } + } + + return null; + } finally { + if (v != null) { + // in all cases, remember the last KV iterated to + if (it == snapshotIt) { + snapshotItRow = v; + } else { + kvsetItRow = v; + } + } + } + } + + /** + * Set the scanner at the seek key. + * Must be called only once: there is no thread safety between the scanner + * and the memStore. + * @param key seek value + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean seek(KeyValue key) { + if (key == null) { + close(); + return false; + } + + // kvset and snapshot will never be null. + // if tailSet can't find anything, SortedSet is empty (not null). + kvsetIt = kvsetAtCreation.tailSet(key).iterator(); + snapshotIt = snapshotAtCreation.tailSet(key).iterator(); + kvsetItRow = null; + snapshotItRow = null; + + return seekInSubLists(key); + } + + + /** + * (Re)initialize the iterators after a seek or a reseek. 
+ */ + private synchronized boolean seekInSubLists(KeyValue key){ + kvsetNextRow = getNext(kvsetIt); + snapshotNextRow = getNext(snapshotIt); + + // Calculate the next value + theNext = getLowest(kvsetNextRow, snapshotNextRow); + + // has data + return (theNext != null); + } + + + /** + * Move forward on the sub-lists set previously by seek. + * @param key seek value (should be non-null) + * @return true if there is at least one KV to read, false otherwise + */ + @Override + public synchronized boolean reseek(KeyValue key) { + /* + See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. + This code is executed concurrently with flush and puts, without locks. + Two points must be known when working on this code: + 1) It's not possible to use the 'kvTail' and 'snapshot' + variables, as they are modified during a flush. + 2) The ideal implementation for performance would use the sub skip list + implicitly pointed by the iterators 'kvsetIt' and + 'snapshotIt'. Unfortunately the Java API does not offer a method to + get it. So we remember the last keys we iterated to and restore + the reseeked set to at least that point. 
+ */ + + kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator(); + snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); + + return seekInSubLists(key); + } + + + @Override + public synchronized KeyValue peek() { + //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); + return theNext; + } + + @Override + public synchronized KeyValue next() { + if (theNext == null) { + return null; + } + + final KeyValue ret = theNext; + + // Advance one of the iterators + if (theNext == kvsetNextRow) { + kvsetNextRow = getNext(kvsetIt); + } else { + snapshotNextRow = getNext(snapshotIt); + } + + // Calculate the next value + theNext = getLowest(kvsetNextRow, snapshotNextRow); + + //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); + //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + + // getLowest() + " threadpoint=" + readpoint); + return ret; + } + + /* + * Returns the lower of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + private KeyValue getLowest(KeyValue first, KeyValue second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare <= 0 ? first : second); + } + return (first != null ? first : second); + } + + /* + * Returns the higher of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + private KeyValue getHighest(KeyValue first, KeyValue second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare > 0 ? first : second); + } + return (first != null ? 
first : second); + } + + public synchronized void close() { + this.kvsetNextRow = null; + this.snapshotNextRow = null; + + this.kvsetIt = null; + this.snapshotIt = null; + + if (allocatorAtCreation != null) { + this.allocatorAtCreation.decScannerCount(); + this.allocatorAtCreation = null; + } + if (snapshotAllocatorAtCreation != null) { + this.snapshotAllocatorAtCreation.decScannerCount(); + this.snapshotAllocatorAtCreation = null; + } + + this.kvsetItRow = null; + this.snapshotItRow = null; + } + + /** + * MemStoreScanner returns max value as sequence id because it will + * always have the latest data among all files. + */ + @Override + public long getSequenceID() { + return Long.MAX_VALUE; + } + + @Override + public boolean shouldUseScanner(Scan scan, SortedSet columns, + long oldestUnexpiredTS) { + return shouldSeek(scan, oldestUnexpiredTS); + } + + /** + * Seek scanner to the given key first. If it returns false(means + * peek()==null) or scanner's peek row is bigger than row of given key, seek + * the scanner to the previous row of given key + */ + @Override + public synchronized boolean backwardSeek(KeyValue key) { + seek(key); + if (peek() == null || comparator.compareRows(peek(), key) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + /** + * Separately get the KeyValue before the specified key from kvset and + * snapshotset, and use the row of higher one as the previous row of + * specified key, then seek to the first KeyValue of previous row + */ + @Override + public synchronized boolean seekToPreviousRow(KeyValue key) { + KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow()); + SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow); + KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last(); + SortedSet snapshotHead = snapshotAtCreation + .headSet(firstKeyOnRow); + KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? 
null : snapshotHead + .last(); + KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow); + if (lastKVBeforeRow == null) { + theNext = null; + return false; + } + KeyValue firstKeyOnPreviousRow = KeyValue + .createFirstOnRow(lastKVBeforeRow.getRow()); + this.stopSkippingKVsIfNextRow = true; + seek(firstKeyOnPreviousRow); + this.stopSkippingKVsIfNextRow = false; + if (peek() == null + || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { + return seekToPreviousRow(lastKVBeforeRow); + } + return true; + } + + @Override + public synchronized boolean seekToLastRow() { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation + .last(); + KeyValue second = snapshotAtCreation.isEmpty() ? null + : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + if (higherKv == null) { + return false; + } + KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow()); + if (seek(firstKvOnLastRow)) { + return true; + } else { + return seekToPreviousRow(higherKv); + } + + } + } + + public final static long FIXED_OVERHEAD = ClassSize.align( + ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + + ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + + (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); + + /* + * Calculate how the MemStore size has changed. Includes overhead of the + * backing Map. + * @param kv + * @param notpresent True if the kv was NOT present in the set. + * @return Size + */ + private static long heapSizeChange(final KeyValue kv, final boolean notpresent) { + return notpresent ? + ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()): + 0; + } + + /** + * Get the entire heap usage for this MemStore not including keys in the + * snapshot. 
+ */ + @Override + public long heapSize() { + long heapSize = size.get(); + if (this.snapshotInfo != null) { + heapSize += this.snapshotInfo.heapSize(); + } + return heapSize; + } + + @Override + public long size() { + return heapSize(); + } + + /** + * Code to help figure if our approximation of object heap sizes is close + * enough. See hbase-900. Fills memstores then waits so user can heap + * dump and bring up resultant hprof in something like jprofiler which + * allows you get 'deep size' on objects. + * @param args main args + */ + public static void main(String [] args) { + RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); + LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + + runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion()); + LOG.info("vmInputArguments=" + runtime.getInputArguments()); + DefaultMemStore memstore1 = new DefaultMemStore(); + // TODO: x32 vs x64 + long size = 0; + final int count = 10000; + byte [] fam = Bytes.toBytes("col"); + byte [] qf = Bytes.toBytes("umn"); + byte [] empty = new byte[0]; + for (int i = 0; i < count; i++) { + // Give each its own ts + size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); + } + LOG.info("memstore1 estimated size=" + size); + for (int i = 0; i < count; i++) { + size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); + } + LOG.info("memstore1 estimated size (2nd loading of same data)=" + size); + // Make a variably sized memstore. 
+ DefaultMemStore memstore2 = new DefaultMemStore(); + for (int i = 0; i < count; i++) { + size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, + new byte[i])); + } + LOG.info("memstore2 estimated size=" + size); + final int seconds = 30; + LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); + for (int i = 0; i < seconds; i++) { + // Thread.sleep(1000); + } + LOG.info("Exiting."); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index a5837c2..7d30b18 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; +import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.util.StringUtils; /** @@ -45,21 +42,21 @@ public class DefaultStoreFlusher extends StoreFlusher { } @Override - public List flushSnapshot(SortedSet snapshot, long cacheFlushId, - TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, - MonitoredTask status) throws IOException { + public List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo, + long cacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status) + throws IOException { ArrayList result = new ArrayList(); - if 
(snapshot.size() == 0) return result; // don't flush if there are no entries + int cellsCount = snapshotInfo.getCellsCount(); + if (cellsCount == 0) return result; // don't flush if there are no entries // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot, smallestReadPoint); + InternalScanner scanner = createScanner(snapshotScanner, smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } StoreFile.Writer writer; - long flushed = 0; try { // TODO: We can fail in the below block before we complete adding this flush to // list of store files. Add cleanup of anything put on filesystem if we fail. @@ -67,20 +64,19 @@ public class DefaultStoreFlusher extends StoreFlusher { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk writer = store.createWriterInTmp( - snapshot.size(), store.getFamily().getCompression(), false, true, true); + cellsCount, store.getFamily().getCompression(), false, true, true); writer.setTimeRangeTracker(snapshotTimeRangeTracker); try { - flushed = performFlush(scanner, writer, smallestReadPoint); + performFlush(scanner, writer, smallestReadPoint); } finally { finalizeWriter(writer, cacheFlushId, status); } } } finally { - flushedSize.set(flushed); scanner.close(); } LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize=" - + StringUtils.humanReadableInt(flushed) + + + StringUtils.humanReadableInt(snapshotInfo.getSize()) + ", hasBloomFilter=" + writer.hasGeneralBloom() + ", into tmp file " + writer.getPath()); result.add(writer.getPath()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index bd9d791..85a7c82 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -116,6 +118,7 @@ import com.google.common.collect.Lists; */ @InterfaceAudience.Private public class HStore implements Store { + private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class"; public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY = "hbase.server.compactchecker.interval.multiplier"; public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; @@ -224,7 +227,9 @@ public class HStore implements Store { // Why not just pass a HColumnDescriptor in here altogether? Even if have // to clone it? 
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); - this.memstore = new MemStore(conf, this.comparator); + String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName()); + this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { + Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator }); this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family @@ -762,18 +767,16 @@ public class HStore implements Store { * Write out current snapshot. Presumes {@link #snapshot()} has been called * previously. * @param logCacheFlushId flush sequence number - * @param snapshot + * @param snapshotScanner + * @param snapshotInfo * @param snapshotTimeRangeTracker - * @param flushedSize The number of bytes flushed * @param status * @return The path name of the tmp file to which the store was flushed * @throws IOException */ - protected List flushCache(final long logCacheFlushId, - SortedSet snapshot, - TimeRangeTracker snapshotTimeRangeTracker, - AtomicLong flushedSize, - MonitoredTask status) throws IOException { + protected List flushCache(final long logCacheFlushId, KeyValueScanner snapshotScanner, + SnapshotInfo snapshotInfo, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status) + throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
@@ -783,8 +786,8 @@ public class HStore implements Store { IOException lastException = null; for (int i = 0; i < flushRetriesNumber; i++) { try { - List pathNames = flusher.flushSnapshot( - snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status); + List pathNames = flusher.flushSnapshot(snapshotScanner, snapshotInfo, + logCacheFlushId, snapshotTimeRangeTracker, status); Path lastPathName = null; try { for (Path pathName : pathNames) { @@ -826,7 +829,6 @@ public class HStore implements Store { private StoreFile commitFile(final Path path, final long logCacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, - AtomicLong flushedSize, MonitoredTask status) throws IOException { // Write-out finished successfully, move into the right spot @@ -915,11 +917,11 @@ public class HStore implements Store { * @return Whether compaction is required. */ private boolean updateStorefiles( - final List sfs, final SortedSet set) throws IOException { + final List sfs, final long snapshotId) throws IOException { this.lock.writeLock().lock(); try { this.storeEngine.getStoreFileManager().insertNewFiles(sfs); - this.memstore.clearSnapshot(set); + this.memstore.clearSnapshot(snapshotId); } finally { // We need the lock, as long as we are updating the storeFiles // or changing the memstore. Let us release it before calling @@ -1821,7 +1823,7 @@ public class HStore implements Store { @Override public long getMemStoreSize() { - return this.memstore.heapSize(); + return this.memstore.size(); } @Override @@ -1862,38 +1864,6 @@ public class HStore implements Store { return this.region.getSmallestReadPoint(); } - /** - * Used in tests. TODO: Remove - * - * Updates the value for the given row/family/qualifier. This function will always be seen as - * atomic by other readers because it only puts a single KV to memstore. Thus no read/write - * control necessary. 
- * @param row row to update - * @param f family to update - * @param qualifier qualifier to update - * @param newValue the new value to set into memstore - * @return memstore size delta - * @throws IOException - */ - public long updateColumnValue(byte [] row, byte [] f, - byte [] qualifier, long newValue) - throws IOException { - - this.lock.readLock().lock(); - try { - long now = EnvironmentEdgeManager.currentTimeMillis(); - - return this.memstore.updateColumnValue(row, - f, - qualifier, - newValue, - now); - - } finally { - this.lock.readLock().unlock(); - } - } - @Override public long upsert(Iterable cells, long readpoint) throws IOException { this.lock.readLock().lock(); @@ -1912,10 +1882,10 @@ public class HStore implements Store { private class StoreFlusherImpl implements StoreFlushContext { private long cacheFlushSeqNum; - private SortedSet snapshot; private List tempFiles; private TimeRangeTracker snapshotTimeRangeTracker; - private final AtomicLong flushedSize = new AtomicLong(); + private KeyValueScanner snapshotScanner; + private SnapshotInfo snapshotInfo; private StoreFlusherImpl(long cacheFlushSeqNum) { this.cacheFlushSeqNum = cacheFlushSeqNum; @@ -1927,15 +1897,15 @@ public class HStore implements Store { */ @Override public void prepare() { - memstore.snapshot(); - this.snapshot = memstore.getSnapshot(); + this.snapshotInfo = memstore.snapshot(); + this.snapshotScanner = memstore.getSnapshotScanner(); this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); } @Override public void flushCache(MonitoredTask status) throws IOException { - tempFiles = HStore.this.flushCache( - cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status); + tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshotScanner, + snapshotInfo, snapshotTimeRangeTracker, status); } @Override @@ -1947,7 +1917,7 @@ public class HStore implements Store { for (Path storeFilePath : tempFiles) { try { 
storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, - snapshotTimeRangeTracker, flushedSize, status)); + snapshotTimeRangeTracker, status)); } catch (IOException ex) { LOG.error("Failed to commit store file " + storeFilePath, ex); // Try to delete the files we have committed before. @@ -1970,7 +1940,7 @@ public class HStore implements Store { } } // Add new file to store files. Clear snapshot too while we have the Store write lock. - return HStore.this.updateStorefiles(storeFiles, snapshot); + return HStore.this.updateStorefiles(storeFiles, snapshotInfo.getId()); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 8da61fd..203ea3b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,1048 +15,140 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.regionserver; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.NavigableSet; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** - * The MemStore holds in-memory modifications to the Store. Modifications - * are {@link KeyValue}s. When asked to flush, current memstore is moved - * to snapshot and is cleared. We continue to serve edits out of new memstore - * and backing snapshot until flusher reports in that the flush succeeded. At - * this point we let the snapshot go. - *

- * The MemStore functions should not be called in parallel. Callers should hold - * write and read locks. This is done in {@link HStore}. - *

- * - * TODO: Adjust size of the memstore when we remove items because they have - * been deleted. - * TODO: With new KVSLS, need to make sure we update HeapSize with difference - * in KV size. + * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s. + *

+ * The MemStore functions should not be called in parallel. Callers should hold write and read + * locks. This is done in {@link HStore}. + *

*/ @InterfaceAudience.Private -public class MemStore implements HeapSize { - private static final Log LOG = LogFactory.getLog(MemStore.class); - - static final String USEMSLAB_KEY = - "hbase.hregion.memstore.mslab.enabled"; - private static final boolean USEMSLAB_DEFAULT = true; - - private Configuration conf; - - // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the - // better semantics. The Map will overwrite if passed a key it already had - // whereas the Set will not add new KV if key is same though value might be - // different. Value is not important -- just make sure always same - // reference passed. - volatile KeyValueSkipListSet kvset; - - // Snapshot of memstore. Made for flusher. - volatile KeyValueSkipListSet snapshot; - - final KeyValue.KVComparator comparator; - - // Used to track own heapSize - final AtomicLong size; - - // Used to track when to flush - volatile long timeOfOldestEdit = Long.MAX_VALUE; - - TimeRangeTracker timeRangeTracker; - TimeRangeTracker snapshotTimeRangeTracker; - - MemStoreChunkPool chunkPool; - volatile MemStoreLAB allocator; - volatile MemStoreLAB snapshotAllocator; +public interface MemStore extends HeapSize { /** - * Default constructor. Used for tests. + * Creates a snapshot of the current memstore. Snapshot must be cleared by call to + * {@link #clearSnapshot(long)}. Use {@link KeyValueScanner} from {@link #getSnapshotScanner()} + * to iterate over the snapshot. + * + * @return {@link SnapshotInfo} */ - public MemStore() { - this(HBaseConfiguration.create(), KeyValue.COMPARATOR); - } - - /** - * Constructor. 
- * @param c Comparator - */ - public MemStore(final Configuration conf, - final KeyValue.KVComparator c) { - this.conf = conf; - this.comparator = c; - this.kvset = new KeyValueSkipListSet(c); - this.snapshot = new KeyValueSkipListSet(c); - timeRangeTracker = new TimeRangeTracker(); - snapshotTimeRangeTracker = new TimeRangeTracker(); - this.size = new AtomicLong(DEEP_OVERHEAD); - if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { - this.chunkPool = MemStoreChunkPool.getPool(conf); - this.allocator = new MemStoreLAB(conf, chunkPool); - } else { - this.allocator = null; - this.chunkPool = null; - } - } - - void dump() { - for (KeyValue kv: this.kvset) { - LOG.info(kv); - } - for (KeyValue kv: this.snapshot) { - LOG.info(kv); - } - } + SnapshotInfo snapshot(); /** - * Creates a snapshot of the current memstore. - * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)} - * To get the snapshot made by this method, use {@link #getSnapshot()} + * @return {@link KeyValueScanner} for iterating over the snapshot */ - void snapshot() { - // If snapshot currently has entries, then flusher failed or didn't call - // cleanup. Log a warning. - if (!this.snapshot.isEmpty()) { - LOG.warn("Snapshot called again without clearing previous. " + - "Doing nothing. Another ongoing flush or did we fail last attempt?"); - } else { - if (!this.kvset.isEmpty()) { - this.snapshot = this.kvset; - this.kvset = new KeyValueSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = this.timeRangeTracker; - this.timeRangeTracker = new TimeRangeTracker(); - // Reset heap to not include any keys - this.size.set(DEEP_OVERHEAD); - this.snapshotAllocator = this.allocator; - // Reset allocator so we get a fresh buffer for the new memstore - if (allocator != null) { - this.allocator = new MemStoreLAB(conf, chunkPool); - } - timeOfOldestEdit = Long.MAX_VALUE; - } - } - } + KeyValueScanner getSnapshotScanner(); /** - * Return the current snapshot. 
- * Called by flusher to get current snapshot made by a previous - * call to {@link #snapshot()} - * @return Return snapshot. + * Clears the current snapshot of the Memstore. + * @param id the id of the snapshot to be cleared * @see #snapshot() - * @see #clearSnapshot(SortedSet) */ - KeyValueSkipListSet getSnapshot() { - return this.snapshot; - } - - /** - * The passed snapshot was successfully persisted; it can be let go. - * @param ss The snapshot to clean out. - * @throws UnexpectedException - * @see #snapshot() - */ - void clearSnapshot(final SortedSet ss) - throws UnexpectedException { - MemStoreLAB tmpAllocator = null; - if (this.snapshot != ss) { - throw new UnexpectedException("Current snapshot is " + - this.snapshot + ", was passed " + ss); - } - // OK. Passed in snapshot is same as current snapshot. If not-empty, - // create a new snapshot and let the old one go. - if (!ss.isEmpty()) { - this.snapshot = new KeyValueSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = new TimeRangeTracker(); - } - if (this.snapshotAllocator != null) { - tmpAllocator = this.snapshotAllocator; - this.snapshotAllocator = null; - } - if (tmpAllocator != null) { - tmpAllocator.close(); - } - } + void clearSnapshot(long id) throws UnexpectedException; /** * Write an update - * @param kv + * @param cell * @return approximate size of the passed key and value. 
*/ - long add(final KeyValue kv) { - KeyValue toAdd = maybeCloneWithAllocator(kv); - return internalAdd(toAdd); - } - - long timeOfOldestEdit() { - return timeOfOldestEdit; - } - - private boolean addToKVSet(KeyValue e) { - boolean b = this.kvset.add(e); - setOldestEditTimeToNow(); - return b; - } - - private boolean removeFromKVSet(KeyValue e) { - boolean b = this.kvset.remove(e); - setOldestEditTimeToNow(); - return b; - } - - void setOldestEditTimeToNow() { - if (timeOfOldestEdit == Long.MAX_VALUE) { - timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis(); - } - } + long add(final Cell cell); /** - * Internal version of add() that doesn't clone KVs with the - * allocator, and doesn't take the lock. - * - * Callers should ensure they already have the read lock taken + * @return Oldest timestamp of all the Cells in the MemStore */ - private long internalAdd(final KeyValue toAdd) { - long s = heapSizeChange(toAdd, addToKVSet(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); - return s; - } - - private KeyValue maybeCloneWithAllocator(KeyValue kv) { - if (allocator == null) { - return kv; - } - - int len = kv.getLength(); - Allocation alloc = allocator.allocateBytes(len); - if (alloc == null) { - // The allocation was too large, allocator decided - // not to do anything with it. - return kv; - } - assert alloc.getData() != null; - System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len); - KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len); - newKv.setMvccVersion(kv.getMvccVersion()); - return newKv; - } + long timeOfOldestEdit(); /** - * Remove n key from the memstore. Only kvs that have the same key and the - * same memstoreTS are removed. It is ok to not update timeRangeTracker - * in this call. 
It is possible that we can optimize this method by using - * tailMap/iterator, but since this method is called rarely (only for - * error recovery), we can leave those optimization for the future. - * @param kv + * Removes a cell from the memstore. Only cells that have the same key and the same memstoreTS are + * removed. It is ok to not update timeRangeTracker in this call. + * @param cell */ - void rollback(final KeyValue kv) { - // If the key is in the snapshot, delete it. We should not update - // this.size, because that tracks the size of only the memstore and - // not the snapshot. The flush of this snapshot to disk has not - // yet started because Store.flush() waits for all rwcc transactions to - // commit before starting the flush to disk. - KeyValue found = this.snapshot.get(kv); - if (found != null && found.getMvccVersion() == kv.getMvccVersion()) { - this.snapshot.remove(kv); - } - // If the key is in the memstore, delete it. Update this.size. - found = this.kvset.get(kv); - if (found != null && found.getMvccVersion() == kv.getMvccVersion()) { - removeFromKVSet(kv); - long s = heapSizeChange(kv, true); - this.size.addAndGet(-s); - } - } + void rollback(final Cell cell); /** * Write a delete - * @param delete + * @param deleteCell * @return approximate size of the passed key and value. */ - long delete(final KeyValue delete) { - long s = 0; - KeyValue toAdd = maybeCloneWithAllocator(delete); - s += heapSizeChange(toAdd, addToKVSet(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); - return s; - } - - /** - * @param kv Find the row that comes after this one. If null, we return the - * first. - * @return Next row or null if none found. 
- */ - KeyValue getNextRow(final KeyValue kv) { - return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot)); - } - - /* - * @param a - * @param b - * @return Return lowest of a or b or null if both a and b are null - */ - private KeyValue getLowest(final KeyValue a, final KeyValue b) { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return comparator.compareRows(a, b) <= 0? a: b; - } - - /* - * @param key Find row that follows this one. If null, return first. - * @param map Set to look in for a row beyond row. - * @return Next row or null if none found. If one found, will be a new - * KeyValue -- can be destroyed by subsequent calls to this method. - */ - private KeyValue getNextRow(final KeyValue key, - final NavigableSet set) { - KeyValue result = null; - SortedSet tail = key == null? set: set.tailSet(key); - // Iterate until we fall into the next row; i.e. move off current row - for (KeyValue kv: tail) { - if (comparator.compareRows(kv, key) <= 0) - continue; - // Note: Not suppressing deletes or expired cells. Needs to be handled - // by higher up functions. - result = kv; - break; - } - return result; - } + long delete(final Cell deleteCell); /** + * Find the key that matches row exactly, or the one that immediately precedes it. The + * target row key is set in state. * @param state column/delete tracking state */ - void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) { - getRowKeyAtOrBefore(kvset, state); - getRowKeyAtOrBefore(snapshot, state); - } - - /* - * @param set - * @param state Accumulates deletes and candidates. - */ - private void getRowKeyAtOrBefore(final NavigableSet set, - final GetClosestRowBeforeTracker state) { - if (set.isEmpty()) { - return; - } - if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) { - // Found nothing in row. Try backing up. - getRowKeyBefore(set, state); - } - } - - /* - * Walk forward in a row from firstOnRow. 
Presumption is that - * we have been passed the first possible key on a row. As we walk forward - * we accumulate deletes until we hit a candidate on the row at which point - * we return. - * @param set - * @param firstOnRow First possible key on this row. - * @param state - * @return True if we found a candidate walking this row. - */ - private boolean walkForwardInSingleRow(final SortedSet set, - final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) { - boolean foundCandidate = false; - SortedSet tail = set.tailSet(firstOnRow); - if (tail.isEmpty()) return foundCandidate; - for (Iterator i = tail.iterator(); i.hasNext();) { - KeyValue kv = i.next(); - // Did we go beyond the target row? If so break. - if (state.isTooFar(kv, firstOnRow)) break; - if (state.isExpired(kv)) { - i.remove(); - continue; - } - // If we added something, this row is a contender. break. - if (state.handle(kv)) { - foundCandidate = true; - break; - } - } - return foundCandidate; - } - - /* - * Walk backwards through the passed set a row at a time until we run out of - * set or until we get a candidate. - * @param set - * @param state - */ - private void getRowKeyBefore(NavigableSet set, - final GetClosestRowBeforeTracker state) { - KeyValue firstOnRow = state.getTargetKey(); - for (Member p = memberOfPreviousRow(set, state, firstOnRow); - p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) { - // Make sure we don't fall out of our table. - if (!state.isTargetTable(p.kv)) break; - // Stop looking if we've exited the better candidate range. - if (!state.isBetterCandidate(p.kv)) break; - // Make into firstOnRow - firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(), - HConstants.LATEST_TIMESTAMP); - // If we find something, break; - if (walkForwardInSingleRow(p.set, firstOnRow, state)) break; - } - } - - /** - * Only used by tests. 
TODO: Remove - * - * Given the specs of a column, update it, first by inserting a new record, - * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS - * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying - * store will ensure that the insert/delete each are atomic. A scanner/reader will either - * get the new value, or the old value and all readers will eventually only see the new - * value after the old was removed. - * - * @param row - * @param family - * @param qualifier - * @param newValue - * @param now - * @return Timestamp - */ - long updateColumnValue(byte[] row, - byte[] family, - byte[] qualifier, - long newValue, - long now) { - KeyValue firstKv = KeyValue.createFirstOnRow( - row, family, qualifier); - // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - SortedSet snSs = snapshot.tailSet(firstKv); - if (!snSs.isEmpty()) { - KeyValue snKv = snSs.first(); - // is there a matching KV in the snapshot? - if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) { - if (snKv.getTimestamp() == now) { - // poop, - now += 1; - } - } - } - - // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. - // But the timestamp should also be max(now, mostRecentTsInMemstore) - - // so we cant add the new KV w/o knowing what's there already, but we also - // want to take this chance to delete some kvs. So two loops (sad) - - SortedSet ss = kvset.tailSet(firstKv); - for (KeyValue kv : ss) { - // if this isnt the row we are interested in, then bail: - if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) { - break; // rows dont match, bail. - } - - // if the qualifier matches and it's a put, just RM it out of the kvset. 
- if (kv.getTypeByte() == KeyValue.Type.Put.getCode() && - kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) { - now = kv.getTimestamp(); - } - } - - // create or update (upsert) a new KeyValue with - // 'now' and a 0 memstoreTS == immediately visible - List cells = new ArrayList(1); - cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); - return upsert(cells, 1L); - } + void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state); /** - * Update or insert the specified KeyValues. + * Update or insert the specified cells. *

- * For each KeyValue, insert into MemStore. This will atomically upsert the - * value for that row/family/qualifier. If a KeyValue did already exist, - * it will then be removed. + * For each Cell, insert into MemStore. This will atomically upsert the value for that + * row/family/qualifier. If a Cell did already exist, it will then be removed. *

- * Currently the memstoreTS is kept at 0 so as each insert happens, it will - * be immediately visible. May want to change this so it is atomic across - * all KeyValues. + * Currently the memstoreTS is kept at 0 so as each insert happens, it will be immediately + * visible. May want to change this so it is atomic across all KeyValues. *

- * This is called under row lock, so Get operations will still see updates - * atomically. Scans will only see each KeyValue update as atomic. - * + * This is called under row lock, so Get operations will still see updates atomically. Scans will + * only see each KeyValue update as atomic. * @param cells - * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param readpoint readpoint below which we can safely remove duplicate Cells. * @return change in memstore size */ - public long upsert(Iterable cells, long readpoint) { - long size = 0; - for (Cell cell : cells) { - size += upsert(cell, readpoint); - } - return size; - } + long upsert(Iterable cells, long readpoint); /** - * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. - *

- * First, the specified KeyValue is inserted into the Memstore. - *

- * If there are any existing KeyValues in this MemStore with the same row, - * family, and qualifier, they are removed. - *

- * Callers must hold the read lock. - * - * @param cell - * @return change in size of MemStore - */ - private long upsert(Cell cell, long readpoint) { - // Add the KeyValue to the MemStore - // Use the internalAdd method here since we (a) already have a lock - // and (b) cannot safely use the MSLAB here without potentially - // hitting OOME - see TestMemStore.testUpsertMSLAB for a - // test that triggers the pathological case if we don't avoid MSLAB - // here. - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - long addedSize = internalAdd(kv); - - // Get the KeyValues for the row/family/qualifier regardless of timestamp. - // For this case we want to clean up any other puts - KeyValue firstKv = KeyValue.createFirstOnRow( - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); - SortedSet ss = kvset.tailSet(firstKv); - Iterator it = ss.iterator(); - // versions visible to oldest scanner - int versionsVisible = 0; - while ( it.hasNext() ) { - KeyValue cur = it.next(); - - if (kv == cur) { - // ignore the one just put in - continue; - } - // check that this is the row and column we are interested in, otherwise bail - if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) { - // only remove Puts that concurrent scanners cannot possibly see - if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && - cur.getMvccVersion() <= readpoint) { - if (versionsVisible > 1) { - // if we get here we have seen at least one version visible to the oldest scanner, - // which means we can prove that no scanner will see this version - - // false means there was a change, so give us the size. 
- long delta = heapSizeChange(cur, true); - addedSize -= delta; - this.size.addAndGet(-delta); - it.remove(); - setOldestEditTimeToNow(); - } else { - versionsVisible++; - } - } - } else { - // past the row or column, done - break; - } - } - return addedSize; - } - - /* - * Immutable data structure to hold member found in set and the set it was - * found in. Include set because it is carrying context. - */ - private static class Member { - final KeyValue kv; - final NavigableSet set; - Member(final NavigableSet s, final KeyValue kv) { - this.kv = kv; - this.set = s; - } - } - - /* - * @param set Set to walk back in. Pass a first in row or we'll return - * same row (loop). - * @param state Utility and context. - * @param firstOnRow First item on the row after the one we want to find a - * member in. - * @return Null or member of row previous to firstOnRow + * @return scanner over the memstore. This might include scanner over the snapshot when one is + * present. */ - private Member memberOfPreviousRow(NavigableSet set, - final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) { - NavigableSet head = set.headSet(firstOnRow, false); - if (head.isEmpty()) return null; - for (Iterator i = head.descendingIterator(); i.hasNext();) { - KeyValue found = i.next(); - if (state.isExpired(found)) { - i.remove(); - continue; - } - return new Member(head, found); - } - return null; - } + List getScanners(long readPt); /** - * @return scanner on memstore and snapshot in this order. + * @return {@link TimeRangeTracker} for all the Cells in the snapshot. 
*/ - List getScanners(long readPt) { - return Collections.singletonList( - new MemStoreScanner(readPt)); - } + TimeRangeTracker getSnapshotTimeRangeTracker(); - /** - * Check if this memstore may contain the required keys - * @param scan - * @return False if the key definitely does not exist in this Memstore - */ - public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { - return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) || - snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange())) - && (Math.max(timeRangeTracker.getMaximumTimestamp(), - snapshotTimeRangeTracker.getMaximumTimestamp()) >= - oldestUnexpiredTS); - } + long size(); - public TimeRangeTracker getSnapshotTimeRangeTracker() { - return this.snapshotTimeRangeTracker; - } - - /* - * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore -- both current - * map and snapshot. - * This behaves as if it were a real scanner but does not maintain position. + /** + * Holds details of the snapshot taken on this Memstore. Details include the snapshot's + * identifier, count of cells in it and total memory size occupied by all the cells. */ - protected class MemStoreScanner extends NonLazyKeyValueScanner { - // Next row information for either kvset or snapshot - private KeyValue kvsetNextRow = null; - private KeyValue snapshotNextRow = null; - - // last iterated KVs for kvset and snapshot (to restore iterator state after reseek) - private KeyValue kvsetItRow = null; - private KeyValue snapshotItRow = null; - - // iterator based scanning. 
- private Iterator kvsetIt; - private Iterator snapshotIt; - - // The kvset and snapshot at the time of creating this scanner - private KeyValueSkipListSet kvsetAtCreation; - private KeyValueSkipListSet snapshotAtCreation; - - // the pre-calculated KeyValue to be returned by peek() or next() - private KeyValue theNext; - - // The allocator and snapshot allocator at the time of creating this scanner - volatile MemStoreLAB allocatorAtCreation; - volatile MemStoreLAB snapshotAllocatorAtCreation; - - // A flag represents whether could stop skipping KeyValues for MVCC - // if have encountered the next row. Only used for reversed scan - private boolean stopSkippingKVsIfNextRow = false; - - private long readPoint; - - /* - Some notes... - - So memstorescanner is fixed at creation time. this includes pointers/iterators into - existing kvset/snapshot. during a snapshot creation, the kvset is null, and the - snapshot is moved. since kvset is null there is no point on reseeking on both, - we can save us the trouble. During the snapshot->hfile transition, the memstore - scanner is re-created by StoreScanner#updateReaders(). StoreScanner should - potentially do something smarter by adjusting the existing memstore scanner. - - But there is a greater problem here, that being once a scanner has progressed - during a snapshot scenario, we currently iterate past the kvset then 'finish' up. - if a scan lasts a little while, there is a chance for new entries in kvset to - become available but we will never see them. This needs to be handled at the - StoreScanner level with coordination with MemStoreScanner. - - Currently, this problem is only partly managed: during the small amount of time - when the StoreScanner has not yet created a new MemStoreScanner, we will miss - the adds to kvset in the MemStoreScanner. 
- */ - - MemStoreScanner(long readPoint) { - super(); - - this.readPoint = readPoint; - kvsetAtCreation = kvset; - snapshotAtCreation = snapshot; - if (allocator != null) { - this.allocatorAtCreation = allocator; - this.allocatorAtCreation.incScannerCount(); - } - if (snapshotAllocator != null) { - this.snapshotAllocatorAtCreation = snapshotAllocator; - this.snapshotAllocatorAtCreation.incScannerCount(); - } - } - - private KeyValue getNext(Iterator it) { - KeyValue startKV = theNext; - KeyValue v = null; - try { - while (it.hasNext()) { - v = it.next(); - if (v.getMvccVersion() <= this.readPoint) { - return v; - } - if (stopSkippingKVsIfNextRow && startKV != null - && comparator.compareRows(v, startKV) > 0) { - return null; - } - } - - return null; - } finally { - if (v != null) { - // in all cases, remember the last KV iterated to - if (it == snapshotIt) { - snapshotItRow = v; - } else { - kvsetItRow = v; - } - } - } - } - - /** - * Set the scanner at the seek key. - * Must be called only once: there is no thread safety between the scanner - * and the memStore. - * @param key seek value - * @return false if the key is null or if there is no data - */ - @Override - public synchronized boolean seek(KeyValue key) { - if (key == null) { - close(); - return false; - } - - // kvset and snapshot will never be null. - // if tailSet can't find anything, SortedSet is empty (not null). - kvsetIt = kvsetAtCreation.tailSet(key).iterator(); - snapshotIt = snapshotAtCreation.tailSet(key).iterator(); - kvsetItRow = null; - snapshotItRow = null; - - return seekInSubLists(key); - } - - - /** - * (Re)initialize the iterators after a seek or a reseek. 
- */ - private synchronized boolean seekInSubLists(KeyValue key){ - kvsetNextRow = getNext(kvsetIt); - snapshotNextRow = getNext(snapshotIt); + static class SnapshotInfo { + private long id; + private int cellsCount; + private long size; - // Calculate the next value - theNext = getLowest(kvsetNextRow, snapshotNextRow); - - // has data - return (theNext != null); - } - - - /** - * Move forward on the sub-lists set previously by seek. - * @param key seek value (should be non-null) - * @return true if there is at least one KV to read, false otherwise - */ - @Override - public synchronized boolean reseek(KeyValue key) { - /* - See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. - This code is executed concurrently with flush and puts, without locks. - Two points must be known when working on this code: - 1) It's not possible to use the 'kvTail' and 'snapshot' - variables, as they are modified during a flush. - 2) The ideal implementation for performance would use the sub skip list - implicitly pointed by the iterators 'kvsetIt' and - 'snapshotIt'. Unfortunately the Java API does not offer a method to - get it. So we remember the last keys we iterated to and restore - the reseeked set to at least that point. 
- */ - - kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator(); - snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); - - return seekInSubLists(key); - } - - - @Override - public synchronized KeyValue peek() { - //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); - return theNext; - } - - @Override - public synchronized KeyValue next() { - if (theNext == null) { - return null; - } - - final KeyValue ret = theNext; - - // Advance one of the iterators - if (theNext == kvsetNextRow) { - kvsetNextRow = getNext(kvsetIt); - } else { - snapshotNextRow = getNext(snapshotIt); - } - - // Calculate the next value - theNext = getLowest(kvsetNextRow, snapshotNextRow); - - //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + - // getLowest() + " threadpoint=" + readpoint); - return ret; - } - - /* - * Returns the lower of the two key values, or null if they are both null. - * This uses comparator.compare() to compare the KeyValue using the memstore - * comparator. - */ - private KeyValue getLowest(KeyValue first, KeyValue second) { - if (first == null && second == null) { - return null; - } - if (first != null && second != null) { - int compare = comparator.compare(first, second); - return (compare <= 0 ? first : second); - } - return (first != null ? first : second); - } - - /* - * Returns the higher of the two key values, or null if they are both null. - * This uses comparator.compare() to compare the KeyValue using the memstore - * comparator. - */ - private KeyValue getHighest(KeyValue first, KeyValue second) { - if (first == null && second == null) { - return null; - } - if (first != null && second != null) { - int compare = comparator.compare(first, second); - return (compare > 0 ? first : second); - } - return (first != null ? 
first : second); - } - - public synchronized void close() { - this.kvsetNextRow = null; - this.snapshotNextRow = null; - - this.kvsetIt = null; - this.snapshotIt = null; - - if (allocatorAtCreation != null) { - this.allocatorAtCreation.decScannerCount(); - this.allocatorAtCreation = null; - } - if (snapshotAllocatorAtCreation != null) { - this.snapshotAllocatorAtCreation.decScannerCount(); - this.snapshotAllocatorAtCreation = null; - } - - this.kvsetItRow = null; - this.snapshotItRow = null; + SnapshotInfo(long id, int cellsCount, long size) { + this.id = id; + this.cellsCount = cellsCount; + this.size = size; } - /** - * MemStoreScanner returns max value as sequence id because it will - * always have the latest data among all files. - */ - @Override - public long getSequenceID() { - return Long.MAX_VALUE; + public long getId() { + return id; } - @Override - public boolean shouldUseScanner(Scan scan, SortedSet columns, - long oldestUnexpiredTS) { - return shouldSeek(scan, oldestUnexpiredTS); + public int getCellsCount() { + return cellsCount; } - /** - * Seek scanner to the given key first. If it returns false(means - * peek()==null) or scanner's peek row is bigger than row of given key, seek - * the scanner to the previous row of given key - */ - @Override - public synchronized boolean backwardSeek(KeyValue key) { - seek(key); - if (peek() == null || comparator.compareRows(peek(), key) > 0) { - return seekToPreviousRow(key); - } - return true; + public long getSize() { + return size; } - /** - * Separately get the KeyValue before the specified key from kvset and - * snapshotset, and use the row of higher one as the previous row of - * specified key, then seek to the first KeyValue of previous row - */ - @Override - public synchronized boolean seekToPreviousRow(KeyValue key) { - KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow()); - SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow); - KeyValue kvsetBeforeRow = kvHead.isEmpty() ? 
null : kvHead.last(); - SortedSet snapshotHead = snapshotAtCreation - .headSet(firstKeyOnRow); - KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead - .last(); - KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow); - if (lastKVBeforeRow == null) { - theNext = null; - return false; - } - KeyValue firstKeyOnPreviousRow = KeyValue - .createFirstOnRow(lastKVBeforeRow.getRow()); - this.stopSkippingKVsIfNextRow = true; - seek(firstKeyOnPreviousRow); - this.stopSkippingKVsIfNextRow = false; - if (peek() == null - || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { - return seekToPreviousRow(lastKVBeforeRow); - } - return true; - } - - @Override - public synchronized boolean seekToLastRow() { - KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation - .last(); - KeyValue second = snapshotAtCreation.isEmpty() ? null - : snapshotAtCreation.last(); - KeyValue higherKv = getHighest(first, second); - if (higherKv == null) { - return false; - } - KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow()); - if (seek(firstKvOnLastRow)) { - return true; - } else { - return seekToPreviousRow(higherKv); - } - - } - } - - public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); - - public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + - (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); - - /* - * Calculate how the MemStore size has changed. Includes overhead of the - * backing Map. - * @param kv - * @param notpresent True if the kv was NOT present in the set. - * @return Size - */ - static long heapSizeChange(final KeyValue kv, final boolean notpresent) { - return notpresent ? 
- ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()): - 0; - } - - /** - * Get the entire heap usage for this MemStore not including keys in the - * snapshot. - */ - @Override - public long heapSize() { - return size.get(); - } - - /** - * Get the heap usage of KVs in this MemStore. - */ - public long keySize() { - return heapSize() - DEEP_OVERHEAD; - } - - /** - * Code to help figure if our approximation of object heap sizes is close - * enough. See hbase-900. Fills memstores then waits so user can heap - * dump and bring up resultant hprof in something like jprofiler which - * allows you get 'deep size' on objects. - * @param args main args - */ - public static void main(String [] args) { - RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); - LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" + - runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion()); - LOG.info("vmInputArguments=" + runtime.getInputArguments()); - MemStore memstore1 = new MemStore(); - // TODO: x32 vs x64 - long size = 0; - final int count = 10000; - byte [] fam = Bytes.toBytes("col"); - byte [] qf = Bytes.toBytes("umn"); - byte [] empty = new byte[0]; - for (int i = 0; i < count; i++) { - // Give each its own ts - size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); - } - LOG.info("memstore1 estimated size=" + size); - for (int i = 0; i < count; i++) { - size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); - } - LOG.info("memstore1 estimated size (2nd loading of same data)=" + size); - // Make a variably sized memstore. 
- MemStore memstore2 = new MemStore(); - for (int i = 0; i < count; i++) { - size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, - new byte[i])); - } - LOG.info("memstore2 estimated size=" + size); - final int seconds = 30; - LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); - for (int i = 0; i < seconds; i++) { - // Thread.sleep(1000); + public long heapSize() { + return ClassSize.align(ClassSize.OBJECT + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_INT); } - LOG.info("Exiting."); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index efd250b..c6384fc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -22,11 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -36,8 +32,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; /** * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). @@ -55,16 +51,16 @@ abstract class StoreFlusher { /** * Turns a snapshot of memstore into a set of store files. - * @param snapshot Memstore snapshot. 
+ * @param snapshotScanner + * @param snapshotInfo * @param cacheFlushSeqNum Log cache flush sequence number. * @param snapshotTimeRangeTracker Time range tracker from the memstore * pertaining to the snapshot. - * @param flushedSize Out parameter for the size of the KVs flushed. * @param status Task that represents the flush operation and may be updated with status. * @return List of files written. Can be empty; must not be null. */ - public abstract List flushSnapshot(SortedSet snapshot, long cacheFlushSeqNum, - TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status) + public abstract List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo, + long cacheFlushSeqNum, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status) throws IOException; protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum, @@ -81,12 +77,12 @@ abstract class StoreFlusher { /** * Creates the scanner for flushing snapshot. Also calls coprocessors. + * @param memstoreScanner + * @param smallestReadPoint * @return The scanner; null if coprocessor is canceling the flush. */ - protected InternalScanner createScanner(SortedSet snapshot, + protected InternalScanner createScanner(KeyValueScanner memstoreScanner, long smallestReadPoint) throws IOException { - KeyValueScanner memstoreScanner = - new CollectionBackedScanner(snapshot, store.getComparator()); InternalScanner scanner = null; if (store.getCoprocessorHost() != null) { scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner); @@ -115,15 +111,13 @@ abstract class StoreFlusher { * @param scanner Scanner to get data from. * @param sink Sink to write data to. Could be StoreFile.Writer. * @param smallestReadPoint Smallest read point used for the flush. - * @return Bytes flushed. 
*/ - protected long performFlush(InternalScanner scanner, + protected void performFlush(InternalScanner scanner, Compactor.CellSink sink, long smallestReadPoint) throws IOException { int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); List kvs = new ArrayList(); boolean hasMore; - long flushed = 0; do { hasMore = scanner.next(kvs, compactionKVMax); if (!kvs.isEmpty()) { @@ -139,11 +133,10 @@ abstract class StoreFlusher { kv.setMvccVersion(0); } sink.append(kv); - flushed += MemStore.heapSizeChange(kv, true); } kvs.clear(); } } while (hasMore); - return flushed; } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index a2ece5d..4d231f0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -21,21 +21,17 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; import com.google.common.annotations.VisibleForTesting; @@ 
-57,33 +53,32 @@ public class StripeStoreFlusher extends StoreFlusher { } @Override - public List flushSnapshot(SortedSet snapshot, long cacheFlushSeqNum, - final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status) - throws IOException { + public List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo, + long cacheFlushSeqNum, final TimeRangeTracker tracker, MonitoredTask status) + throws IOException { List result = null; - int kvCount = snapshot.size(); - if (kvCount == 0) return result; // don't flush if there are no entries + int cellsCount = snapshotInfo.getCellsCount(); + if (cellsCount == 0) return result; // don't flush if there are no entries long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot, smallestReadPoint); + InternalScanner scanner = createScanner(snapshotScanner, smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } // Let policy select flush method. - StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount); + StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount); - long flushedBytes = 0; boolean success = false; StripeMultiFileWriter mw = null; try { mw = req.createWriter(); // Writer according to the policy. - StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount); + StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, cellsCount); StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); synchronized (flushLock) { - flushedBytes = performFlush(scanner, mw, smallestReadPoint); + performFlush(scanner, mw, smallestReadPoint); result = mw.commitWriters(cacheFlushSeqNum, false); success = true; } @@ -100,7 +95,6 @@ public class StripeStoreFlusher extends StoreFlusher { } } } - flushedSize.set(flushedBytes); try { scanner.close(); } catch (IOException ex) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 5f8458a..d6e010e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet; -import org.apache.hadoop.hbase.regionserver.MemStore; +import org.apache.hadoop.hbase.regionserver.DefaultMemStore; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.ClassSize; import org.junit.BeforeClass; @@ -292,8 +292,8 @@ public class TestHeapSize { } // MemStore Overhead - cl = MemStore.class; - actual = MemStore.FIXED_OVERHEAD; + cl = DefaultMemStore.class; + actual = DefaultMemStore.FIXED_OVERHEAD; expected = ClassSize.estimateBase(cl, false); if(expected != actual) { ClassSize.estimateBase(cl, true); @@ -301,7 +301,7 @@ public class TestHeapSize { } // MemStore Deep Overhead - actual = MemStore.DEEP_OVERHEAD; + actual = DefaultMemStore.DEEP_OVERHEAD; expected = ClassSize.estimateBase(cl, false); expected += ClassSize.estimateBase(AtomicLong.class, false); expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false)); diff --git 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index e518074..68ac35d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1154,13 +1154,9 @@ public class TestHRegion { put.add(kv); // checkAndPut with wrong value - HStore store = (HStore) region.getStore(fam1); - store.memstore.kvset.size(); - boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator( val1), put, true); assertEquals(true, res); - store.memstore.kvset.size(); Get get = new Get(row1); get.addColumn(fam2, qf1); @@ -1673,10 +1669,11 @@ public class TestHRegion { // extract the key values out the memstore: // This is kinda hacky, but better than nothing... long now = System.currentTimeMillis(); - KeyValue firstKv = ((HStore) region.getStore(fam1)).memstore.kvset.first(); + DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore; + KeyValue firstKv = memstore.kvset.first(); assertTrue(firstKv.getTimestamp() <= now); now = firstKv.getTimestamp(); - for (KeyValue kv : ((HStore) region.getStore(fam1)).memstore.kvset) { + for (KeyValue kv : memstore.kvset) { assertTrue(kv.getTimestamp() <= now); now = kv.getTimestamp(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 3794ab6..51f62dc 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; import 
java.rmi.UnexpectedException; import java.util.ArrayList; import java.util.Arrays; @@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -56,20 +55,17 @@ import com.google.common.collect.Lists; @Category(MediumTests.class) public class TestMemStore extends TestCase { private final Log LOG = LogFactory.getLog(this.getClass()); - private MemStore memstore; + private DefaultMemStore memstore; private static final int ROW_COUNT = 10; private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); - private static final byte [] CONTENTS = Bytes.toBytes("contents"); - private static final byte [] BASIC = Bytes.toBytes("basic"); - private static final String CONTENTSTR = "contentstr"; private MultiVersionConsistencyControl mvcc; @Override public void setUp() throws Exception { super.setUp(); this.mvcc = new MultiVersionConsistencyControl(); - this.memstore = new MemStore(); + this.memstore = new DefaultMemStore(); } public void testPutSameKey() { @@ -154,8 +150,8 @@ public class TestMemStore extends TestCase { assertEquals("count=" + count + ", result=" + result, rowCount, result.size()); count++; if (count == snapshotIndex) { - this.memstore.snapshot(); - this.memstore.clearSnapshot(this.memstore.getSnapshot()); + SnapshotInfo snapshotInfo = this.memstore.snapshot(); + this.memstore.clearSnapshot(snapshotInfo.getId()); // Added more rows into kvset. But the scanner wont see these rows. 
addRows(this.memstore, ts); LOG.info("Snapshotted, cleared it and then added values (which wont be seen)"); @@ -195,7 +191,7 @@ public class TestMemStore extends TestCase { verifyScanAcrossSnapshot2(kv1, kv2); // use case 3: first in snapshot second in kvset - this.memstore = new MemStore(); + this.memstore = new DefaultMemStore(); this.memstore.add(kv1.clone()); this.memstore.snapshot(); this.memstore.add(kv2.clone()); @@ -379,13 +375,13 @@ public class TestMemStore extends TestCase { final byte[] q1 = Bytes.toBytes("q1"); final MultiVersionConsistencyControl mvcc; - final MemStore memstore; + final DefaultMemStore memstore; AtomicReference caughtException; public ReadOwnWritesTester(int id, - MemStore memstore, + DefaultMemStore memstore, MultiVersionConsistencyControl mvcc, AtomicReference caughtException) { @@ -459,13 +455,12 @@ public class TestMemStore extends TestCase { for (int i = 0; i < snapshotCount; i++) { addRows(this.memstore); runSnapshot(this.memstore); - KeyValueSkipListSet ss = this.memstore.getSnapshot(); - assertEquals("History not being cleared", 0, ss.size()); + assertEquals("History not being cleared", 0, this.memstore.snapshot.size()); } } public void testMultipleVersionsSimple() throws Exception { - MemStore m = new MemStore(new Configuration(), KeyValue.COMPARATOR); + DefaultMemStore m = new DefaultMemStore(new Configuration(), KeyValue.COMPARATOR); byte [] row = Bytes.toBytes("testRow"); byte [] family = Bytes.toBytes("testFamily"); byte [] qf = Bytes.toBytes("testQualifier"); @@ -755,52 +750,6 @@ public class TestMemStore extends TestCase { //assertTrue(!memstore.shouldSeek(scan)); } - //////////////////////////////////// - //Test for upsert with MSLAB - //////////////////////////////////// - - /** - * Test a pathological pattern that shows why we can't currently - * use the MSLAB for upsert workloads. 
This test inserts data - * in the following pattern: - * - * - row0001 through row1000 (fills up one 2M Chunk) - * - row0002 through row1001 (fills up another 2M chunk, leaves one reference - * to the first chunk - * - row0003 through row1002 (another chunk, another dangling reference) - * - * This causes OOME pretty quickly if we use MSLAB for upsert - * since each 2M chunk is held onto by a single reference. - */ - public void testUpsertMSLAB() throws Exception { - Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(MemStore.USEMSLAB_KEY, true); - memstore = new MemStore(conf, KeyValue.COMPARATOR); - - int ROW_SIZE = 2048; - byte[] qualifier = new byte[ROW_SIZE - 4]; - - MemoryMXBean bean = ManagementFactory.getMemoryMXBean(); - for (int i = 0; i < 3; i++) { System.gc(); } - long usageBefore = bean.getHeapMemoryUsage().getUsed(); - - long size = 0; - long ts=0; - - for (int newValue = 0; newValue < 1000; newValue++) { - for (int row = newValue; row < newValue + 1000; row++) { - byte[] rowBytes = Bytes.toBytes(row); - size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts); - } - } - System.out.println("Wrote " + ts + " vals"); - for (int i = 0; i < 3; i++) { System.gc(); } - long usageAfter = bean.getHeapMemoryUsage().getUsed(); - System.out.println("Memory used: " + (usageAfter - usageBefore) - + " (heapsize: " + memstore.heapSize() + - " size: " + size + ")"); - } - ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// @@ -816,7 +765,7 @@ public class TestMemStore extends TestCase { */ public void testUpsertMemstoreSize() throws Exception { Configuration conf = HBaseConfiguration.create(); - memstore = new MemStore(conf, KeyValue.COMPARATOR); + memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR); long oldSize = memstore.size.get(); List l = new ArrayList(); @@ -853,7 +802,7 @@ public class TestMemStore 
extends TestCase { try { EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); EnvironmentEdgeManager.injectEdge(edge); - MemStore memstore = new MemStore(); + DefaultMemStore memstore = new DefaultMemStore(); long t = memstore.timeOfOldestEdit(); assertEquals(t, Long.MAX_VALUE); @@ -939,7 +888,7 @@ public class TestMemStore extends TestCase { * @return How many rows we added. * @throws IOException */ - private int addRows(final MemStore hmc) { + private int addRows(final DefaultMemStore hmc) { return addRows(hmc, HConstants.LATEST_TIMESTAMP); } @@ -949,7 +898,7 @@ public class TestMemStore extends TestCase { * @return How many rows we added. * @throws IOException */ - private int addRows(final MemStore hmc, final long ts) { + private int addRows(final DefaultMemStore hmc, final long ts) { for (int i = 0; i < ROW_COUNT; i++) { long timestamp = ts == HConstants.LATEST_TIMESTAMP? System.currentTimeMillis(): ts; @@ -962,16 +911,15 @@ public class TestMemStore extends TestCase { return ROW_COUNT; } - private long runSnapshot(final MemStore hmc) throws UnexpectedException { + private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedException { // Save off old state. - int oldHistorySize = hmc.getSnapshot().size(); - hmc.snapshot(); - KeyValueSkipListSet ss = hmc.getSnapshot(); + int oldHistorySize = hmc.snapshot.size(); + SnapshotInfo snapshotInfo = hmc.snapshot(); // Make some assertions about what just happened. 
- assertTrue("History size has not increased", oldHistorySize < ss.size()); + assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size()); long t = memstore.timeOfOldestEdit(); assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); - hmc.clearSnapshot(ss); + hmc.clearSnapshot(snapshotInfo.getId()); return t; } @@ -998,7 +946,7 @@ public class TestMemStore extends TestCase { return new KeyValue(row, Bytes.toBytes("test_col"), null, HConstants.LATEST_TIMESTAMP, value); } - private static void addRows(int count, final MemStore mem) { + private static void addRows(int count, final DefaultMemStore mem) { long nanos = System.nanoTime(); for (int i = 0 ; i < count ; i++) { @@ -1018,7 +966,7 @@ public class TestMemStore extends TestCase { } - static void doScan(MemStore ms, int iteration) throws IOException { + static void doScan(DefaultMemStore ms, int iteration) throws IOException { long nanos = System.nanoTime(); KeyValueScanner s = ms.getScanners(0).get(0); s.seek(KeyValue.createFirstOnRow(new byte[]{})); @@ -1033,7 +981,7 @@ public class TestMemStore extends TestCase { public static void main(String [] args) throws IOException { MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); - MemStore ms = new MemStore(); + DefaultMemStore ms = new DefaultMemStore(); long n1 = System.nanoTime(); addRows(25000, ms); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 6936c5b..e13d581 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -28,6 +28,7 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; +import 
org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -47,7 +48,7 @@ public class TestMemStoreChunkPool { @BeforeClass public static void setUpBeforeClass() throws Exception { - conf.setBoolean(MemStore.USEMSLAB_KEY, true); + conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; MemStoreChunkPool.chunkPoolDisabled = false; @@ -107,7 +108,7 @@ public class TestMemStoreChunkPool { byte[] qf5 = Bytes.toBytes("testqualifier5"); byte[] val = Bytes.toBytes("testval"); - MemStore memstore = new MemStore(); + DefaultMemStore memstore = new DefaultMemStore(); // Setting up memstore memstore.add(new KeyValue(row, fam, qf1, val)); @@ -115,8 +116,7 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf3, val)); // Creating a snapshot - memstore.snapshot(); - KeyValueSkipListSet snapshot = memstore.getSnapshot(); + SnapshotInfo snapshotInfo = memstore.snapshot(); assertEquals(3, memstore.snapshot.size()); // Adding value to "new" memstore @@ -124,7 +124,7 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf5, val)); assertEquals(2, memstore.kvset.size()); - memstore.clearSnapshot(snapshot); + memstore.clearSnapshot(snapshotInfo.getId()); int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); @@ -145,7 +145,7 @@ public class TestMemStoreChunkPool { byte[] qf7 = Bytes.toBytes("testqualifier7"); byte[] val = Bytes.toBytes("testval"); - MemStore memstore = new MemStore(); + DefaultMemStore memstore = new DefaultMemStore(); // Setting up memstore memstore.add(new KeyValue(row, fam, qf1, val)); @@ -153,8 +153,7 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf3, val)); // 
Creating a snapshot - memstore.snapshot(); - KeyValueSkipListSet snapshot = memstore.getSnapshot(); + SnapshotInfo snapshotInfo = memstore.snapshot(); assertEquals(3, memstore.snapshot.size()); // Adding value to "new" memstore @@ -167,7 +166,7 @@ public class TestMemStoreChunkPool { List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data - memstore.clearSnapshot(snapshot); + memstore.clearSnapshot(snapshotInfo.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -181,8 +180,7 @@ public class TestMemStoreChunkPool { chunkPool.clearChunks(); // Creating another snapshot - memstore.snapshot(); - snapshot = memstore.getSnapshot(); + snapshotInfo = memstore.snapshot(); // Adding more value memstore.add(new KeyValue(row, fam, qf6, val)); memstore.add(new KeyValue(row, fam, qf7, val)); @@ -194,7 +192,7 @@ public class TestMemStoreChunkPool { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - memstore.clearSnapshot(snapshot); + memstore.clearSnapshot(snapshotInfo.getId()); assertTrue(chunkPool.getPoolSize() > 0); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 8842b90..ad9f524 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -114,7 +114,7 @@ public class TestReversibleScanners { @Test public void testReversibleMemstoreScanner() throws IOException { - MemStore memstore = new MemStore(); + DefaultMemStore memstore = new DefaultMemStore(); writeMemstore(memstore); List scanners = memstore.getScanners(Long.MAX_VALUE); seekTestOfReversibleKeyValueScanner(scanners.get(0)); @@ -144,7 +144,7 @@ public class TestReversibleScanners { 
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( hfilePath).withFileContext(hFileContext).build(); - MemStore memstore = new MemStore(); + DefaultMemStore memstore = new DefaultMemStore(); writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, writer2 }); @@ -234,7 +234,7 @@ public class TestReversibleScanners { TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir( hfilePath).withFileContext(hFileContext).build(); - MemStore memstore = new MemStore(); + DefaultMemStore memstore = new DefaultMemStore(); writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, writer2 }); @@ -403,7 +403,7 @@ public class TestReversibleScanners { verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); } - private StoreScanner getReversibleStoreScanner(MemStore memstore, + private StoreScanner getReversibleStoreScanner(DefaultMemStore memstore, StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType, ScanInfo scanInfo, int readPoint) throws IOException { List scanners = getScanners(memstore, sf1, sf2, null, @@ -472,7 +472,7 @@ public class TestReversibleScanners { assertEquals(null, kvHeap.peek()); } - private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, + private ReversedKeyValueHeap getReversibleKeyValueHeap(DefaultMemStore memstore, StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint) throws IOException { List scanners = getScanners(memstore, sf1, sf2, startRow, @@ -482,7 +482,7 @@ public class TestReversibleScanners { return kvHeap; } - private List getScanners(MemStore memstore, StoreFile sf1, + private List getScanners(DefaultMemStore memstore, StoreFile sf1, StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint) throws IOException { List fileScanners = StoreFileScanner @@ -625,7 +625,7 @@ public class TestReversibleScanners { } } - private static void writeMemstoreAndStoreFiles(MemStore memstore, + private static void writeMemstoreAndStoreFiles(DefaultMemStore memstore, 
final StoreFile.Writer[] writers) throws IOException { Random rand = new Random(); try { @@ -658,7 +658,7 @@ public class TestReversibleScanners { } } - private static void writeMemstore(MemStore memstore) throws IOException { + private static void writeMemstore(DefaultMemStore memstore) throws IOException { // Add half of the keyvalues to memstore for (int i = 0; i < ROWSIZE; i++) { for (int j = 0; j < QUALSIZE; j++) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 793b839..0e35393 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -43,7 +43,6 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -73,7 +72,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.util.Progressable; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -442,7 +440,7 @@ public class TestStore extends TestCase { this.store.snapshot(); flushStore(store, id++); assertEquals(storeFilessize, this.store.getStorefiles().size()); - assertEquals(0, this.store.memstore.kvset.size()); + assertEquals(0, ((DefaultMemStore)this.store.memstore).kvset.size()); } private void assertCheck() { @@ -452,189 +450,12 @@ public class TestStore extends 
TestCase { } } - ////////////////////////////////////////////////////////////////////////////// - // IncrementColumnValue tests - ////////////////////////////////////////////////////////////////////////////// - /* - * test the internal details of how ICV works, especially during a flush scenario. - */ - public void testIncrementColumnValue_ICVDuringFlush() - throws IOException, InterruptedException { - init(this.getName()); - - long oldValue = 1L; - long newValue = 3L; - this.store.add(new KeyValue(row, family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(oldValue))); - - // snapshot the store. - this.store.snapshot(); - - // add other things: - this.store.add(new KeyValue(row, family, qf2, - System.currentTimeMillis(), - Bytes.toBytes(oldValue))); - - // update during the snapshot. - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - - // memstore should have grown by some amount. - assertTrue(ret > 0); - - // then flush. - flushStore(store, id++); - assertEquals(1, this.store.getStorefiles().size()); - // from the one we inserted up there, and a new one - assertEquals(2, this.store.memstore.kvset.size()); - - // how many key/values for this row are there? - Get get = new Get(row); - get.addColumn(family, qf1); - get.setMaxVersions(); // all versions. 
- List results = new ArrayList(); - - results = HBaseTestingUtility.getFromStoreFile(store, get); - assertEquals(2, results.size()); - - long ts1 = results.get(0).getTimestamp(); - long ts2 = results.get(1).getTimestamp(); - - assertTrue(ts1 > ts2); - - assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); - assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); - } - @Override protected void tearDown() throws Exception { super.tearDown(); EnvironmentEdgeManagerTestHelper.reset(); } - public void testICV_negMemstoreSize() throws IOException { - init(this.getName()); - - long time = 100; - ManualEnvironmentEdge ee = new ManualEnvironmentEdge(); - ee.setValue(time); - EnvironmentEdgeManagerTestHelper.injectEdge(ee); - long newValue = 3L; - long size = 0; - - - size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(newValue))); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(newValue))); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(newValue))); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(newValue))); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(newValue))); - - - for ( int i = 0 ; i < 10000 ; ++i) { - newValue++; - - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue); - - if (ret != 0) System.out.println("ret: " + ret); - if (ret2 != 0) System.out.println("ret2: " + ret2); - - assertTrue("ret: " + ret, ret >= 0); - size += ret; - assertTrue("ret2: " + ret2, ret2 >= 0); - size += ret2; - - - if (i % 1000 == 0) - ee.setValue(++time); - } - - long 
computedSize=0; - for (KeyValue kv : this.store.memstore.kvset) { - long kvsize = MemStore.heapSizeChange(kv, true); - //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize()); - computedSize += kvsize; - } - assertEquals(computedSize, size); - } - - public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception { - ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); - EnvironmentEdgeManagerTestHelper.injectEdge(mee); - init(this.getName()); - - long oldValue = 1L; - long newValue = 3L; - this.store.add(new KeyValue(row, family, qf1, - EnvironmentEdgeManager.currentTimeMillis(), - Bytes.toBytes(oldValue))); - - // snapshot the store. - this.store.snapshot(); - - // update during the snapshot, the exact same TS as the Put (lololol) - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - - // memstore should have grown by some amount. - assertTrue(ret > 0); - - // then flush. - flushStore(store, id++); - assertEquals(1, this.store.getStorefiles().size()); - assertEquals(1, this.store.memstore.kvset.size()); - - // now increment again: - newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); - - // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again: - newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); - - // the second TS should be TS=2 or higher., even though 'time=1' right now. - - - // how many key/values for this row are there? - Get get = new Get(row); - get.addColumn(family, qf1); - get.setMaxVersions(); // all versions. 
- List results = new ArrayList(); - - results = HBaseTestingUtility.getFromStoreFile(store, get); - assertEquals(2, results.size()); - - long ts1 = results.get(0).getTimestamp(); - long ts2 = results.get(1).getTimestamp(); - - assertTrue(ts1 > ts2); - assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); - assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); - - mee.setValue(2); // time goes up slightly - newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); - - results = HBaseTestingUtility.getFromStoreFile(store, get); - assertEquals(2, results.size()); - - ts1 = results.get(0).getTimestamp(); - ts2 = results.get(1).getTimestamp(); - - assertTrue(ts1 > ts2); - assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0)))); - assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); - } - public void testHandleErrorsInFlush() throws Exception { LOG.info("Setting up a faulty file system that cannot write"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 5aeb31b..5bed885 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import java.util.SortedSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -69,10 +68,12 @@ import org.apache.hadoop.hbase.regionserver.FlushRequestListener; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import 
org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.MemStore.SnapshotInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; @@ -560,14 +561,14 @@ public class TestWALReplay { super(conf, store); } @Override - public List flushSnapshot(SortedSet snapshot, long cacheFlushId, - TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status) - throws IOException { + public List flushSnapshot(KeyValueScanner snapshotScanner, SnapshotInfo snapshotInfo, + long cacheFlushId, TimeRangeTracker snapshotTimeRangeTracker, MonitoredTask status) + throws IOException { if (throwExceptionWhenFlushing.get()) { throw new IOException("Simulated exception by tests"); } - return super.flushSnapshot(snapshot, cacheFlushId, snapshotTimeRangeTracker, - flushedSize, status); + return super.flushSnapshot(snapshotScanner, snapshotInfo, cacheFlushId, + snapshotTimeRangeTracker, status); } };