Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1372603) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -738,9 +738,9 @@ if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner( - set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + scanner = new StoreScanner(this, scanInfo, scan, + Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT, + this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } if (getHRegion().getCoprocessorHost() != null) { InternalScanner cpScanner = @@ -752,6 +752,7 @@ scanner = cpScanner; } try { + int compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10); // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem // if we fail. 
@@ -765,7 +766,7 @@ List kvs = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(kvs); + hasMore = scanner.next(kvs, compactionKVMax); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1372603) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -674,6 +674,10 @@ private KeyValue kvsetNextRow = null; private KeyValue snapshotNextRow = null; + // last iterated KVs for kvset and snapshot (to restore iterator state after reseek) + private KeyValue kvsetItRow = null; + private KeyValue snapshotItRow = null; + // iterator based scanning. private Iterator kvsetIt; private Iterator snapshotIt; @@ -720,14 +724,25 @@ protected KeyValue getNext(Iterator it) { long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); - while (it.hasNext()) { - KeyValue v = it.next(); - if (v.getMemstoreTS() <= readPoint) { - return v; + KeyValue v = null; + try { + while (it.hasNext()) { + v = it.next(); + if (v.getMemstoreTS() <= readPoint) { + return v; + } } + + return null; + } finally { + if (v != null) { + if (it == snapshotIt) { + snapshotItRow = v; + } else { + kvsetItRow = v; + } + } } - - return null; } /** @@ -748,6 +763,8 @@ // if tailSet can't find anything, SortedSet is empty (not null). kvTail = kvsetAtCreation.tailSet(key); snapshotTail = snapshotAtCreation.tailSet(key); + kvsetItRow = null; + snapshotItRow = null; return seekInSubLists(key); } @@ -784,20 +801,15 @@ Two points must be known when working on this code: 1) It's not possible to use the 'kvTail' and 'snapshot' variables, as they are modified during a flush. 
- 2) The ideal implementation for performances would use the sub skip list + 2) The ideal implementation for performance would use the sub skip list implicitly pointed by the iterators 'kvsetIt' and 'snapshotIt'. Unfortunately the Java API does not offer a method to - get it. So we're using the skip list that we kept when we created - the iterators. As these iterators could have been moved forward after - their creation, we're doing a kind of rewind here. It has a small - performance impact (we're using a wider list than necessary), and we - could see values that were not here when we read the list the first - time. We expect that the new values will be skipped by the test on - readpoint performed in the next() function. + get it. So we remember the last keys we iterated to and restore + the reseeked set to at least that point. */ - kvTail = kvTail.tailSet(key); - snapshotTail = snapshotTail.tailSet(key); + kvTail = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)); + snapshotTail = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)); return seekInSubLists(key); } @@ -849,6 +861,22 @@ return (first != null ? first : second); } + /* + * Returns the higher of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + protected KeyValue getHighest(KeyValue first, KeyValue second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare > 0 ? first : second); + } + return (first != null ? first : second); + } + public synchronized void close() { this.kvsetNextRow = null; this.snapshotNextRow = null;