.../hbase/regionserver/AbstractMemStore.java | 12 +----------- .../hbase/regionserver/CompactingMemStore.java | 16 +++++++++++----- .../hadoop/hbase/regionserver/DefaultMemStore.java | 4 ++-- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 16 ++++++++-------- .../hadoop/hbase/regionserver/SegmentScanner.java | 10 +--------- .../hbase/regionserver/TestDefaultMemStore.java | 22 +++++++++++++--------- 7 files changed, 37 insertions(+), 45 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 0f27e0e..d58a76c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; @@ -196,9 +195,7 @@ public abstract class AbstractMemStore implements MemStore { * @return a list containing a single memstore scanner. */ @Override - public List getScanners(long readPt) throws IOException { - return Collections. singletonList(new MemStoreScanner(this, readPt)); - } + public abstract List getScanners(long readPt) throws IOException; @Override public long getSnapshotSize() { @@ -441,13 +438,6 @@ public abstract class AbstractMemStore implements MemStore { protected abstract void checkActiveSize(); /** - * Returns a list of Store segment scanners, one per each store segment - * @param readPt the version number required to initialize the scanners - * @return a list of Store segment scanners, one per each store segment - */ - protected abstract List getListOfScanners(long readPt) throws IOException; - - /** * Returns an ordered list of segments from most recent to oldest in memstore * @return an ordered list of segments from most recent to oldest in memstore */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index ec5684d..5f05e4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; @@ -226,16 +227,21 @@ public class CompactingMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - protected List getListOfScanners(long readPt) throws IOException { + public List getScanners(long readPt) throws IOException { List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); - LinkedList list = new LinkedList(); - list.add(getActive().getSegmentScanner(readPt, order+1)); + // The list of elements in pipeline + the active element + the snapshot segment + // TODO : This will change when the snapshot is made of more than one element + List list = new ArrayList(pipelineList.size() + 2); + int pos = 0; + list.add(pos, getActive().getSegmentScanner(readPt, order+1)); + pos++; for (Segment item : pipelineList) { - list.add(item.getSegmentScanner(readPt, order)); + list.add(pos, item.getSegmentScanner(readPt, order)); order--; + pos++; } - list.add(getSnapshot().getSegmentScanner(readPt, order)); + list.add(pos, getSnapshot().getSegmentScanner(readPt, order)); return list; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index cdc910e..17ab688 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -113,8 +113,8 @@ public class DefaultMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - protected List getListOfScanners(long readPt) throws IOException { - List list = new ArrayList(2); + public List getScanners(long readPt) throws IOException { + List list = new ArrayList(2); list.add(0, getActive().getSegmentScanner(readPt, 1)); list.add(1, getSnapshot().getSegmentScanner(readPt, 0)); return list; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 5b2876d..691ebb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -67,7 +67,7 @@ class MemStoreCompactor { public boolean startCompaction() throws IOException { if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty - List scanners = new ArrayList(); + List scanners = new ArrayList(); // get the list of segments from the pipeline versionedList = compactingMemStore.getCompactibleSegments(); // the list is marked with specific version diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 01a7ff3..2729afc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -57,7 +57,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { private long readPoint; // remember the initial version of the scanners list - List scanners; + List scanners; // pointer back to the relevant MemStore // is needed for shouldSeek() method private AbstractMemStore backwardReferenceToMemStore; @@ -73,7 +73,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { * @param type The scan type COMPACT_FORWARD should be used for compaction */ public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException { - this(ms, ms.getListOfScanners(readPoint), readPoint, type); + this(ms, ms.getScanners(readPoint), readPoint, type); } /* Constructor used only when the scan usage is unknown @@ -82,7 +82,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { this(ms, readPt, Type.UNDEFINED); } - public MemStoreScanner(AbstractMemStore ms, List scanners, long readPoint, + public MemStoreScanner(AbstractMemStore ms, List scanners, long readPoint, Type type) throws IOException { super(); this.readPoint = readPoint; @@ -262,8 +262,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { return true; } - for (SegmentScanner sc : scanners) { - if (sc.shouldSeek(scan, oldestUnexpiredTS)) { + for (KeyValueScanner sc : scanners) { + if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) { return true; } } @@ -275,7 +275,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { public String toString() { StringBuffer buf = new StringBuffer(); int i = 1; - for (SegmentScanner scanner : scanners) { + for (KeyValueScanner scanner : scanners) { buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); i++; } @@ -289,7 +289,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ private boolean restartBackwardHeap(Cell cell) throws IOException { boolean res = false; - for (SegmentScanner scan : scanners) { + for (KeyValueScanner scan : scanners) { res |= scan.seekToPreviousRow(cell); } this.backwardHeap = @@ -315,7 +315,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { forwardHeap = null; // before building the heap seek for the relevant key on the scanners, // for the heap to be built from the scanners correctly - for (SegmentScanner scan : scanners) { + for (KeyValueScanner scan : scanners) { if (toLast) { res |= scan.seekToLastRow(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index a04c1da..1191f30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -243,7 +243,7 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return true; + return getSegment().shouldSeek(scan,oldestUnexpiredTS); } /** * This scanner is working solely on the in-memory MemStore therefore this @@ -305,14 +305,6 @@ public class SegmentScanner implements KeyValueScanner { // do nothing } - /** - * Returns whether the given scan should seek in this segment - * @return whether the given scan should seek in this segment - */ - public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { - return getSegment().shouldSeek(scan,oldestUnexpiredTS); - } - protected Segment getSegment(){ return segment; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 1614462..04d0fce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -232,15 +232,19 @@ public class TestDefaultMemStore { verifyScanAcrossSnapshot2(kv1, kv2); } - protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) - throws IOException { - List memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); - assertEquals(1, memstorescanners.size()); - final KeyValueScanner scanner = memstorescanners.get(0); - scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); - assertEquals(kv1, scanner.next()); - assertEquals(kv2, scanner.next()); - assertNull(scanner.next()); + protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { + KeyValueScanner scanner = null; + try { + scanner = new MemStoreScanner(this.memstore, mvcc.getReadPoint()); + scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + assertEquals(kv1, scanner.next()); + assertEquals(kv2, scanner.next()); + assertNull(scanner.next()); + } catch (Exception e) { + if (scanner != null) { + scanner.close(); + } + } } protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)