Index: src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 801872) +++ src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -42,7 +42,7 @@ private final Log LOG = LogFactory.getLog(this.getClass()); private MemStore memstore; private static final int ROW_COUNT = 10; - private static final int QUALIFIER_COUNT = 10; + private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic"); private static final String CONTENTSTR = "contentstr"; @@ -82,6 +82,8 @@ while (s.next(result)) { LOG.info(result); count++; + // Row count is same as column count. + assertEquals(rowCount, result.size()); result.clear(); } } finally { @@ -98,6 +100,8 @@ // Assert the stuff is coming out in right order. assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0); count++; + // Row count is same as column count. + assertEquals(rowCount, result.size()); if (count == 2) { this.memstore.snapshot(); LOG.info("Snapshotted"); @@ -108,6 +112,33 @@ s.close(); } assertEquals(rowCount, count); + // Assert that new values are seen in kvset as we scan. + s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, + this.memstore.comparator, null, memstorescanners); + count = 0; + int snapshotIndex = 5; + try { + while (s.next(result)) { + LOG.info(result); + // Assert the stuff is coming out in right order. + assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0); + // Row count is same as column count. + assertEquals("count=" + count + ", result=" + result, + rowCount, result.size()); + count++; + if (count == snapshotIndex) { + this.memstore.snapshot(); + this.memstore.clearSnapshot(this.memstore.getSnapshot()); + // Added more rows into kvset. + addRows(this.memstore); + LOG.info("Snapshotted, cleared it and then added values"); + } + result.clear(); + } + } finally { + s.close(); + } + assertEquals(rowCount, count); } /** Index: src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 801872) +++ src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -145,7 +145,6 @@ * @return true if there are more rows, false if scanner is done */ public synchronized boolean next(List outResult) throws IOException { - List results = new ArrayList(); KeyValue peeked = this.heap.peek(); if (peeked == null) { close(); @@ -153,6 +152,7 @@ } matcher.setRow(peeked.getRow()); KeyValue kv; + List results = new ArrayList(); while((kv = this.heap.peek()) != null) { QueryMatcher.MatchCode qcode = matcher.match(kv); switch(qcode) { @@ -162,7 +162,6 @@ continue; case DONE: - // copy jazz outResult.addAll(results); return true; @@ -198,7 +197,6 @@ if (!results.isEmpty()) { // copy jazz outResult.addAll(results); - return true; } Index: src/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 801872) +++ src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -527,9 +527,8 @@ KeyValueScanner [] getScanners() { this.lock.readLock().lock(); try { - KeyValueScanner [] scanners = new KeyValueScanner[2]; - scanners[0] = new MemStoreScanner(this.kvset); - scanners[1] = new MemStoreScanner(this.snapshot); + KeyValueScanner [] scanners = new KeyValueScanner[1]; + scanners[0] = new MemStoreScanner(); return scanners; } finally { this.lock.readLock().unlock(); @@ -603,18 +602,17 @@ /* * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore. - * This behaves as if it were a real scanner but does not maintain position - * in the passed memstore tree. + * It lets the caller scan the contents of a memstore -- both current + * map and snapshot. + * This behaves as if it were a real scanner but does not maintain position. */ protected class MemStoreScanner implements KeyValueScanner { - private final NavigableSet kvs; - private KeyValue current = null; private List result = new ArrayList(); private int idx = 0; + private KeyValue firstOnNextRow = null; - MemStoreScanner(final NavigableSet s) { - this.kvs = s; + MemStoreScanner() { + super(); } public boolean seek(KeyValue key) { @@ -623,7 +621,7 @@ close(); return false; } - this.current = key; + this.firstOnNextRow = key; return cacheNextRow(); } catch(Exception e) { close(); @@ -652,35 +650,66 @@ } /** - * @return True if we successfully cached a NavigableSet aligned on - * next row. + * @return True if successfully cached a next row. */ boolean cacheNextRow() { - SortedSet keys; + this.result.clear(); + this.idx = 0; + // Prevent snapshot being cleared while caching a row. + lock.readLock().lock(); try { - keys = this.kvs.tailSet(this.current); - } catch (Exception e) { - close(); - return false; + // Look at each set, kvset and snapshot. + // Both look for matching entries for this.current row returning what + // they + // have as next row after this.current (or null if nothing in set or if + // nothing follows. + KeyValue kvsetNextRow = cacheNextRow(kvset); + KeyValue snapshotNextRow = cacheNextRow(snapshot); + if (kvsetNextRow == null && snapshotNextRow == null) { + // Nothing more in memstore but we might have gotten current row + // results + // Indicate at end of store by setting next row to null. + this.firstOnNextRow = null; + return !this.result.isEmpty(); + } else if (kvsetNextRow != null && snapshotNextRow != null) { + // Set current at the lowest of the two values. + int compare = comparator.compare(kvsetNextRow, snapshotNextRow); + this.firstOnNextRow = (compare <= 0) ? kvsetNextRow : snapshotNextRow; + } else { + this.firstOnNextRow = kvsetNextRow != null ? kvsetNextRow + : snapshotNextRow; + } + return true; + } finally { + lock.readLock().unlock(); } - if (keys == null || keys.isEmpty()) { - close(); - return false; - } - this.current = null; - byte [] row = keys.first().getRow(); - for (KeyValue kv: keys) { - if (comparator.compareRows(kv, row) != 0) { - this.current = kv; + } + + /* + * See if set has entries for the this.current row. If so, + * add them to this.result. + * @param set Set to examine + * @return Next row in passed set or null if nothing in this + * passed set + */ + private KeyValue cacheNextRow(final NavigableSet set) { + if (this.firstOnNextRow == null || set.isEmpty()) return null; + SortedSet tail = set.tailSet(this.firstOnNextRow); + if (tail == null || tail.isEmpty()) return null; + KeyValue first = tail.first(); + KeyValue nextRow = null; + for (KeyValue kv: tail) { + if (comparator.compareRows(first, kv) != 0) { + nextRow = kv; break; } - result.add(kv); + this.result.add(kv); } - return true; + return nextRow; } public void close() { - current = null; + firstOnNextRow = null; idx = 0; if (!result.isEmpty()) { result.clear();