Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1371869) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -782,7 +782,7 @@ Two points must be known when working on this code: 1) It's not possible to use the 'kvTail' and 'snapshot' variables, as they are modified during a flush. - 2) The ideal implementation for performances would use the sub skip list + 2) The ideal implementation for performance would use the sub skip list implicitly pointed by the iterators 'kvsetIt' and 'snapshotIt'. Unfortunately the Java API does not offer a method to get it. So we're using the skip list that we kept when we created @@ -792,8 +792,29 @@ could see values that were not here when we read the list the first time. We expect that the new values will be skipped by the test on readpoint performed in the next() function. + 3) The above is problematic when the iterator(s) have already skipped over a lot of + KVs with a higher memstoreTS. So we first try to advance the iterators by a bounded + number of KVs (N below) before they are rewound as described in 2). + (See HBASE-6561.) */ + if (peek() != null) { + final int N = 1000; // TODO: Make this configurable? Is 1000 a good default?
+ int i = 0; + while(peek() != null && comparator.compare(peek(), key) < 0 && i++ < N) { + next(); + } + if (peek() == null) { + // no KV found + return false; + } else if (i < N) { + // we broke out of the loop early, + // so the current key is what we are looking for + return true; + } + LOG.debug("reseeking after "+N+" iterations"); + } + kvTail = kvTail.tailSet(key); snapshotTail = snapshotTail.tailSet(key); Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1371869) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -706,9 +706,8 @@ if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner( - set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(memstoreScanner), + ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } if (getHRegion().getCoprocessorHost() != null) { InternalScanner cpScanner = @@ -733,7 +732,7 @@ List kvs = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(kvs); + hasMore = scanner.next(kvs, this.compactionKVMax); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us