Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (revision 1373058) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (working copy) @@ -110,7 +110,7 @@ .getScannersForStoreFiles(filesToCompact, false, false, true); // Get some configs - int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10); + int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10); Compression.Algorithm compression = store.getFamily().getCompression(); // Avoid overriding compression setting for major compactions if the user // has not specified it separately Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1373058) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -738,9 +738,9 @@ if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner( - set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + scanner = new StoreScanner(this, scanInfo, scan, + Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT, + this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } if (getHRegion().getCoprocessorHost() != null) { InternalScanner cpScanner = @@ -752,6 +752,7 @@ scanner = cpScanner; } try { + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10); // TODO: We can fail in the below block before we complete adding this // flush to list of store files. 
Add cleanup of anything put on filesystem // if we fail. @@ -765,7 +766,7 @@ List kvs = new ArrayList(); boolean hasMore; do { - hasMore = scanner.next(kvs); + hasMore = scanner.next(kvs, compactionKVMax); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1373058) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -682,10 +682,6 @@ private KeyValueSkipListSet kvsetAtCreation; private KeyValueSkipListSet snapshotAtCreation; - // Sub lists on which we're iterating - private SortedSet kvTail; - private SortedSet snapshotTail; - // the pre-calculated KeyValue to be returned by peek() or next() private KeyValue theNext; @@ -717,7 +713,7 @@ snapshotAtCreation = snapshot; } - protected KeyValue getNext(Iterator it) { + private KeyValue getNext(Iterator it) { long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); while (it.hasNext()) { @@ -746,20 +742,9 @@ // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - kvTail = kvsetAtCreation.tailSet(key); - snapshotTail = snapshotAtCreation.tailSet(key); + kvsetIt = kvsetAtCreation.tailSet(key).iterator(); + snapshotIt = snapshotAtCreation.tailSet(key).iterator(); - return seekInSubLists(key); - } - - - /** - * (Re)initialize the iterators after a seek or a reseek. 
- */ - private synchronized boolean seekInSubLists(KeyValue key){ - kvsetIt = kvTail.iterator(); - snapshotIt = snapshotTail.iterator(); - kvsetNextRow = getNext(kvsetIt); snapshotNextRow = getNext(snapshotIt); @@ -784,22 +769,28 @@ Two points must be known when working on this code: 1) It's not possible to use the 'kvTail' and 'snapshot' variables, as they are modified during a flush. - 2) The ideal implementation for performances would use the sub skip list - implicitly pointed by the iterators 'kvsetIt' and - 'snapshotIt'. Unfortunately the Java API does not offer a method to - get it. So we're using the skip list that we kept when we created - the iterators. As these iterators could have been moved forward after - their creation, we're doing a kind of rewind here. It has a small - performance impact (we're using a wider list than necessary), and we - could see values that were not here when we read the list the first - time. We expect that the new values will be skipped by the test on - readpoint performed in the next() function. + 2) The ideal implementation for performance would use the sub skip list + implicitly pointed by the iterators 'kvsetIt' and 'snapshotIt'. + Unfortunately the Java API does not offer a method to get it. + So the reseeked iterators are restored to at least the KV that was previously iterated to. + If the iterators are exhausted the reseek request can simply be ignored. 
*/ - kvTail = kvTail.tailSet(key); - snapshotTail = snapshotTail.tailSet(key); + if (kvsetIt.hasNext()) { + kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetIt.next())).iterator(); + kvsetNextRow = getNext(kvsetIt); + } else { + kvsetNextRow = null; + } + if (snapshotIt.hasNext()) { + snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotIt.next())).iterator(); + snapshotNextRow = getNext(snapshotIt); + } else { + snapshotNextRow = null; + } - return seekInSubLists(key); + theNext = getLowest(kvsetNextRow, snapshotNextRow); + return (theNext != null); } @@ -833,22 +824,34 @@ return ret; } - /* - * Returns the lower of the two key values, or null if they are both null. - * This uses comparator.compare() to compare the KeyValue using the memstore - * comparator. - */ - protected KeyValue getLowest(KeyValue first, KeyValue second) { + private boolean isLower(KeyValue first, KeyValue second) { if (first == null && second == null) { - return null; + return true; } if (first != null && second != null) { int compare = comparator.compare(first, second); - return (compare <= 0 ? first : second); + return compare <= 0; } - return (first != null ? first : second); + return first != null; } + /* + * Returns the lower of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + private KeyValue getLowest(KeyValue first, KeyValue second) { + return isLower(first, second) ? first : second; + } + /* + * Returns the higher of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + private KeyValue getHighest(KeyValue first, KeyValue second) { + return isLower(first, second) ? 
second : first; + } + public synchronized void close() { this.kvsetNextRow = null; this.snapshotNextRow = null; Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1373058) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -202,6 +202,9 @@ /** Parameter name for how often a region should should perform a major compaction */ public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction"; + /** Parameter name for the maximum batch of KVs to be used in flushes and compactions */ + public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max"; + /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir";