.../hbase/regionserver/AbstractMemStore.java | 9 ++ .../hbase/regionserver/CompactingMemStore.java | 24 +++- .../hadoop/hbase/regionserver/DefaultMemStore.java | 17 ++- .../apache/hadoop/hbase/regionserver/HStore.java | 46 ++------ .../apache/hadoop/hbase/regionserver/MemStore.java | 6 +- .../regionserver/MemStoreCompactorIterator.java | 2 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 129 +++++++-------------- .../hbase/regionserver/MemstoreScannerType.java | 25 ++++ .../hbase/regionserver/ReversedStoreScanner.java | 9 ++ .../apache/hadoop/hbase/regionserver/Store.java | 32 +---- .../hadoop/hbase/regionserver/StoreScanner.java | 15 ++- .../hadoop/hbase/client/TestFromClientSide.java | 1 - .../hbase/regionserver/TestCompactingMemStore.java | 10 +- .../TestCompactingToCellArrayMapMemStore.java | 41 +++---- .../hbase/regionserver/TestDefaultMemStore.java | 57 +++++---- .../hadoop/hbase/regionserver/TestHRegion.java | 2 - .../hbase/regionserver/TestMemStoreChunkPool.java | 4 +- .../hbase/regionserver/TestReversibleScanners.java | 15 ++- 18 files changed, 216 insertions(+), 228 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 5e9f632..111ac75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -458,4 +458,13 @@ public abstract class AbstractMemStore implements MemStore { */ protected abstract List getSegments() throws IOException; + /** + * @param readPt readPt that the scanners can see. + * @param memstoreScannerType the memstore scanner type + * @return scanner over the memstore. This might include scanner over the snapshot when one is + * present. + */ + @VisibleForTesting + protected abstract List getScanners(long readPt, + MemstoreScannerType memstoreScannerType) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 177f222..2ebb0ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -233,7 +233,8 @@ public class CompactingMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - public List getScanners(long readPt) throws IOException { + public List getScanners(Cell seekKey, long readPt, + MemstoreScannerType scannerType) throws IOException { List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); // The list of elements in pipeline + the active element + the snapshot segment @@ -245,7 +246,26 @@ public class CompactingMemStore extends AbstractMemStore { order--; } list.add(this.snapshot.getScanner(readPt, order)); - return Collections. singletonList(new MemStoreScanner(getComparator(), list)); + return Collections. singletonList( + new MemStoreScanner(seekKey, getComparator(), list, scannerType)); + } + + @VisibleForTesting + public List getScanners(long readPt, + MemstoreScannerType scannerType) throws IOException { + List pipelineList = pipeline.getSegments(); + long order = pipelineList.size(); + // The list of elements in pipeline + the active element + the snapshot segment + // TODO : This will change when the snapshot is made of more than one element + List list = new ArrayList(pipelineList.size() + 2); + list.add(this.active.getScanner(readPt, order + 1)); + for (Segment item : pipelineList) { + list.add(item.getScanner(readPt, order)); + order--; + } + list.add(this.snapshot.getScanner(readPt, order)); + return Collections. singletonList( + new MemStoreScanner(getComparator(), list, scannerType)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index b448b04..8ebfb39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import com.google.common.annotations.VisibleForTesting; + /** * The MemStore holds in-memory modifications to the Store. Modifications * are {@link Cell}s. When asked to flush, current memstore is moved @@ -113,12 +115,23 @@ public class DefaultMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - public List getScanners(long readPt) throws IOException { + public List getScanners(Cell seekKey, long readPt, + MemstoreScannerType scannerType) throws IOException { + List list = new ArrayList(2); + list.add(this.active.getScanner(readPt, 1)); + list.add(this.snapshot.getScanner(readPt, 0)); + return Collections. singletonList( + new MemStoreScanner(seekKey, getComparator(), list, scannerType)); + } + + @VisibleForTesting + public List getScanners(long readPt, + MemstoreScannerType scannerType) throws IOException { List list = new ArrayList(2); list.add(this.active.getScanner(readPt, 1)); list.add(this.snapshot.getScanner(readPt, 0)); return Collections. singletonList( - new MemStoreScanner(getComparator(), list)); + new MemStoreScanner(getComparator(), list, scannerType)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c4bd849..502c963 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1076,50 +1076,20 @@ public class HStore implements Store { } } - /** - * Get all scanners with no filtering based on TTL (that happens further down - * the line). - * @return all scanners for this store - */ - @Override - public List getScanners(boolean cacheBlocks, boolean isGet, - boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, - byte[] stopRow, long readPt) throws IOException { - Collection storeFilesToScan; - List memStoreScanners; - this.lock.readLock().lock(); - try { - storeFilesToScan = - this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); - memStoreScanners = this.memstore.getScanners(readPt); - } finally { - this.lock.readLock().unlock(); - } - - // First the store file scanners - - // TODO this used to get the store files in descending order, - // but now we get them in ascending order, which I think is - // actually more correct, since memstore get put at the end. - List sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, - cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore()); - List scanners = - new ArrayList(sfScanners.size()+1); - scanners.addAll(sfScanners); - // Then the memstore scanners - scanners.addAll(memStoreScanners); - return scanners; - } - @Override - public List getScanners(List files, boolean cacheBlocks, + public List getScanners(Collection files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, - byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException { + byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner, + MemstoreScannerType memstoreScannerType) throws IOException { List memStoreScanners = null; if (includeMemstoreScanner) { this.lock.readLock().lock(); try { - memStoreScanners = this.memstore.getScanners(readPt); + if(files == null) { + files = + this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); + } + memStoreScanners = this.memstore.getScanners(matcher.getStartKey(), readPt, memstoreScannerType); } finally { this.lock.readLock().unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index d52b863..f27f7b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -120,10 +120,14 @@ public interface MemStore extends HeapSize { long upsert(Iterable cells, long readpoint); /** + * @param seekKey the key to be seeked, could be null + * @param readPt readPt that the scanners can see. + * @param memstoreScannerType the memstore scanner type * @return scanner over the memstore. This might include scanner over the snapshot when one is * present. */ - List getScanners(long readPt) throws IOException; + List getScanners(Cell seekKey, long readPt, + MemstoreScannerType memstoreScannerType) throws IOException; /** * @return Total memory occupied by this MemStore. This includes active segment size and heap size diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java index 2eafb42..39fee7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -65,7 +65,7 @@ public class MemStoreCompactorIterator implements Iterator { scanners.add(segment.getScanner(store.getSmallestReadPoint())); } - scanner = new MemStoreScanner(comparator, scanners, MemStoreScanner.Type.COMPACT_FORWARD); + scanner = new MemStoreScanner(comparator, scanners, MemstoreScannerType.COMPACT_FORWARD); // reinitialize the compacting scanner for each instance of iterator compactingScanner = createScanner(store, scanner); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 74d061c..6a3008f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -24,11 +24,15 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.htrace.Trace; +import com.google.common.annotations.VisibleForTesting; + /** * This is the scanner for any MemStore implementation, derived from MemStore. * The MemStoreScanner combines KeyValueScanner from different Segments and @@ -37,15 +41,6 @@ import org.apache.htrace.Trace; */ @InterfaceAudience.Private public class MemStoreScanner extends NonLazyKeyValueScanner { - /** - * Types of cell MemStoreScanner - */ - static public enum Type { - UNDEFINED, - COMPACT_FORWARD, - USER_SCAN_FORWARD, - USER_SCAN_BACKWARD - } // heap of scanners used for traversing forward private KeyValueHeap forwardHeap; @@ -53,8 +48,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { private ReversedKeyValueHeap backwardHeap; // The type of the scan is defined by constructor - // or according to the first usage - private Type type = Type.UNDEFINED; + private final MemstoreScannerType type; // remember the initial version of the scanners list List scanners; @@ -62,25 +56,31 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { private final CellComparator comparator; /** - * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! - * After constructor only one heap is going to be initialized for entire lifespan - * of the MemStoreScanner. A specific scanner can only be one directional! - * + * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan + * @param seekKey the key to be seeked in case of backward scanner * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - * @param type The scan type COMPACT_FORWARD should be used for compaction + * @param scanners List of scanners, from which the heap will be built + * @param type The scan type COMPACT_FORWARD should be used for compaction */ - public MemStoreScanner(CellComparator comparator, List scanners, Type type) - throws IOException { + public MemStoreScanner(Cell seekKey, CellComparator comparator, List scanners, + MemstoreScannerType type) throws IOException { super(); this.type = type; switch (type) { - case UNDEFINED: case USER_SCAN_FORWARD: case COMPACT_FORWARD: this.forwardHeap = new KeyValueHeap(scanners, comparator); break; case USER_SCAN_BACKWARD: + if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(seekKey); + } + } this.backwardHeap = new ReversedKeyValueHeap(scanners, comparator); break; default: @@ -92,12 +92,11 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); } } - - /* Constructor used only when the scan usage is unknown - and need to be defined according to the first move */ - public MemStoreScanner(CellComparator comparator, List scanners) - throws IOException { - this(comparator, scanners, Type.UNDEFINED); + + @VisibleForTesting + public MemStoreScanner(CellComparator comparator, List scanners, + MemstoreScannerType type) throws IOException { + this(KeyValue.LOWESTKEY, comparator, scanners, type); } /** @@ -106,7 +105,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ @Override public synchronized Cell peek() { - if (type == Type.USER_SCAN_BACKWARD) { + if (type == MemstoreScannerType.USER_SCAN_BACKWARD) { return backwardHeap.peek(); } return forwardHeap.peek(); @@ -117,7 +116,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ @Override public synchronized Cell next() throws IOException { - KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap; + KeyValueHeap heap = (MemstoreScannerType.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap; // loop over till the next suitable value // take next value from the heap @@ -191,18 +190,13 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { @Override public synchronized void close() { - if (forwardHeap != null) { - assert ((type == Type.USER_SCAN_FORWARD) || - (type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED)); + assert ((type == MemstoreScannerType.USER_SCAN_FORWARD) || + (type == MemstoreScannerType.COMPACT_FORWARD)); forwardHeap.close(); forwardHeap = null; - if (backwardHeap != null) { - backwardHeap.close(); - backwardHeap = null; - } } else if (backwardHeap != null) { - assert (type == Type.USER_SCAN_BACKWARD); + assert (type == MemstoreScannerType.USER_SCAN_BACKWARD); backwardHeap.close(); backwardHeap = null; } @@ -216,7 +210,6 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ @Override public synchronized boolean backwardSeek(Cell cell) throws IOException { - initBackwardHeapIfNeeded(cell, false); return backwardHeap.backwardSeek(cell); } @@ -228,7 +221,6 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ @Override public synchronized boolean seekToPreviousRow(Cell cell) throws IOException { - initBackwardHeapIfNeeded(cell, false); if (backwardHeap.peek() == null) { restartBackwardHeap(cell); } @@ -237,12 +229,14 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { @Override public synchronized boolean seekToLastRow() throws IOException { - // TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't - // implement seekToLastRow() method :( - // however seekToLastRow() was implemented in internal MemStoreScanner - // so I wonder whether we need to come with our own workaround, or to update - // ReversedKeyValueHeap - return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true); + boolean result = false; + // This won't be called on a forward heap and for backward heap + // also there is no direct call for seekToLaskRow that happens on the + // ReversedKeyvalueHeap. For now this is UNUSED + for (KeyValueScanner scanner : scanners) { + result |= scanner.seekToLastRow(); + } + return result; } /** @@ -252,7 +246,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { @Override public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - if (type == Type.COMPACT_FORWARD) { + if (type == MemstoreScannerType.COMPACT_FORWARD) { return true; } @@ -292,52 +286,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { } /** - * Checks whether the type of the scan suits the assumption of moving backward - */ - private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException { - boolean res = false; - if (toLast && (type != Type.UNDEFINED)) { - throw new IllegalStateException( - "Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString()); - } - if (type == Type.UNDEFINED) { - // In case we started from peek, release the forward heap - // and build backward. Set the correct type. Thus this turn - // can happen only once - if ((backwardHeap == null) && (forwardHeap != null)) { - forwardHeap.close(); - forwardHeap = null; - // before building the heap seek for the relevant key on the scanners, - // for the heap to be built from the scanners correctly - for (KeyValueScanner scan : scanners) { - if (toLast) { - res |= scan.seekToLastRow(); - } else { - res |= scan.backwardSeek(cell); - } - } - this.backwardHeap = - new ReversedKeyValueHeap(scanners, comparator); - type = Type.USER_SCAN_BACKWARD; - } - } - - if (type == Type.USER_SCAN_FORWARD) { - throw new IllegalStateException("Traversing backward with forward scan"); - } - return res; - } - - /** * Checks whether the type of the scan suits the assumption of moving forward */ private void assertForward() throws IllegalStateException { - if (type == Type.UNDEFINED) { - type = Type.USER_SCAN_FORWARD; - } - - if (type == Type.USER_SCAN_BACKWARD) { - throw new IllegalStateException("Traversing forward with backward scan"); - } + assert type == MemstoreScannerType.USER_SCAN_FORWARD : "Traversing forward with backward scan"; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreScannerType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreScannerType.java new file mode 100644 index 0000000..2aebf8a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreScannerType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public enum MemstoreScannerType { + COMPACT_FORWARD, USER_SCAN_FORWARD, USER_SCAN_BACKWARD +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 41c13f5..afaf785 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.NavigableSet; @@ -134,4 +135,12 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { checkReseek(flushed); return this.heap.backwardSeek(key); } + + @Override + protected List getScanners(Collection storefiles, boolean usePread, + boolean isCompaction, boolean includeMemstore) throws IOException { + return store.getScanners(storefiles, cacheBlocks, get, usePread, isCompaction, matcher, + scan.getStartRow(), scan.getStopRow(), this.readPt, includeMemstore, + MemstoreScannerType.USER_SCAN_BACKWARD); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 853a4cf..6dbe7a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -84,30 +84,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf throws IOException; /** - * Get all scanners with no filtering based on TTL (that happens further down - * the line). - * @param cacheBlocks - * @param isGet - * @param usePread - * @param isCompaction - * @param matcher - * @param startRow - * @param stopRow - * @param readPt - * @return all scanners for this store - */ - List getScanners( - boolean cacheBlocks, - boolean isGet, - boolean usePread, - boolean isCompaction, - ScanQueryMatcher matcher, - byte[] startRow, - byte[] stopRow, - long readPt - ) throws IOException; - - /** * Create scanners on the given files and if needed on the memstore with no filtering based on TTL * (that happens further down the line). * @param files the list of files on which the scanners has to be created @@ -120,11 +96,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @param stopRow the stop row * @param readPt the read point of the current scan * @param includeMemstoreScanner true if memstore has to be included + * @param memstoreScannerType the type of memstore scanner * @return scanners on the given files and on the memstore if specified */ - List getScanners(List files, boolean cacheBlocks, boolean isGet, - boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, - byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException; + List getScanners(Collection files, boolean cacheBlocks, boolean isGet, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, + byte[] stopRow, long readPt, boolean includeMemstoreScanner, + MemstoreScannerType memstoreScannerType) throws IOException; ScanInfo getScanInfo(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e008a40..a4b3e63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.NavigableSet; @@ -132,7 +133,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Indicates whether there was flush during the course of the scan protected volatile boolean flushed = false; // generally we get one file from a flush - protected List flushedStoreFiles = new ArrayList(1); + protected Collection flushedStoreFiles = new ArrayList(1); // The current list of scanners protected List currentScanners = new ArrayList(); // flush update lock @@ -351,10 +352,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected List getScannersNoCompaction() throws IOException { final boolean isCompaction = false; boolean usePread = get || scanUsePread; - return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); + return selectScannersFrom(getScanners(null, usePread, isCompaction, true)); } + protected List getScanners(Collection storefiles, boolean usePread, + boolean isCompaction, boolean includeMemstore) throws IOException { + return store.getScanners(storefiles, cacheBlocks, get, usePread, isCompaction, matcher, + scan.getStartRow(), scan.getStopRow(), this.readPt, includeMemstore, + MemstoreScannerType.USER_SCAN_FORWARD); + } /** * Seek the specified scanners with the given key * @param scanners @@ -814,8 +820,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners = null; try { flushLock.lock(); - scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); + scanners = selectScannersFrom(getScanners(flushedStoreFiles, usePread, isCompaction, true)); // Clear the current set of flushed store files so that they don't get added again flushedStoreFiles.clear(); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index f465625..50d8cf8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 211a6d8..0837714 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -193,7 +193,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { ScanType scanType = ScanType.USER_SCAN; InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners(0)); + memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD)); List results = new ArrayList(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; @@ -471,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { assertEquals(2, memstore.getActive().getCellsCount()); // opening scanner before clear the snapshot - List scanners = memstore.getScanners(0); + List scanners = memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data memstore.clearSnapshot(snapshot.getId()); @@ -494,7 +494,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf6, val)); memstore.add(new KeyValue(row, fam, qf7, val)); // opening scanners - scanners = memstore.getScanners(0); + scanners = memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD); // close scanners before clear the snapshot for (KeyValueScanner scanner : scanners) { scanner.close(); @@ -533,7 +533,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // pipeline bucket 2 ((CompactingMemStore)memstore).flushInMemory(); // opening scanner before force flushing - List scanners = memstore.getScanners(0); + List scanners = memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data ((CompactingMemStore)memstore).enableCompaction(); @@ -568,7 +568,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf2, 4, val)); memstore.add(new KeyValue(row, fam, qf3, 4, val)); // opening scanners - scanners = memstore.getScanners(0); + scanners = memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD); // close scanners before clear the snapshot for (KeyValueScanner scanner : scanners) { scanner.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index fefe2c1..9903697 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -18,29 +18,25 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.util.ArrayList; -import java.util.List; - /** @@ -280,8 +276,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) { Threads.sleep(10); } - List scanners = memstore.getScanners(Long.MAX_VALUE); - MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners); + List scanners = + memstore.getScanners(Long.MAX_VALUE, MemstoreScannerType.USER_SCAN_FORWARD); + MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners, + MemstoreScannerType.USER_SCAN_FORWARD); int count = 0; while (scanner.next() != null) { count++; @@ -345,17 +343,4 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);// } - - private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { - long t = 1234; - - @Override public long currentTime() { - return t; - } - - public void setCurrentTimeMillis(long t) { - this.t = t; - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index fdc6c92..99ad4ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -144,7 +144,8 @@ public class TestDefaultMemStore { @Test public void testScanAcrossSnapshot() throws IOException { int rowCount = addRows(this.memstore); - List memstorescanners = this.memstore.getScanners(0); + List memstorescanners = + this.memstore.getScanners(null, 0, MemstoreScannerType.USER_SCAN_FORWARD); Scan scan = new Scan(); List result = new ArrayList(); Configuration conf = HBaseConfiguration.create(); @@ -170,7 +171,8 @@ public class TestDefaultMemStore { scanner.close(); } - memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); + memstorescanners = + this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; @@ -195,7 +197,8 @@ public class TestDefaultMemStore { for (KeyValueScanner scanner : memstorescanners) { scanner.close(); } - memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); + memstorescanners = + this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD); // Assert that new values are seen in kvset as we scan. long ts = System.currentTimeMillis(); s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); @@ -261,7 +264,8 @@ public class TestDefaultMemStore { protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { - List memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); + List memstorescanners = + this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD); assertEquals(1, memstorescanners.size()); final KeyValueScanner scanner = memstorescanners.get(0); scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); @@ -303,12 +307,14 @@ public class TestDefaultMemStore { kv1.setSequenceId(w.getWriteNumber()); memstore.add(kv1); - KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + KeyValueScanner s = this.memstore + .getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD).get(0); assertScannerResults(s, new KeyValue[]{}); mvcc.completeAndWait(w); - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv1}); w = mvcc.begin(); @@ -316,12 +322,14 @@ public class TestDefaultMemStore { kv2.setSequenceId(w.getWriteNumber()); memstore.add(kv2); - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv1}); mvcc.completeAndWait(w); - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv1, kv2}); } @@ -354,7 +362,8 @@ public class TestDefaultMemStore { mvcc.completeAndWait(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + KeyValueScanner s = this.memstore + .getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD).get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 @@ -368,7 +377,8 @@ public class TestDefaultMemStore { memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 @@ -377,7 +387,8 @@ public class TestDefaultMemStore { // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. // See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } @@ -407,7 +418,8 @@ public class TestDefaultMemStore { mvcc.completeAndWait(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + KeyValueScanner s = this.memstore + .getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD).get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns @@ -418,14 +430,16 @@ public class TestDefaultMemStore { memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE DELETE mvcc.completeAndWait(w); // NOW WE SHOULD SEE DELETE - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + s = this.memstore.getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD) + .get(0); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); } @@ -439,14 +453,14 @@ public class TestDefaultMemStore { final byte[] q1 = Bytes.toBytes("q1"); final MultiVersionConcurrencyControl mvcc; - final MemStore memstore; + final AbstractMemStore memstore; final AtomicLong startSeqNum; AtomicReference caughtException; public ReadOwnWritesTester(int id, - MemStore memstore, + AbstractMemStore memstore, MultiVersionConcurrencyControl mvcc, AtomicReference caughtException, AtomicLong startSeqNum) @@ -481,7 +495,8 @@ public class TestDefaultMemStore { mvcc.completeAndWait(w); // Assert that we can read back - KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + KeyValueScanner s = this.memstore + .getScanners(mvcc.getReadPoint(), MemstoreScannerType.USER_SCAN_FORWARD).get(0); s.seek(kv); Cell ret = s.next(); @@ -584,7 +599,7 @@ public class TestDefaultMemStore { ScanType scanType = ScanType.USER_SCAN; try (InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners(0))) { + memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD))) { List results = new ArrayList(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; @@ -1093,9 +1108,9 @@ public class TestDefaultMemStore { } } - static void doScan(MemStore ms, int iteration) throws IOException { + static void doScan(DefaultMemStore ms, int iteration) throws IOException { long nanos = System.nanoTime(); - KeyValueScanner s = ms.getScanners(0).get(0); + KeyValueScanner s = ms.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD).get(0); s.seek(KeyValueUtil.createFirstOnRow(new byte[]{})); System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000); @@ -1108,7 +1123,7 @@ public class TestDefaultMemStore { } public static void main(String [] args) throws IOException { - MemStore ms = new DefaultMemStore(); + DefaultMemStore ms = new DefaultMemStore(); long n1 = System.nanoTime(); addRows(25000, ms); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2042f52..80d220d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; @@ -103,7 +102,6 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index cfbb098..f525b7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -164,7 +164,7 @@ public class TestMemStoreChunkPool { assertEquals(2, memstore.getActive().getCellsCount()); // opening scanner before clear the snapshot - List scanners = memstore.getScanners(0); + List scanners = memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data memstore.clearSnapshot(snapshot.getId()); @@ -186,7 +186,7 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf6, val)); memstore.add(new KeyValue(row, fam, qf7, val)); // opening scanners - scanners = memstore.getScanners(0); + scanners = memstore.getScanners(0, MemstoreScannerType.USER_SCAN_FORWARD); // close scanners before clear the snapshot for (KeyValueScanner scanner : scanners) { scanner.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 0ec859c..c3a6512 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -124,11 +124,14 @@ public class TestReversibleScanners { public void testReversibleMemstoreScanner() throws IOException { MemStore memstore = new DefaultMemStore(); writeMemstore(memstore); - List scanners = memstore.getScanners(Long.MAX_VALUE); + List scanners = + memstore.getScanners(KeyValue.LOWESTKEY, Long.MAX_VALUE, + MemstoreScannerType.USER_SCAN_BACKWARD); seekTestOfReversibleKeyValueScanner(scanners.get(0)); for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { LOG.info("Setting read point to " + readPoint); - scanners = memstore.getScanners(readPoint); + scanners = memstore.getScanners(KeyValue.LOWESTKEY, readPoint, + MemstoreScannerType.USER_SCAN_BACKWARD); seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); } @@ -495,7 +498,12 @@ public class TestReversibleScanners { List fileScanners = StoreFileScanner .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint); - List memScanners = memstore.getScanners(readPoint); + Cell startKey = KeyValue.LOWESTKEY; + if (doSeek) { + startKey = KeyValueUtil.createFirstOnRow(startRow); + } + List memScanners = + memstore.getScanners(startKey, readPoint, MemstoreScannerType.USER_SCAN_BACKWARD); List scanners = new ArrayList( fileScanners.size() + 1); scanners.addAll(fileScanners); @@ -507,7 +515,6 @@ public class TestReversibleScanners { scanner.seekToLastRow(); } } else { - KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow); for (KeyValueScanner scanner : scanners) { scanner.backwardSeek(startKey); }