.../hbase/regionserver/MemStoreCompactor.java | 29 +++++++++++++--------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 88e067e..9790ecd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -42,9 +42,6 @@ class MemStoreCompactor { private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private CompactingMemStore compactingMemStore; - private MemStoreScanner scanner; // scanner for pipeline only - // scanner on top of MemStoreScanner that uses ScanQueryMatcher - private StoreScanner compactingScanner; // smallest read point for any ongoing MemStore scan private long smallestReadPoint; @@ -75,17 +72,19 @@ class MemStoreCompactor { for (Segment segment : versionedList.getStoreSegments()) { scanners.add(segment.getSegmentScanner(Long.MAX_VALUE)); } - scanner = + //scanner for pipeline only + MemStoreScanner scanner = new MemStoreScanner(compactingMemStore, scanners, Long.MAX_VALUE, MemStoreScanner.Type.COMPACT_FORWARD); smallestReadPoint = compactingMemStore.getSmallestReadPoint(); - compactingScanner = createScanner(compactingMemStore.getStore()); + // scanner on top of MemStoreScanner that uses ScanQueryMatcher + StoreScanner compactingScanner = createScanner(compactingMemStore.getStore(), scanner); LOG.info("Starting the MemStore in-memory compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); - doCompaction(); + doCompaction(scanner, compactingScanner); return true; } @@ -102,8 +101,10 @@ class MemStoreCompactor { /** * Close the scanners and clear the pointers in order to allow good * garbage collection + * @param compactingScanner + * @param scanner */ - private void releaseResources() { + private void releaseResources(MemStoreScanner scanner, StoreScanner compactingScanner) { isInterrupted.set(false); scanner.close(); scanner = null; @@ -116,8 +117,10 @@ class MemStoreCompactor { * The worker thread performs the compaction asynchronously. * The solo (per compactor) thread only reads the compaction pipeline. * There is at most one thread per memstore instance. + * @param compactingScanner + * @param scanner */ - private void doCompaction() { + private void doCompaction(MemStoreScanner scanner, StoreScanner compactingScanner) { ImmutableSegment result = SegmentFactory.instance() // create the scanner .createImmutableSegment( @@ -127,7 +130,7 @@ class MemStoreCompactor { // the compaction processing try { // Phase I: create the compacted MutableCellSetSegment - compactSegments(result); + compactSegments(result, compactingScanner); // Phase II: swap the old compaction pipeline if (!isInterrupted.get()) { @@ -141,7 +144,7 @@ class MemStoreCompactor { Thread.currentThread().interrupt(); return; } finally { - releaseResources(); + releaseResources(scanner, compactingScanner); compactingMemStore.setInMemoryFlushInProgress(false); } @@ -149,10 +152,11 @@ class MemStoreCompactor { /** * Creates the scanner for compacting the pipeline. + * @param scanner * * @return the scanner */ - private StoreScanner createScanner(Store store) throws IOException { + private StoreScanner createScanner(Store store, MemStoreScanner scanner) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(); //Get all available versions @@ -167,8 +171,9 @@ class MemStoreCompactor { /** * Updates the given single Segment using the internal store scanner, * who in turn uses ScanQueryMatcher + * @param compactingScanner */ - private void compactSegments(Segment result) throws IOException { + private void compactSegments(Segment result, StoreScanner compactingScanner) throws IOException { List kvs = new ArrayList(); // get the limit to the size of the groups to be returned by compactingScanner