.../hbase/regionserver/CompactionPipeline.java | 2 + .../hbase/regionserver/MemStoreCompactor.java | 63 +++------------------- .../hadoop/hbase/regionserver/MemStoreScanner.java | 35 ++++++++++++ 3 files changed, 45 insertions(+), 55 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index e33ceae..504cbe4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -87,11 +87,13 @@ public class CompactionPipeline { */ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { if(versionedList.getVersion() != version) { + LOG.info("Swapping failed as the version already changed"); return false; } LinkedList suffix; synchronized (pipeline){ if(versionedList.getVersion() != version) { + LOG.info("Swapping failed as the version already changed"); return false; } suffix = versionedList.getStoreSegments(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 88e067e..9834332 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -23,10 +23,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.MemStoreScanner.MemStoreIterator; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -43,11 +43,6 @@ class MemStoreCompactor { private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private CompactingMemStore compactingMemStore; private MemStoreScanner scanner; // scanner for pipeline only - // scanner on top of MemStoreScanner that uses ScanQueryMatcher - private StoreScanner compactingScanner; - - // smallest read point for any ongoing MemStore scan - private long smallestReadPoint; // a static version of the segment list from the pipeline private VersionedSegmentsList versionedList; @@ -76,12 +71,9 @@ class MemStoreCompactor { scanners.add(segment.getSegmentScanner(Long.MAX_VALUE)); } scanner = - new MemStoreScanner(compactingMemStore, scanners, Long.MAX_VALUE, + new MemStoreScanner(compactingMemStore, scanners, compactingMemStore.getSmallestReadPoint(), MemStoreScanner.Type.COMPACT_FORWARD); - smallestReadPoint = compactingMemStore.getSmallestReadPoint(); - compactingScanner = createScanner(compactingMemStore.getStore()); - LOG.info("Starting the MemStore in-memory compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); @@ -107,8 +99,6 @@ class MemStoreCompactor { isInterrupted.set(false); scanner.close(); scanner = null; - compactingScanner.close(); - compactingScanner = null; versionedList = null; } @@ -148,50 +138,13 @@ class MemStoreCompactor { } /** - * Creates the scanner for compacting the pipeline. - * - * @return the scanner - */ - private StoreScanner createScanner(Store store) throws IOException { - - Scan scan = new Scan(); - scan.setMaxVersions(); //Get all available versions - - StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), - ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); - - return internalScanner; - } - - /** - * Updates the given single Segment using the internal store scanner, - * who in turn uses ScanQueryMatcher + * Updates the given single Segment using the memstore scanner */ private void compactSegments(Segment result) throws IOException { - - List kvs = new ArrayList(); - // get the limit to the size of the groups to be returned by compactingScanner - int compactionKVMax = compactingMemStore.getConfiguration().getInt( - HConstants.COMPACTION_KV_MAX, - HConstants.COMPACTION_KV_MAX_DEFAULT); - - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - - boolean hasMore; - do { - hasMore = compactingScanner.next(kvs, scannerContext); - if (!kvs.isEmpty()) { - for (Cell c : kvs) { - // The scanner is doing all the elimination logic - // now we just copy it to the new segment - Cell newKV = result.maybeCloneWithAllocator(c); - result.internalAdd(newKV); - - } - kvs.clear(); - } - } while (hasMore && (!isInterrupted.get())); + MemStoreIterator iterator = new MemStoreIterator(scanner); + while (iterator.hasNext() && (!isInterrupted.get())) { + Cell newKV = result.maybeCloneWithAllocator(iterator.next()); + result.internalAdd(newKV); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index dfcec25..4d05af1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.Cell; @@ -345,4 +346,38 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { throw new IllegalStateException("Traversing forward with backward scan"); } } + /** + * A simple iterator that iterates over the Memstore scanner. Used in cases + * where we need to just iterate over the cells rather than doing a Storescanner based + * scan which uses the ScanQueryMatcher + */ + // TODO : Handle backward scanner + public static class MemStoreIterator implements Iterator { + + private MemStoreScanner memStoreScanner; + + public MemStoreIterator(MemStoreScanner memStoreScanner) { + this.memStoreScanner = memStoreScanner; + } + + @Override + public boolean hasNext() { + return this.memStoreScanner.peek() != null; + } + + @Override + public Cell next() { + try { + return this.memStoreScanner.next(); + } catch (IOException e) { + // throw error out? ideally no error should be thrown here + return null; + } + } + + @Override + public void remove() { + // No operation + } + } }