diff --git src/main/java/org/apache/hadoop/hbase/KeyValue.java src/main/java/org/apache/hadoop/hbase/KeyValue.java index b8fb085..50e8b28 100644 --- src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -1684,34 +1684,26 @@ public class KeyValue implements Writable, HeapSize { protected Object clone() throws CloneNotSupportedException { return new KVComparator(); } - - /** - * @return Comparator that ignores timestamps; useful counting versions. - */ - public KVComparator getComparatorIgnoringTimestamps() { - KVComparator c = null; - try { - c = (KVComparator)this.clone(); - c.getRawComparator().ignoreTimestamp = true; - } catch (CloneNotSupportedException e) { - LOG.error("Not supported", e); - } - return c; + + public KVComparator getComparatorIgnoringRowKey() { + return getComparatorWithRowKeyMode(KeyComparator.RowCompareMode.IGNORE_ROWKEY); } - - /** - * @return Comparator that ignores key type; useful checking deletes - */ - public KVComparator getComparatorIgnoringType() { + public Comparator getComparatorForRowKeyOnly() { + return new RowComparator(this); + } + + private KVComparator getComparatorWithRowKeyMode( + KeyComparator.RowCompareMode mode) { KVComparator c = null; try { c = (KVComparator)this.clone(); - c.getRawComparator().ignoreType = true; + c.getRawComparator().rowCompareMode = mode; } catch (CloneNotSupportedException e) { - LOG.error("Not supported", e); + throw new AssertionError(e); } return c; } + } /** @@ -2040,18 +2032,29 @@ public class KeyValue implements Writable, HeapSize { */ public static class KeyComparator implements RawComparator, SamePrefixComparator { - volatile boolean ignoreTimestamp = false; - volatile boolean ignoreType = false; + private enum RowCompareMode { + NORMAL, + /** Skips the row comparison; only the rest of the key is compared */ + IGNORE_ROWKEY + }; + + RowCompareMode rowCompareMode = RowCompareMode.NORMAL; + /** + * Function starts comparing at "key": + * (row len) (row) (cf len) (cf) (qual) 
(timestamp) (type) + */ public int compare(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength) { - // Compare row - short lrowlength = Bytes.toShort(left, loffset); short rrowlength = Bytes.toShort(right, roffset); - int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT, - lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength); - if (compare != 0) { - return compare; + if (rowCompareMode != RowCompareMode.IGNORE_ROWKEY) { + short lrowlength = Bytes.toShort(left, loffset); + + int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT, + lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength); + if (compare != 0) { + return compare; + } } // Compare the rest of the two KVs without making any assumptions about @@ -2166,26 +2169,22 @@ public class KeyValue implements Writable, HeapSize { private int compareTimestampAndType(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength, byte ltype, byte rtype) { int compare; - if (!this.ignoreTimestamp) { - // Get timestamps. - long ltimestamp = Bytes.toLong(left, - loffset + (llength - TIMESTAMP_TYPE_SIZE)); - long rtimestamp = Bytes.toLong(right, - roffset + (rlength - TIMESTAMP_TYPE_SIZE)); - compare = compareTimestamps(ltimestamp, rtimestamp); - if (compare != 0) { - return compare; - } - } - if (!this.ignoreType) { - // Compare types. Let the delete types sort ahead of puts; i.e. types - // of higher numbers sort before those of lesser numbers. Maximum (255) - // appears ahead of everything, and minimum (0) appears after - // everything. - return (0xff & rtype) - (0xff & ltype); + // Get timestamps. + long ltimestamp = Bytes.toLong(left, + loffset + (llength - TIMESTAMP_TYPE_SIZE)); + long rtimestamp = Bytes.toLong(right, + roffset + (rlength - TIMESTAMP_TYPE_SIZE)); + compare = compareTimestamps(ltimestamp, rtimestamp); + if (compare != 0) { + return compare; } - return 0; + + // Compare types. Let the delete types sort ahead of puts; i.e. 
types + // of higher numbers sort before those of lesser numbers. Maximum (255) + // appears ahead of everything, and minimum (0) appears after + // everything. + return (0xff & rtype) - (0xff & ltype); } public int compare(byte[] left, byte[] right) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index dfd29ee..a01ce1b 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2484,6 +2484,7 @@ public class HRegion implements HeapSize { // , Writable{ * This handles the consistency control on its own, but the caller * should already have locked updatesLock.readLock(). This also does * not check the families for validity. + * All entries must belong to a single row: each family's edits are added as one single-row batch. * * @param familyMap Map of kvs per family * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. 
@@ -2509,8 +2510,8 @@ public class HRegion implements HeapSize { // , Writable{ Store store = getStore(family); for (KeyValue kv: edits) { kv.setMemstoreTS(localizedWriteEntry.getWriteNumber()); - size += store.add(kv); } + size += store.addKvsInSingleRow(edits); } } finally { if (freemvcc) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java index 51df1ee..4b6eba9 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java @@ -20,14 +20,23 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ForwardingIterator; +import com.google.common.collect.Iterators; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Map.Entry; +import java.util.List; import java.util.NavigableSet; import java.util.SortedSet; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; /** * A {@link java.util.Set} of {@link KeyValue}s implemented on top of a @@ -44,14 +53,38 @@ import java.util.concurrent.ConcurrentSkipListMap; * get and set and won't throw ConcurrentModificationException when iterating. 
*/ class KeyValueSkipListSet implements NavigableSet { - private final ConcurrentNavigableMap delegatee; + private final Comparator comparatorOnlyRowKey; + private final KVComparator comparatorIgnoreRowKey; + + private final ConcurrentSkipListMap> + rowMap; + + private static final Function, Iterator> + MAP_TO_VALUE_ITERATOR = new Function, Iterator>() { + @Override + public Iterator apply( + ConcurrentSkipListMap arg0) { + return arg0.values().iterator(); + } + }; + + private static final Function, Iterator> + MAP_TO_DESCENDING_VALUE_ITERATOR = new Function, Iterator>() { + @Override + public Iterator apply( + ConcurrentSkipListMap arg0) { + return arg0.descendingMap().values().iterator(); + } + }; + + + KeyValueSkipListSet(final KeyValue.KVComparator c) { - this.delegatee = new ConcurrentSkipListMap(c); - } - - KeyValueSkipListSet(final ConcurrentNavigableMap m) { - this.delegatee = m; + this.comparatorOnlyRowKey = c.getComparatorForRowKeyOnly(); + this.comparatorIgnoreRowKey = c.getComparatorIgnoringRowKey(); + this.rowMap = new ConcurrentSkipListMap>( + comparatorOnlyRowKey); } public KeyValue ceiling(KeyValue e) { @@ -59,7 +92,11 @@ class KeyValueSkipListSet implements NavigableSet { } public Iterator descendingIterator() { - return this.delegatee.descendingMap().values().iterator(); + Iterator> descRowIter = + rowMap.descendingMap().values().iterator(); + Iterator> descEntriesOfDescRows = + Iterators.transform(descRowIter, MAP_TO_DESCENDING_VALUE_ITERATOR); + return Iterators.concat(descEntriesOfDescRows); } public NavigableSet descendingSet() { @@ -70,21 +107,64 @@ class KeyValueSkipListSet implements NavigableSet { throw new UnsupportedOperationException("Not implemented"); } + public KeyValue getNextRow(KeyValue kv) { + return rowMap.higherKey(kv); + } + + public KeyValue getPreviousRow(KeyValue firstOnRow) { + return rowMap.lowerKey(firstOnRow); + } + + public SortedSet headSet(final KeyValue toElement) { - return headSet(toElement, false); + throw new 
UnsupportedOperationException("Not implemented"); } public NavigableSet headSet(final KeyValue toElement, boolean inclusive) { - return new KeyValueSkipListSet(this.delegatee.headMap(toElement, inclusive)); + throw new UnsupportedOperationException("Not implemented"); } - + public KeyValue higher(KeyValue e) { throw new UnsupportedOperationException("Not implemented"); } public Iterator iterator() { - return this.delegatee.values().iterator(); + Iterator> rowIter = + rowMap.values().iterator(); + Iterator> entriesOfRows = + Iterators.transform(rowIter, MAP_TO_VALUE_ITERATOR); + return Iterators.concat(entriesOfRows); + } + + public Iterator tailIterator(KeyValue key) { + // Iterator starts with this row + Iterator> rowIter = + rowMap.tailMap(key, true).values().iterator(); + + if (!rowIter.hasNext()) { + return Iterators.emptyIterator(); + } + + Iterator firstRowIter = null; + + // Pull off the first row + ConcurrentSkipListMap firstRow = rowIter.next(); + if (!firstRow.isEmpty()) { + KeyValue kvOnFirstRow = firstRow.firstKey(); + if (comparatorOnlyRowKey.compare(kvOnFirstRow, key) == 0) { + firstRowIter = firstRow.tailMap(key).values().iterator(); + } else { + firstRowIter = firstRow.values().iterator(); + } + } + + Iterator> nextRowIters = Iterators.transform(rowIter, MAP_TO_VALUE_ITERATOR); + if (firstRowIter != null) { + return Iterators.concat(firstRowIter, Iterators.concat(nextRowIters)); + } else { + return Iterators.concat(nextRowIters); + } } public KeyValue lower(KeyValue e) { @@ -113,48 +193,90 @@ class KeyValueSkipListSet implements NavigableSet { } public NavigableSet tailSet(KeyValue fromElement, boolean inclusive) { - return new KeyValueSkipListSet(this.delegatee.tailMap(fromElement, inclusive)); + throw new UnsupportedOperationException("Not implemented"); } public Comparator comparator() { throw new UnsupportedOperationException("Not implemented"); } + @Override public KeyValue first() { - return this.delegatee.get(this.delegatee.firstKey()); + 
Entry> entry = rowMap.firstEntry(); + if (entry == null) { + throw new NoSuchElementException(); + } + return entry.getValue().firstEntry().getValue(); } public KeyValue last() { - return this.delegatee.get(this.delegatee.lastKey()); + throw new UnsupportedOperationException("Not implemented"); } public boolean add(KeyValue e) { - return this.delegatee.put(e, e) == null; + ConcurrentSkipListMap row = getOrAddRow(e); + // TODO: unfortunately the old Key object is left around forever + boolean added = row.put(e, e) == null; + return added; + } + + private ConcurrentSkipListMap getOrAddRow(KeyValue e) { + ConcurrentSkipListMap row = rowMap.get(e); + if (row != null) { + return row; + } + + row = new ConcurrentSkipListMap( + comparatorIgnoreRowKey); + ConcurrentSkipListMap otherThreadWon = + rowMap.putIfAbsent(e, row); + if (otherThreadWon != null) { + return otherThreadWon; + } else { + return row; + } + } + + public long addAllSingleRow(List toAdd) { + if (toAdd.isEmpty()) return 0; + // TODO: add assert to verify + long ret = 0; + KeyValue first = toAdd.get(0); + ConcurrentSkipListMap row = getOrAddRow(first); + for (KeyValue kv : toAdd) { + boolean added = row.put(kv, kv) == null; + ret += MemStore.heapSizeChange(kv, added); + } + return ret; } + public boolean addAll(Collection c) { throw new UnsupportedOperationException("Not implemented"); } public void clear() { - this.delegatee.clear(); + throw new UnsupportedOperationException("Not implemented"); } - public boolean contains(Object o) { - //noinspection SuspiciousMethodCalls - return this.delegatee.containsKey(o); - } public boolean containsAll(Collection c) { throw new UnsupportedOperationException("Not implemented"); } public boolean isEmpty() { - return this.delegatee.isEmpty(); + // TODO: this isn't actually correct, if we add and remove + return this.rowMap.isEmpty(); } + @Override public boolean remove(Object o) { - return this.delegatee.remove(o) != null; + Preconditions.checkArgument(o instanceof 
KeyValue); + ConcurrentSkipListMap row = rowMap.get(o); + if (row == null) { + return false; + } + return row.remove(o) != null; } public boolean removeAll(Collection c) { @@ -166,11 +288,28 @@ class KeyValueSkipListSet implements NavigableSet { } public KeyValue get(KeyValue kv) { - return this.delegatee.get(kv); + ConcurrentSkipListMap row = rowMap.get(kv); + if (row == null) { + return null; + } + return row.get(kv); } + public boolean contains(Object o) { + ConcurrentSkipListMap row = rowMap.get(o); + if (row == null) { + return false; + } + return row.containsKey(o); + } + + // WARNING: this is slow public int size() { - return this.delegatee.size(); + int size = 0; + for (ConcurrentSkipListMap row : rowMap.values()) { + size += row.size(); + } + return size; } public Object[] toArray() { @@ -180,4 +319,18 @@ class KeyValueSkipListSet implements NavigableSet { public T[] toArray(T[] a) { throw new UnsupportedOperationException("Not implemented"); } + + public Iterator rowTailIterator(KeyValue kvInRow) { + ConcurrentSkipListMap row = rowMap.get(kvInRow); + if (row == null) { + return Iterators.emptyIterator(); + } + return row.tailMap(kvInRow).values().iterator(); + } + + @Override + public String toString() { + return rowMap.toString(); + } + } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 03ed7f1..389214a 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import com.google.common.collect.Lists; + /** * The MemStore holds in-memory modifications to the Store. Modifications * are {@link KeyValue}s. 
When asked to flush, current memstore is moved @@ -78,12 +80,6 @@ public class MemStore implements HeapSize { final KeyValue.KVComparator comparator; - // Used comparing versions -- same r/c and ts but different type. - final KeyValue.KVComparator comparatorIgnoreType; - - // Used comparing versions -- same r/c and type but different timestamp. - final KeyValue.KVComparator comparatorIgnoreTimestamp; - // Used to track own heapSize final AtomicLong size; @@ -109,9 +105,7 @@ public class MemStore implements HeapSize { final KeyValue.KVComparator c) { this.conf = conf; this.comparator = c; - this.comparatorIgnoreTimestamp = - this.comparator.getComparatorIgnoringTimestamps(); - this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType(); + this.kvset = new KeyValueSkipListSet(c); this.snapshot = new KeyValueSkipListSet(c); timeRangeTracker = new TimeRangeTracker(); @@ -210,22 +204,39 @@ public class MemStore implements HeapSize { long add(final KeyValue kv) { this.lock.readLock().lock(); try { + // TODO: remove this method KeyValue toAdd = maybeCloneWithAllocator(kv); - return internalAdd(toAdd); + return internalAdd(Collections.singletonList(toAdd)); + } finally { + this.lock.readLock().unlock(); + } + } + + long addKvsInSingleRow(List edits) { + List cloned = Lists.newArrayListWithCapacity(edits.size()); + for (KeyValue kv : edits) { + cloned.add(maybeCloneWithAllocator(kv)); + } + this.lock.readLock().lock(); + try { + return internalAdd(cloned); } finally { this.lock.readLock().unlock(); } } + /** * Internal version of add() that doesn't clone KVs with the * allocator, and doesn't take the lock. 
* * Callers should ensure they already have the read lock taken */ - private long internalAdd(final KeyValue toAdd) { - long s = heapSizeChange(toAdd, this.kvset.add(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); + private long internalAdd(final List toAdd) { + long s = this.kvset.addAllSingleRow(toAdd); + for (KeyValue kv : toAdd) { + timeRangeTracker.includeTimestamp(kv); + } this.size.addAndGet(s); return s; } @@ -308,7 +319,7 @@ public class MemStore implements HeapSize { KeyValue getNextRow(final KeyValue kv) { this.lock.readLock().lock(); try { - return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot)); + return getLowest(kvset.getNextRow(kv), this.snapshot.getNextRow(kv)); } finally { this.lock.readLock().unlock(); } @@ -329,28 +340,6 @@ public class MemStore implements HeapSize { return comparator.compareRows(a, b) <= 0? a: b; } - /* - * @param key Find row that follows this one. If null, return first. - * @param map Set to look in for a row beyond row. - * @return Next row or null if none found. If one found, will be a new - * KeyValue -- can be destroyed by subsequent calls to this method. - */ - private KeyValue getNextRow(final KeyValue key, - final NavigableSet set) { - KeyValue result = null; - SortedSet tail = key == null? set: set.tailSet(key); - // Iterate until we fall into the next row; i.e. move off current row - for (KeyValue kv: tail) { - if (comparator.compareRows(kv, key) <= 0) - continue; - // Note: Not suppressing deletes or expired cells. Needs to be handled - // by higher up functions. - result = kv; - break; - } - return result; - } - /** * @param state column/delete tracking state */ @@ -368,7 +357,7 @@ public class MemStore implements HeapSize { * @param set * @param state Accumulates deletes and candidates. 
*/ - private void getRowKeyAtOrBefore(final NavigableSet set, + private void getRowKeyAtOrBefore(final KeyValueSkipListSet set, final GetClosestRowBeforeTracker state) { if (set.isEmpty()) { return; @@ -389,15 +378,16 @@ public class MemStore implements HeapSize { * @param state * @return True if we found a candidate walking this row. */ - private boolean walkForwardInSingleRow(final SortedSet set, + private boolean walkForwardInSingleRow(final KeyValueSkipListSet set, final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) { boolean foundCandidate = false; - SortedSet tail = set.tailSet(firstOnRow); - if (tail.isEmpty()) return foundCandidate; - for (Iterator i = tail.iterator(); i.hasNext();) { + + Iterator i = set.rowTailIterator(firstOnRow); + while (i.hasNext()) { KeyValue kv = i.next(); // Did we go beyond the target row? If so break. - if (state.isTooFar(kv, firstOnRow)) break; + assert !state.isTooFar(kv, firstOnRow); + if (state.isExpired(kv)) { i.remove(); continue; @@ -417,7 +407,7 @@ public class MemStore implements HeapSize { * @param set * @param state */ - private void getRowKeyBefore(NavigableSet set, + private void getRowKeyBefore(KeyValueSkipListSet set, final GetClosestRowBeforeTracker state) { KeyValue firstOnRow = state.getTargetKey(); for (Member p = memberOfPreviousRow(set, state, firstOnRow); @@ -458,31 +448,32 @@ public class MemStore implements HeapSize { KeyValue firstKv = KeyValue.createFirstOnRow( row, family, qualifier); // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - SortedSet snSs = snapshot.tailSet(firstKv); - if (!snSs.isEmpty()) { - KeyValue snKv = snSs.first(); + Iterator iterInRow = snapshot.rowTailIterator(firstKv); + if (iterInRow.hasNext()) { + KeyValue snKv = iterInRow.next(); + assert snKv.matchingRow(firstKv); // is there a matching KV in the snapshot? 
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) { + if (snKv.matchingQualifier(firstKv)) { if (snKv.getTimestamp() == now) { - // poop, + // There is a KV in the snapshot for the same row/col/ts now += 1; } } } - + // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. // But the timestamp should also be max(now, mostRecentTsInMemstore) // so we cant add the new KV w/o knowing what's there already, but we also // want to take this chance to delete some kvs. So two loops (sad) - SortedSet ss = kvset.tailSet(firstKv); - Iterator it = ss.iterator(); + Iterator it = kvset.rowTailIterator(firstKv); while ( it.hasNext() ) { KeyValue kv = it.next(); + assert kv.matchingRow(firstKv); // if this isnt the row we are interested in, then bail: - if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) { + if (!kv.matchingColumn(family,qualifier)) { break; // rows dont match, bail. } @@ -492,7 +483,7 @@ public class MemStore implements HeapSize { now = kv.getTimestamp(); } } - + // create or update (upsert) a new KeyValue with // 'now' and a 0 memstoreTS == immediately visible return upsert(Arrays.asList( @@ -555,7 +546,7 @@ public class MemStore implements HeapSize { // hitting OOME - see TestMemStore.testUpsertMSLAB for a // test that triggers the pathological case if we don't avoid MSLAB // here. - long addedSize = internalAdd(kv); + long addedSize = internalAdd(Collections.singletonList(kv)); // Get the KeyValues for the row/family/qualifier regardless of timestamp. 
// For this case we want to clean up any other puts @@ -563,8 +554,8 @@ public class MemStore implements HeapSize { kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()); - SortedSet ss = kvset.tailSet(firstKv); - Iterator it = ss.iterator(); + + Iterator it = kvset.rowTailIterator(firstKv); while ( it.hasNext() ) { KeyValue cur = it.next(); @@ -572,10 +563,7 @@ public class MemStore implements HeapSize { // ignore the one just put in continue; } - // if this isn't the row we are interested in, then bail - if (!kv.matchingRow(cur)) { - break; - } + assert kv.matchingRow(cur); // if the qualifier matches and it's a put, remove it if (kv.matchingQualifier(cur)) { @@ -601,8 +589,8 @@ public class MemStore implements HeapSize { */ private static class Member { final KeyValue kv; - final NavigableSet set; - Member(final NavigableSet s, final KeyValue kv) { + final KeyValueSkipListSet set; + Member(final KeyValueSkipListSet s, final KeyValue kv) { this.kv = kv; this.set = s; } @@ -616,17 +604,13 @@ public class MemStore implements HeapSize { * member in. * @return Null or member of row previous to firstOnRow */ - private Member memberOfPreviousRow(NavigableSet set, + private Member memberOfPreviousRow(KeyValueSkipListSet set, final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) { - NavigableSet head = set.headSet(firstOnRow, false); - if (head.isEmpty()) return null; - for (Iterator i = head.descendingIterator(); i.hasNext();) { - KeyValue found = i.next(); - if (state.isExpired(found)) { - i.remove(); - continue; - } - return new Member(head, found); + + KeyValue prevRow = set.getPreviousRow(firstOnRow); + if (prevRow != null) { + // we used to do some removal here. necessary anymore? 
+ return new Member(set, prevRow); } return null; } @@ -681,9 +665,11 @@ public class MemStore implements HeapSize { volatile KeyValueSkipListSet snapshotAtCreation; // Sub lists on which we're iterating + /* + * TODO: perhaps a more efficient way about this private SortedSet kvTail; private SortedSet snapshotTail; - +*/ // the pre-calculated KeyValue to be returned by peek() or next() private KeyValue theNext; @@ -744,23 +730,20 @@ public class MemStore implements HeapSize { // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - kvTail = kvsetAtCreation.tailSet(key); - snapshotTail = snapshotAtCreation.tailSet(key); - - return seekInSubLists(key); + kvsetIt = kvsetAtCreation.tailIterator(key); + snapshotIt = snapshotAtCreation.tailIterator(key); + //LOG.info("seek to key=" + key); + return seekInSubLists(); } /** * (Re)initialize the iterators after a seek or a reseek. */ - private synchronized boolean seekInSubLists(KeyValue key){ - kvsetIt = kvTail.iterator(); - snapshotIt = snapshotTail.iterator(); - + private synchronized boolean seekInSubLists(){ kvsetNextRow = getNext(kvsetIt); snapshotNextRow = getNext(snapshotIt); - + //LOG.info("kvsetNextRow=" + kvsetNextRow); // Calculate the next value theNext = getLowest(kvsetNextRow, snapshotNextRow); @@ -794,10 +777,16 @@ public class MemStore implements HeapSize { readpoint performed in the next() function. */ - kvTail = kvTail.tailSet(key); - snapshotTail = snapshotTail.tailSet(key); + // TODO: this should be made more efficient again! rather than + // reseek from the top of memstore, seek forward in the row + // tailmap + // Also need to think whether this is correct! 
+ //LOG.info("Reseek to key=" + key); + //LOG.info("kvset =" + kvset); + kvsetIt = kvsetAtCreation.tailIterator(key); + snapshotIt = snapshotAtCreation.tailIterator(key); - return seekInSubLists(key); + return seekInSubLists(); } @@ -889,7 +878,7 @@ public class MemStore implements HeapSize { * @param notpresent True if the kv was NOT present in the set. * @return Size */ - long heapSizeChange(final KeyValue kv, final boolean notpresent) { + static long heapSizeChange(final KeyValue kv, final boolean notpresent) { return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()): 0; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 1de3b12..2297619 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -385,6 +385,17 @@ public class Store extends SchemaConfigured implements HeapSize { lock.readLock().unlock(); } } + + + protected long addKvsInSingleRow(List edits) { + lock.readLock().lock(); + try { + return this.memstore.addKvsInSingleRow(edits); + } finally { + lock.readLock().unlock(); + } + } + /** * Adds a value to the memstore diff --git src/test/java/org/apache/hadoop/hbase/TestKeyValue.java src/test/java/org/apache/hadoop/hbase/TestKeyValue.java index f97d2ba..506928f 100644 --- src/test/java/org/apache/hadoop/hbase/TestKeyValue.java +++ src/test/java/org/apache/hadoop/hbase/TestKeyValue.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.Comparator; import java.util.Set; import java.util.TreeSet; @@ -407,6 +408,44 @@ public class TestKeyValue extends TestCase { kv.toString().replaceAll("=[0-9]+$", "=0")); } + public void testComparingOnlyRow() { + KeyValue kv_a_a = new KeyValue( + Bytes.toBytes("row-a"), + Bytes.toBytes("cf-a"), + Bytes.toBytes("qual-a"), + 12345L, + Bytes.toBytes("val-a")); + KeyValue 
kv_a_b = new KeyValue( + Bytes.toBytes("row-a"), + Bytes.toBytes("cf-a"), + Bytes.toBytes("qual-b"), + 12345L, + Bytes.toBytes("val-a")); + KeyValue kv_b_a = new KeyValue( + Bytes.toBytes("row-b"), + Bytes.toBytes("cf-a"), + Bytes.toBytes("qual-a"), + 12345L, + Bytes.toBytes("val-a")); + KeyValue kv_b_b = new KeyValue( + Bytes.toBytes("row-b"), + Bytes.toBytes("cf-a"), + Bytes.toBytes("qual-b"), + 12345L, + Bytes.toBytes("val-a")); + + + Comparator rowOnly = new KVComparator().getComparatorForRowKeyOnly(); + KVComparator ignoreRow= new KVComparator().getComparatorIgnoringRowKey(); + assertTrue(rowOnly.compare(kv_a_a, kv_a_a) == 0); + assertTrue(rowOnly.compare(kv_a_a, kv_a_b) == 0); + assertFalse(rowOnly.compare(kv_a_a, kv_b_a) == 0); + assertFalse(rowOnly.compare(kv_a_a, kv_b_b) == 0); + + assertTrue(ignoreRow.compare(kv_a_a, kv_a_a) == 0); + assertTrue(ignoreRow.compare(kv_a_a, kv_b_a) == 0); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 97c624d..0a697fc 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -1120,13 +1121,19 @@ public class TestHRegion extends HBaseTestCase { put.add(fam1, null, value); put.add(fam2, null, value); region.put(put); + + System.err.println("fam1 kvset:" + + Joiner.on(",").join(region.getStore(fam1).memstore.kvset)); + System.err.println("fam2 kvset:" + + Joiner.on(",").join(region.getStore(fam2).memstore.kvset)); Scan scan = new Scan(); scan.addFamily(fam1).addFamily(fam2); 
InternalScanner s = region.getScanner(scan); List results = new ArrayList(); s.next(results); - assertTrue(Bytes.equals(rowA, results.get(0).getRow())); + assertEquals(Bytes.toString(rowA), + Bytes.toString(results.get(0).getRow())); results.clear(); s.next(results); @@ -3019,6 +3026,7 @@ public class TestHRegion extends HBaseTestCase { while (scanner.next(res)) ; //long end = System.nanoTime(); //System.out.println("memStoreEmpty=" + memStoreEmpty + ", time=" + (end - start)/1000000D); + System.err.println("res: " + res); assertEquals(1L, res.size()); } diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueSkipListSet.java src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueSkipListSet.java index d9158be..8f6797a 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueSkipListSet.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueSkipListSet.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.ArrayList; import java.util.Iterator; import java.util.SortedSet; @@ -27,8 +28,12 @@ import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import junit.framework.TestCase; + +import org.junit.Assert; import org.junit.experimental.categories.Category; +import com.google.common.collect.Lists; + @Category(SmallTests.class) public class TestKeyValueSkipListSet extends TestCase { private final KeyValueSkipListSet kvsls = @@ -36,7 +41,7 @@ public class TestKeyValueSkipListSet extends TestCase { protected void setUp() throws Exception { super.setUp(); - this.kvsls.clear(); + assert(this.kvsls.size() == 0); } public void testAdd() throws Exception { @@ -122,7 +127,7 @@ public class TestKeyValueSkipListSet extends TestCase { assertEquals(total, count); } - public void testHeadTail() throws Exception { + public void testRowTail() throws Exception { byte [] bytes = Bytes.toBytes(getName()); byte [] value1 = Bytes.toBytes("1"); byte [] value2 = 
Bytes.toBytes("2"); @@ -133,19 +138,91 @@ public class TestKeyValueSkipListSet extends TestCase { if (i == 1) splitter = kv; this.kvsls.add(kv); } - SortedSet tail = this.kvsls.tailSet(splitter); + + ArrayList tail = Lists.newArrayList( + kvsls.tailIterator(splitter)); assertEquals(2, tail.size()); - SortedSet head = this.kvsls.headSet(splitter); - assertEquals(1, head.size()); // Now ensure that we get back right answer even when we do tail or head. // Now overwrite with a new value. for (int i = 0; i < total; i++) { this.kvsls.add(new KeyValue(bytes, bytes, Bytes.toBytes("" + i), value2)); } - tail = this.kvsls.tailSet(splitter); - assertTrue(Bytes.equals(tail.first().getValue(), value2)); - head = this.kvsls.headSet(splitter); - assertTrue(Bytes.equals(head.first().getValue(), value2)); + tail = Lists.newArrayList(kvsls.tailIterator(splitter)); + assertTrue(Bytes.equals(tail.get(0).getValue(), value2)); + } + + public void testTailIterators() throws Exception { + KeyValue kvs[] = new KeyValue[] { + new KeyValue( + Bytes.toBytes("row0"), Bytes.toBytes("fam"), Bytes.toBytes("qual0")), + new KeyValue( + Bytes.toBytes("row0"), Bytes.toBytes("fam"), Bytes.toBytes("qual1")), + new KeyValue( + Bytes.toBytes("row1"), Bytes.toBytes("fam"), Bytes.toBytes("qual0")), + new KeyValue( + Bytes.toBytes("row1"), Bytes.toBytes("fam"), Bytes.toBytes("qual1")), + new KeyValue( + Bytes.toBytes("row2"), Bytes.toBytes("fam"), Bytes.toBytes("qual1")) }; + kvsls.add(kvs[0]); + kvsls.add(kvs[1]); + kvsls.add(kvs[2]); + kvsls.add(kvs[3]); + + // tail iterator on the known keys + checkIterator(kvsls.tailIterator(kvs[0]), kvs[0], kvs[1], kvs[2], kvs[3]); + checkIterator(kvsls.tailIterator(kvs[1]), kvs[1], kvs[2], kvs[3]); + checkIterator(kvsls.tailIterator(kvs[2]), kvs[2], kvs[3]); + checkIterator(kvsls.tailIterator(kvs[3]), kvs[3]); + + // Right after one of the keys + checkIterator(kvsls.tailIterator(kvs[0].createLastOnRowCol()), + kvs[1], kvs[2], kvs[3]); + + // First on first row + 
checkIterator(kvsls.tailIterator( + KeyValue.createFirstOnRow(kvs[0].getRow())), + kvs[0], kvs[1], kvs[2], kvs[3]); + + // Last on last row + checkIterator(kvsls.tailIterator( + KeyValue.createLastOnRow(kvs[3].getRow()))); + + // After last row + checkIterator(kvsls.tailIterator(kvs[4])); + + ////////////////////////////// + // row iterators + /////////////////////////////// + + // tail iterator on the known keys + checkIterator(kvsls.rowTailIterator(kvs[0]), kvs[0], kvs[1]); + checkIterator(kvsls.rowTailIterator(kvs[1]), kvs[1]); + checkIterator(kvsls.rowTailIterator(kvs[2]), kvs[2], kvs[3]); + checkIterator(kvsls.rowTailIterator(kvs[3]), kvs[3]); + + // Right after one of the keys + checkIterator(kvsls.rowTailIterator(kvs[0].createLastOnRowCol()), + kvs[1]); + + // First on first row + checkIterator(kvsls.rowTailIterator( + KeyValue.createFirstOnRow(kvs[0].getRow())), + kvs[0], kvs[1]); + + // Last on last row + checkIterator(kvsls.rowTailIterator( + KeyValue.createLastOnRow(kvs[3].getRow()))); + + // After last row + checkIterator(kvsls.rowTailIterator(kvs[4])); + + } + + private void checkIterator(Iterator iter, + KeyValue ... 
expected) { + ArrayList resultList = Lists.newArrayList(iter); + KeyValue[] result = resultList.toArray(new KeyValue[0]); + Assert.assertArrayEquals(expected, result); } @org.junit.Rule diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index b1214f6..74ba8bf 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -70,10 +70,16 @@ public class TestMemStore extends TestCase { byte [] bytes = Bytes.toBytes(getName()); KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes); this.memstore.add(kv); + assertEquals(1, this.memstore.kvset.size()); + KeyValue found = this.memstore.kvset.first(); + assertTrue(Bytes.toString(found.getValue()), + Bytes.equals(kv.getValue(), found.getValue())); + + byte [] other = Bytes.toBytes("somethingelse"); KeyValue samekey = new KeyValue(bytes, bytes, bytes, other); this.memstore.add(samekey); - KeyValue found = this.memstore.kvset.first(); + found = this.memstore.kvset.first(); assertEquals(1, this.memstore.kvset.size()); assertTrue(Bytes.toString(found.getValue()), Bytes.equals(samekey.getValue(), found.getValue())); @@ -378,7 +384,7 @@ public class TestMemStore extends TestCase { private static class ReadOwnWritesTester extends Thread { - static final int NUM_TRIES = 1000; + static final int NUM_TRIES = 200000; final byte[] row;