Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java	(revision 1372603)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java	(working copy)
@@ -110,7 +110,7 @@
         .getScannersForStoreFiles(filesToCompact, false, false, true);
 
     // Get some configs
-    int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+    int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
     Compression.Algorithm compression = store.getFamily().getCompression();
     // Avoid overriding compression setting for major compactions if the user
     // has not specified it separately
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1372603)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -738,9 +738,9 @@
     if (scanner == null) {
       Scan scan = new Scan();
       scan.setMaxVersions(scanInfo.getMaxVersions());
-      scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner(
-          set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
-          HConstants.OLDEST_TIMESTAMP);
+      scanner = new StoreScanner(this, scanInfo, scan,
+          Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
+          this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
     }
     if (getHRegion().getCoprocessorHost() != null) {
       InternalScanner cpScanner =
@@ -752,6 +752,7 @@
       scanner = cpScanner;
     }
     try {
+      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
       // TODO: We can fail in the below block before we complete adding this
       // flush to list of store files. Add cleanup of anything put on filesystem
       // if we fail.
@@ -765,7 +766,7 @@
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       boolean hasMore;
       do {
-        hasMore = scanner.next(kvs);
+        hasMore = scanner.next(kvs, compactionKVMax);
         if (!kvs.isEmpty()) {
           for (KeyValue kv : kvs) {
             // If we know that this KV is going to be included always, then let us
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(revision 1372603)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(working copy)
@@ -674,6 +674,10 @@
     private KeyValue kvsetNextRow = null;
     private KeyValue snapshotNextRow = null;
 
+    // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
+    private KeyValue kvsetItRow = null;
+    private KeyValue snapshotItRow = null;
+
     // iterator based scanning.
     private Iterator<KeyValue> kvsetIt;
     private Iterator<KeyValue> snapshotIt;
@@ -717,17 +721,28 @@
       snapshotAtCreation = snapshot;
     }
 
-    protected KeyValue getNext(Iterator<KeyValue> it) {
+    private KeyValue getNext(Iterator<KeyValue> it) {
       long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
 
-      while (it.hasNext()) {
-        KeyValue v = it.next();
-        if (v.getMemstoreTS() <= readPoint) {
-          return v;
+      KeyValue v = null;
+      try {
+        while (it.hasNext()) {
+          v = it.next();
+          if (v.getMemstoreTS() <= readPoint) {
+            return v;
+          }
         }
+
+        return null;
+      } finally {
+        if (v != null) {
+          if (it == snapshotIt) {
+            snapshotItRow = v;
+          } else {
+            kvsetItRow = v;
+          }
+        }
       }
-
-      return null;
     }
 
     /**
@@ -748,6 +763,8 @@
       // if tailSet can't find anything, SortedSet is empty (not null).
       kvTail = kvsetAtCreation.tailSet(key);
       snapshotTail = snapshotAtCreation.tailSet(key);
+      kvsetItRow = null;
+      snapshotItRow = null;
 
       return seekInSubLists(key);
     }
@@ -784,20 +801,15 @@
       Two points must be known when working on this code:
       1) It's not possible to use the 'kvTail' and 'snapshot'
        variables, as they are modified during a flush.
-      2) The ideal implementation for performances would use the sub skip list
+      2) The ideal implementation for performance would use the sub skip list
       implicitly pointed by the iterators 'kvsetIt' and
       'snapshotIt'. Unfortunately the Java API does not offer a method to
-       get it. So we're using the skip list that we kept when we created
-       the iterators. As these iterators could have been moved forward after
-       their creation, we're doing a kind of rewind here. It has a small
-       performance impact (we're using a wider list than necessary), and we
-       could see values that were not here when we read the list the first
-       time. We expect that the new values will be skipped by the test on
-       readpoint performed in the next() function.
+       get it. So we remember the last keys we iterated to and restore
+       the reseeked set to at least that point.
      */
 
-      kvTail = kvTail.tailSet(key);
-      snapshotTail = snapshotTail.tailSet(key);
+      kvTail = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow));
+      snapshotTail = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow));
 
       return seekInSubLists(key);
     }
@@ -838,7 +850,7 @@
      * This uses comparator.compare() to compare the KeyValue using the memstore
      * comparator.
      */
-    protected KeyValue getLowest(KeyValue first, KeyValue second) {
+    private KeyValue getLowest(KeyValue first, KeyValue second) {
       if (first == null && second == null) {
         return null;
       }
@@ -849,6 +861,22 @@
       return (first != null ? first : second);
     }
 
+    /*
+     * Returns the higher of the two key values, or null if they are both null.
+     * This uses comparator.compare() to compare the KeyValue using the memstore
+     * comparator.
+     */
+    private KeyValue getHighest(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare > 0 ? first : second);
+      }
+      return (first != null ? first : second);
+    }
+
     public synchronized void close() {
       this.kvsetNextRow = null;
       this.snapshotNextRow = null;
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1372603)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -202,6 +202,9 @@
   /** Parameter name for how often a region should perform a major compaction */
   public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction";
 
+  /** Parameter name for the maximum batch of KVs to be used in flushes and compactions */
+  public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
+
   /** Parameter name for HBase instance root directory */
   public static final String HBASE_DIR = "hbase.rootdir";
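Illustrative note (not part of the patch): the MemStore.java hunks above change reseek() so that, instead of rewinding to the tail set kept at scanner creation, the scanner remembers the last KeyValue each iterator returned (kvsetItRow / snapshotItRow) and re-tails the skip list from the higher of the seek key and that remembered key via getHighest(). The sketch below is a minimal, self-contained model of that idea only; the class ReseekSketch, its String keys, and the main() driver are hypothetical stand-ins for MemStoreScanner, KeyValue, and the kvset/snapshot skip lists, not HBase code.

import java.util.Iterator;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

// Sketch of the reseek approach: remember the highest element already returned by the
// iterator, and on reseek re-tail the backing skip list from max(seek key, last returned)
// instead of rewinding all the way back to the seek key.
public class ReseekSketch {
  private final NavigableSet<String> setAtCreation;
  private Iterator<String> it;
  private String lastReturned;   // plays the role of kvsetItRow / snapshotItRow

  ReseekSketch(NavigableSet<String> backing) {
    this.setAtCreation = backing;
    this.it = backing.iterator();
  }

  // Analogue of getHighest(): pick the larger of two keys, tolerating nulls.
  private static String getHighest(String a, String b) {
    if (a == null) return b;
    if (b == null) return a;
    return a.compareTo(b) >= 0 ? a : b;
  }

  String next() {
    if (!it.hasNext()) return null;
    lastReturned = it.next();      // record iterator progress, like the finally block in getNext()
    return lastReturned;
  }

  void reseek(String key) {
    // Re-tail from at least the point already reached, so the reseek does not walk
    // back over entries inserted behind the iterator after it was created.
    it = setAtCreation.tailSet(getHighest(key, lastReturned), true).iterator();
  }

  public static void main(String[] args) {
    ConcurrentSkipListSet<String> set =
        new ConcurrentSkipListSet<String>(java.util.Arrays.asList("a", "c", "e", "g"));
    ReseekSketch s = new ReseekSketch(set);
    System.out.println(s.next());   // a
    System.out.println(s.next());   // c
    set.add("b");                   // concurrent insert behind the iterator
    s.reseek("a");                  // asks for "a", but resumes from max("a", "c") = "c"
    System.out.println(s.next());   // c, not the newly added b
  }
}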