From dda67b399003c6a130f3f7c3ed8d0659a771d924 Mon Sep 17 00:00:00 2001 From: eshcar Date: Thu, 16 Feb 2017 21:27:19 +0200 Subject: [PATCH] HBASE-17655: Removing MemStoreScanner and SnapshotScanner --- .../example/ZooKeeperScanPolicyObserver.java | 4 +- .../hbase/coprocessor/BaseRegionObserver.java | 6 +- .../hadoop/hbase/coprocessor/RegionObserver.java | 20 +- .../hadoop/hbase/mob/DefaultMobStoreFlusher.java | 2 +- .../hbase/regionserver/CompactingMemStore.java | 2 +- .../regionserver/CompositeImmutableSegment.java | 32 +- .../hadoop/hbase/regionserver/DefaultMemStore.java | 7 +- .../hbase/regionserver/DefaultStoreFlusher.java | 2 +- .../hbase/regionserver/ImmutableSegment.java | 12 +- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../MemStoreCompactorSegmentsIterator.java | 19 +- .../MemStoreMergerSegmentsIterator.java | 53 +++- .../hadoop/hbase/regionserver/MemStoreScanner.java | 334 --------------------- .../regionserver/MemStoreSegmentsIterator.java | 17 +- .../hbase/regionserver/MemStoreSnapshot.java | 13 +- .../hbase/regionserver/RegionCoprocessorHost.java | 7 +- .../apache/hadoop/hbase/regionserver/Segment.java | 6 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 8 +- .../hadoop/hbase/regionserver/SnapshotScanner.java | 105 ------- .../hadoop/hbase/regionserver/StoreFlusher.java | 8 +- .../hbase/regionserver/StripeStoreFlusher.java | 2 +- .../hbase/coprocessor/SimpleRegionObserver.java | 2 +- .../TestRegionObserverScannerOpenHook.java | 6 +- .../hbase/regionserver/NoOpScanPolicyObserver.java | 4 +- .../hbase/regionserver/TestCompactingMemStore.java | 30 +- .../TestCompactingToCellArrayMapMemStore.java | 32 +- .../hbase/regionserver/TestDefaultMemStore.java | 20 +- .../hbase/regionserver/TestMemStoreChunkPool.java | 14 +- .../hbase/regionserver/TestReversibleScanners.java | 66 ++-- .../hbase/util/TestCoprocessorScanPolicy.java | 6 +- 30 files changed, 228 insertions(+), 613 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 48d7a55..2f4d6bf 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -193,7 +193,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { @Override public InternalScanner preFlushScannerOpen(final ObserverContext c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List scanners, InternalScanner s) throws IOException { ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); if (scanInfo == null) { // take default action @@ -201,7 +201,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { } Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index f5cc4de..8f6d743 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -97,16 +97,16 @@ public class BaseRegionObserver implements RegionObserver { @Override public InternalScanner preFlushScannerOpen(final ObserverContext c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) + final Store store, final List scanners, final InternalScanner s) throws IOException { return s; } @Override public InternalScanner preFlushScannerOpen(final ObserverContext c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s, + final Store store, final List scanners, final InternalScanner s, final long readPoint) throws IOException { - return preFlushScannerOpen(c, store, memstoreScanner, s); + return preFlushScannerOpen(c, store, scanners, s); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 390b723..d0498d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -109,17 +109,16 @@ public interface RegionObserver extends Coprocessor { * effect in this hook. * @param c the environment provided by the region server * @param store the store being flushed - * @param memstoreScanner the scanner for the memstore that is flushed - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @return the scanner to use during the flush. {@code null} if the default implementation + * @param scanners the scanners for the memstore that is flushed + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @return the scanner to use during the flush. {@code null} if the default implementation * is to be used. * @throws IOException if an error occurred on the coprocessor - * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, - * InternalScanner, long)} + * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} */ @Deprecated InternalScanner preFlushScannerOpen(final ObserverContext c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) + final Store store, final List scanners, final InternalScanner s) throws IOException; /** @@ -131,7 +129,7 @@ public interface RegionObserver extends Coprocessor { * effect in this hook. * @param c the environment provided by the region server * @param store the store being flushed - * @param memstoreScanner the scanner for the memstore that is flushed + * @param scanners the scanners for the memstore that is flushed * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @param readPoint the readpoint to create scanner * @return the scanner to use during the flush.
{@code null} if the default implementation @@ -139,7 +137,7 @@ public interface RegionObserver extends Coprocessor { * @throws IOException if an error occurred on the coprocessor */ InternalScanner preFlushScannerOpen(final ObserverContext c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s, + final Store store, final List scanners, final InternalScanner s, final long readPoint) throws IOException; /** @@ -1086,8 +1084,7 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, - * long)} and {@link #preCompactScannerOpen(ObserverContext, + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. *

@@ -1117,8 +1114,7 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, - * long)} and {@link #preCompactScannerOpen(ObserverContext, + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 77f167e..ecab5d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -101,7 +101,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e7f4a67..a3f30a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -308,7 +308,7 @@ public class CompactingMemStore extends AbstractMemStore { list.add(item.getScanner(readPt, order)); order--; } - return Collections. singletonList(new MemStoreScanner(getComparator(), list)); + return list; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 30d17fb..8b3c060 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -22,11 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -73,15 +71,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { } /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. 
- * @return a special scanner for the MemStoreSnapshot object - */ - public KeyValueScanner getSnapshotScanner() { - return getScanner(Long.MAX_VALUE, Long.MAX_VALUE); - } - - /** * @return whether the segment has any cells */ public boolean isEmpty() { @@ -140,8 +129,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { * @return a scanner for the given read point */ public KeyValueScanner getScanner(long readPoint) { - // Long.MAX_VALUE is DEFAULT_SCANNER_ORDER - return getScanner(readPoint,Long.MAX_VALUE); + throw new IllegalStateException("Not supported by CompositeImmutableSegment"); } /** @@ -149,19 +137,17 @@ * @return a scanner for the given read point */ public KeyValueScanner getScanner(long readPoint, long order) { - KeyValueScanner resultScanner; - List list = new ArrayList(segments.size()); + throw new IllegalStateException("Not supported by CompositeImmutableSegment"); + } + + @Override + public List getScanners(long readPoint, long order) { + List list = new ArrayList<>(segments.size()); for (ImmutableSegment s : segments) { list.add(s.getScanner(readPoint, order)); + order--; } - - try { - resultScanner = new MemStoreScanner(getComparator(), list); - } catch (IOException ie) { - throw new IllegalStateException(ie); - } - - return resultScanner; + return list; } public boolean isTagsPresent() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 63af570..b68faeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -126,11 +126,10 @@ public class DefaultMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { - List list = new ArrayList(2); + List list = new ArrayList<>(); list.add(this.active.getScanner(readPt, 1)); - list.add(this.snapshot.getScanner(readPt, 0)); - return Collections. singletonList( - new MemStoreScanner(getComparator(), list)); + list.addAll(this.snapshot.getScanners(readPt, 0)); + return list; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 93837b7..66f310e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -52,7 +52,7 @@ public class DefaultStoreFlusher extends StoreFlusher { // Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index faa9b67..d103a58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -35,9 +35,7 @@ import java.util.List; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, - * and is not needed for a {@link MutableSegment}. Specifically, the method - * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the - * {@link MemStoreSnapshot} object. + * and is not needed for a {@link MutableSegment}. */ @InterfaceAudience.Private public class ImmutableSegment extends Segment { @@ -132,14 +130,6 @@ public class ImmutableSegment extends Segment { } ///////////////////// PUBLIC METHODS ///////////////////// - /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. - * @return a special scanner for the MemStoreSnapshot object - */ - public KeyValueScanner getSnapshotScanner() { - return new SnapshotScanner(this); - } @Override public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index c435098..dfa7d18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -252,7 +252,7 @@ public class MemStoreCompactor { iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + compactionKVMax); result = SegmentFactory.instance().createImmutableSegmentByMerge( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index f31c973..073da08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -50,11 +50,19 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator List segments, CellComparator comparator, int compactionKVMax, Store store ) throws IOException { - super(segments,comparator,compactionKVMax,store); - + super(compactionKVMax); + + List scanners = new ArrayList(); + // create the list of scanners to traverse over all the data + // no dirty reads here as these are immutable segments + int order = segments.size(); + for (ImmutableSegment segment : segments) { + scanners.add(segment.getScanner(Integer.MAX_VALUE, order)); + order--; + } // build 
the scanner based on Query Matcher // reinitialize the compacting scanner for each instance of iterator - compactingScanner = createScanner(store, scanner); + compactingScanner = createScanner(store, scanners); hasMore = compactingScanner.next(kvs, scannerContext); @@ -93,7 +101,6 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator public void close() { compactingScanner.close(); compactingScanner = null; - scanner = null; } @Override @@ -106,13 +113,13 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator * * @return the scanner */ - private StoreScanner createScanner(Store store, KeyValueScanner scanner) + private StoreScanner createScanner(Store store, List scanners) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(); //Get all available versions StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), + new StoreScanner(store, store.getScanInfo(), scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java index 625fc76..1e2d844 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -33,32 +34,66 @@ import java.util.List; @InterfaceAudience.Private public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator { + // heap of scanners, initialized in the constructor + private KeyValueHeap heap = null; + // remember the initial version of the scanners list + List scanners = new ArrayList(); + + private boolean closed = false; + // C-tor public MemStoreMergerSegmentsIterator(List segments, CellComparator comparator, - int compactionKVMax, Store store - ) throws IOException { - super(segments,comparator,compactionKVMax,store); + int compactionKVMax) throws IOException { + super(compactionKVMax); + // create the list of scanners to traverse over all the data + // no dirty reads here as these are immutable segments + int order = segments.size(); + for (ImmutableSegment segment : segments) { + scanners.add(segment.getScanner(Integer.MAX_VALUE, order)); + order--; + } + heap = new KeyValueHeap(scanners, comparator); } @Override public boolean hasNext() { - return (scanner.peek()!=null); + if (closed) { + return false; + } + if (this.heap != null) { + return (this.heap.peek() != null); + } + // Doing this way in case some test cases try to peek directly + return false; } @Override public Cell next() { - Cell result = null; try { // try to get next - result = scanner.next(); + if (!closed && heap != null) { + return heap.next(); + } } catch (IOException ie) { throw new IllegalStateException(ie); } - return result; + return null; } public void close() { - scanner.close(); - scanner = null; + if (closed) { + return; + } + // Ensuring that all the segment scanners are closed + if (heap != null) { + heap.close(); + // It is safe to do close as no new calls will be made to this scanner.
+ heap = null; + } else { + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + } + closed = true; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java deleted file mode 100644 index 2ccdf68..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.htrace.Trace; - -/** - * This is the scanner for any MemStore implementation, derived from MemStore. - * The MemStoreScanner combines KeyValueScanner from different Segments and - * uses the key-value heap and the reversed key-value heap for the aggregated key-values set. - * It is assumed that only traversing forward or backward is used (without zigzagging in between) - */ -@InterfaceAudience.Private -public class MemStoreScanner extends NonLazyKeyValueScanner { - - // heap of scanners, lazily initialized - private KeyValueHeap heap; - - // indicates if the scanner is created for inmemoryCompaction - private boolean inmemoryCompaction; - - // remember the initial version of the scanners list - List scanners; - - private final CellComparator comparator; - - private boolean closed; - - /** - * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan - * and the heap is lazily initialized - * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - * @param inmemoryCompaction true if used for inmemoryCompaction. - * In this case, creates a forward heap always. 
- */ - public MemStoreScanner(CellComparator comparator, List scanners, - boolean inmemoryCompaction) throws IOException { - super(); - this.comparator = comparator; - this.scanners = scanners; - if (Trace.isTracing() && Trace.currentSpan() != null) { - Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); - } - this.inmemoryCompaction = inmemoryCompaction; - if (inmemoryCompaction) { - // init the forward scanner in case of inmemoryCompaction - initForwardKVHeapIfNeeded(comparator, scanners); - } - } - - /** - * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan - * and the heap is lazily initialized - * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - */ - public MemStoreScanner(CellComparator comparator, List scanners) - throws IOException { - this(comparator, scanners, false); - } - - private void initForwardKVHeapIfNeeded(CellComparator comparator, List scanners) - throws IOException { - if (heap == null) { - // lazy init - // In a normal scan case, at the StoreScanner level before the KVHeap is - // created we do a seek or reseek. So that will happen - // on all the scanners that the StoreScanner is - // made of. So when we get any of those call to this scanner we init the - // heap here with normal forward KVHeap. - this.heap = new KeyValueHeap(scanners, comparator); - } - } - - private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator, - List scanners) throws IOException { - boolean res = false; - if (heap == null) { - // lazy init - // In a normal reverse scan case, at the ReversedStoreScanner level before the - // ReverseKeyValueheap is - // created we do a seekToLastRow or backwardSeek. So that will happen - // on all the scanners that the ReversedStoreSCanner is - // made of. So when we get any of those call to this scanner we init the - // heap here with ReversedKVHeap. - if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) { - for (KeyValueScanner scanner : scanners) { - res |= scanner.seekToLastRow(); - } - } else { - for (KeyValueScanner scanner : scanners) { - res |= scanner.backwardSeek(seekKey); - } - } - this.heap = new ReversedKeyValueHeap(scanners, comparator); - } - return res; - } - - /** - * Returns the cell from the top-most scanner without advancing the iterator. - * The backward traversal is assumed, only if specified explicitly - */ - @Override - public Cell peek() { - if (closed) { - return null; - } - if (this.heap != null) { - return this.heap.peek(); - } - // Doing this way in case some test cases tries to peek directly to avoid NPE - return null; - } - - /** - * Gets the next cell from the top-most scanner. Assumed forward scanning. - */ - @Override - public Cell next() throws IOException { - if (closed) { - return null; - } - if(this.heap != null) { - // loop over till the next suitable value - // take next value from the heap - for (Cell currentCell = heap.next(); - currentCell != null; - currentCell = heap.next()) { - // all the logic of presenting cells is inside the internal KeyValueScanners - // located inside the heap - return currentCell; - } - } - return null; - } - - /** - * Set the scanner at the seek key. Assumed forward scanning. - * Must be called only once: there is no thread safety between the scanner - * and the memStore. 
- * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean seek(Cell cell) throws IOException { - if (closed) { - return false; - } - initForwardKVHeapIfNeeded(comparator, scanners); - - if (cell == null) { - close(); - return false; - } - - return heap.seek(cell); - } - - /** - * Move forward on the sub-lists set previously by seek. Assumed forward scanning. - * - * @param cell seek value (should be non-null) - * @return true if there is at least one KV to read, false otherwise - */ - @Override - public boolean reseek(Cell cell) throws IOException { - /* - * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. - * This code is executed concurrently with flush and puts, without locks. - * Two points must be known when working on this code: - * 1) It's not possible to use the 'kvTail' and 'snapshot' - * variables, as they are modified during a flush. - * 2) The ideal implementation for performance would use the sub skip list - * implicitly pointed by the iterators 'kvsetIt' and - * 'snapshotIt'. Unfortunately the Java API does not offer a method to - * get it. So we remember the last keys we iterated to and restore - * the reseeked set to at least that point. - * - * TODO: The above comment copied from the original MemStoreScanner - */ - if (closed) { - return false; - } - initForwardKVHeapIfNeeded(comparator, scanners); - return heap.reseek(cell); - } - - /** - * MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all - * scanners. - * @see KeyValueScanner#getScannerOrder() - */ - @Override - public long getScannerOrder() { - return Long.MAX_VALUE; - } - - @Override - public void close() { - if (closed) { - return; - } - // Ensuring that all the segment scanners are closed - if (heap != null) { - heap.close(); - // It is safe to do close as no new calls will be made to this scanner. - heap = null; - } else { - for (KeyValueScanner scanner : scanners) { - scanner.close(); - } - } - closed = true; - } - - /** - * Set the scanner at the seek key. Assumed backward scanning. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean backwardSeek(Cell cell) throws IOException { - // The first time when this happens it sets the scanners to the seek key - // passed by the incoming scan's start row - if (closed) { - return false; - } - initReverseKVHeapIfNeeded(cell, comparator, scanners); - return heap.backwardSeek(cell); - } - - /** - * Assumed backward scanning. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean seekToPreviousRow(Cell cell) throws IOException { - if (closed) { - return false; - } - initReverseKVHeapIfNeeded(cell, comparator, scanners); - if (heap.peek() == null) { - restartBackwardHeap(cell); - } - return heap.seekToPreviousRow(cell); - } - - @Override - public boolean seekToLastRow() throws IOException { - if (closed) { - return false; - } - return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners); - } - - /** - * Check if this memstore may contain the required keys - * @return False if the key definitely does not exist in this Memstore - */ - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - // TODO : Check if this can be removed. 
- if (inmemoryCompaction) { - return true; - } - - for (KeyValueScanner sc : scanners) { - if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) { - return true; - } - } - return false; - } - - // debug method - @Override - public String toString() { - StringBuffer buf = new StringBuffer(); - int i = 1; - for (KeyValueScanner scanner : scanners) { - buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); - i++; - } - return buf.toString(); - } - /****************** Private methods ******************/ - /** - * Restructure the ended backward heap after rerunning a seekToPreviousRow() - * on each scanner - * @return false if given Cell does not exist in any scanner - */ - private boolean restartBackwardHeap(Cell cell) throws IOException { - boolean res = false; - for (KeyValueScanner scan : scanners) { - res |= scan.seekToPreviousRow(cell); - } - this.heap = - new ReversedKeyValueHeap(scanners, comparator); - return res; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java index e2f4ebb..dd6f28f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java @@ -35,29 +35,14 @@ import java.util.*; @InterfaceAudience.Private public abstract class MemStoreSegmentsIterator implements Iterator { - // scanner for full or partial pipeline (heap of segment scanners) - // we need to keep those scanners in order to close them at the end - protected KeyValueScanner scanner; - protected final ScannerContext scannerContext; // C-tor - public MemStoreSegmentsIterator(List segments, CellComparator comparator, - int compactionKVMax, Store store) throws IOException { + public MemStoreSegmentsIterator(int compactionKVMax) throws IOException { this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - // list of Scanners of segments in the pipeline, when compaction starts - List scanners = new ArrayList(); - - // create the list of scanners to traverse over all the data - // no dirty reads here as these are immutable segments - for (ImmutableSegment segment : segments) { - scanners.add(segment.getScanner(Integer.MAX_VALUE)); - } - - scanner = new MemStoreScanner(comparator, scanners, true); } public abstract void close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 61e7876..82fec2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.util.List; /** * Holds details of the snapshot taken on a MemStore. 
Details include the snapshot's identifier, * count of cells in it and total memory size occupied by all the cells, timestamp information of @@ -31,7 +32,7 @@ public class MemStoreSnapshot { private final long dataSize; private final long heapOverhead; private final TimeRangeTracker timeRangeTracker; - private final KeyValueScanner scanner; + private final List scanners; private final boolean tagsPresent; public MemStoreSnapshot(long id, ImmutableSegment snapshot) { @@ -40,7 +41,7 @@ public class MemStoreSnapshot { this.dataSize = snapshot.keySize(); this.heapOverhead = snapshot.heapOverhead(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getSnapshotScanner(); + this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE); this.tagsPresent = snapshot.isTagsPresent(); } @@ -66,21 +67,21 @@ public class MemStoreSnapshot { } public long getHeapOverhead() { - return this.heapOverhead; + return heapOverhead; } /** * @return {@link TimeRangeTracker} for all the Cells in the snapshot. */ public TimeRangeTracker getTimeRangeTracker() { - return this.timeRangeTracker; + return timeRangeTracker; } /** * @return {@link KeyValueScanner} for iterating over the snapshot */ - public KeyValueScanner getScanner() { - return this.scanner; + public List getScanners() { + return scanners; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index e4b47bc..5f990d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -634,17 +634,16 @@ public class RegionCoprocessorHost /** * See - * {@link RegionObserver#preFlushScannerOpen(ObserverContext, - * Store, KeyValueScanner, InternalScanner, long)} + * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} */ public InternalScanner preFlushScannerOpen(final Store store, - final KeyValueScanner memstoreScanner, final long readPoint) throws IOException { + final List scanners, final long readPoint) throws IOException { return execOperationWithResult(null, coprocessors.isEmpty() ? 
null : new RegionOperationWithResult() { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { - setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint)); + setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint)); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 8581517..2faf1dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -18,7 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.SortedSet; @@ -115,9 +115,7 @@ public abstract class Segment { } public List getScanners(long readPoint, long order) { - List scanners = new ArrayList(1); - scanners.add(getScanner(readPoint, order)); - return scanners; + return Collections.singletonList(new SegmentScanner(this, readPoint, order)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 5e2e36f..6f622d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.SortedSet; +import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -287,9 +288,7 @@ public class SegmentScanner implements KeyValueScanner { @Override public boolean requestSeek(Cell c, boolean forward, boolean useBloom) throws IOException { - - throw new IllegalStateException( - "requestSeek cannot be called on MutableCellSetSegmentScanner"); + return NonLazyKeyValueScanner.doRealSeek(this, c, forward); } /** @@ -309,8 +308,7 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public void enforceSeek() throws IOException { - throw new IllegalStateException( - "enforceSeek cannot be called on MutableCellSetSegmentScanner"); + throw new NotImplementedException("enforceSeek must not be called on a SegmentScanner"); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java deleted file mode 100644 index 6300e00..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; - -/** - * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells - * in the segment - */ -@InterfaceAudience.Private -public class SnapshotScanner extends SegmentScanner { - - public SnapshotScanner(Segment immutableSegment) { - // Snapshot scanner does not need readpoint. It should read all the cells in the - // segment - super(immutableSegment, Long.MAX_VALUE); - } - - @Override - public Cell peek() { // sanity check, the current should be always valid - if (closed) { - return null; - } - return current; - } - - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return true; - } - - @Override - public boolean backwardSeek(Cell key) throws IOException { - throw new NotImplementedException( - "backwardSeek must not be called on a " + "non-reversed scanner"); - } - - @Override - public boolean seekToPreviousRow(Cell key) throws IOException { - throw new NotImplementedException( - "seekToPreviousRow must not be called on a " + "non-reversed scanner"); - } - - @Override - public boolean seekToLastRow() throws IOException { - throw new NotImplementedException( - "seekToLastRow must not be called on a " + "non-reversed scanner"); - } - - @Override - protected Iterator getIterator(Cell cell) { - return segment.iterator(); - } - - @Override - protected void updateCurrent() { - if (iter.hasNext()) { - current = iter.next(); - } else { - current = null; - } - } - - @Override - public boolean seek(Cell seekCell) { - // restart iterator - iter = getIterator(seekCell); - return reseek(seekCell); - } - - @Override - public boolean reseek(Cell seekCell) { - while (iter.hasNext()) { - Cell next = iter.next(); - int ret = segment.getComparator().compare(next, seekCell); - if (ret >= 0) { - current = next; - return true; - } - } - return false; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index abfd3fc..7cc5821 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -74,22 +74,22 @@ abstract class StoreFlusher { /** * Creates the scanner for flushing snapshot. Also calls coprocessors. - * @param snapshotScanner + * @param snapshotScanners * @param smallestReadPoint * @return The scanner; null if coprocessor is canceling the flush. 
*/ - protected InternalScanner createScanner(KeyValueScanner snapshotScanner, + protected InternalScanner createScanner(List snapshotScanners, long smallestReadPoint) throws IOException { InternalScanner scanner = null; if (store.getCoprocessorHost() != null) { - scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner, + scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners, smallestReadPoint); } if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(store.getScanInfo().getMaxVersions()); scanner = new StoreScanner(store, store.getScanInfo(), scan, - Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES, + snapshotScanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); } assert scanner != null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 22c3ce7..ee157ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -62,7 +62,7 @@ public class StripeStoreFlusher extends StoreFlusher { if (cellsCount == 0) return result; // don't flush if there are no entries long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index d4511b9..1d4a9f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -187,7 +187,7 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public InternalScanner preFlushScannerOpen(final ObserverContext c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List scanners, InternalScanner s) throws IOException { ctPreFlushScannerOpen.incrementAndGet(); return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 784eca7..7ccfa77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.regionserver.HRegion; import 
org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Region; @@ -118,11 +116,11 @@ public class TestRegionObserverScannerOpenHook { public static class NoDataFromFlush extends BaseRegionObserver { @Override public InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List scanners, InternalScanner s) throws IOException { Scan scan = new Scan(); scan.setFilter(new NoDataFilter()); return new StoreScanner(store, store.getScanInfo(), scan, - Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, + scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index 025a28d..77a03ba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -43,13 +43,13 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver { */ @Override public InternalScanner preFlushScannerOpen(final ObserverContext c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List scanners, InternalScanner s) throws IOException { ScanInfo oldSI = store.getScanInfo(); ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(oldSI.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 65ad956..30470d0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -385,8 +385,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -427,8 +429,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); 
assertTrue(chunkPool.getPoolSize() == 0); @@ -456,8 +460,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } @@ -525,8 +531,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Creating another snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); snapshot = memstore.snapshot(); @@ -541,8 +549,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 748576c..d60151e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -338,13 +338,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } List scanners = memstore.getScanners(Long.MAX_VALUE); // seek - scanners.get(0).seek(KeyValue.LOWESTKEY); int count = 0; - while (scanners.get(0).next() != null) { - count++; + for(int i = 0; i < scanners.size(); i++) { + scanners.get(i).seek(KeyValue.LOWESTKEY); + while (scanners.get(i).next() != null) { + count++; + } } assertEquals("the count should be ", count, 150); - scanners.get(0).close(); + for(int i = 0; i < scanners.size(); i++) { + scanners.get(i).close(); + } } @Test @@ -359,7 +363,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore // Just doing the cnt operation here MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), - CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); + CellComparator.COMPARATOR, 10); int cnt = 0; try { while (itr.next() != null) { @@ -420,8 +424,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -449,8 +455,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the scanner - snapshot.getScanner().close(); + // close the scanners + 
for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } @@ -480,8 +488,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 7c7551a..5f95158 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -266,12 +266,20 @@ public class TestDefaultMemStore { protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { List memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); - assertEquals(1, memstorescanners.size()); - final KeyValueScanner scanner = memstorescanners.get(0); - scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); - assertEquals(kv1, scanner.next()); - assertEquals(kv2, scanner.next()); - assertNull(scanner.next()); + assertEquals(2, memstorescanners.size()); + final KeyValueScanner scanner0 = memstorescanners.get(0); + final KeyValueScanner scanner1 = memstorescanners.get(1); + scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + scanner1.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + KeyValue n0 = (KeyValue) scanner0.next(); + KeyValue n1 = (KeyValue) scanner1.next(); + assertTrue(kv1.equals(n0) || kv1.equals(n1)); + assertTrue(kv2.equals(n0) + || kv2.equals(n1) + || kv2.equals(scanner0.next()) + || kv2.equals(scanner1.next())); + assertNull(scanner0.next()); + assertNull(scanner1.next()); } protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 49b5139..406fda5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -136,7 +136,9 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); // close the scanner - this is how the snapshot will be used - snapshot.getScanner().close(); + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -180,7 +182,9 @@ public class TestMemStoreChunkPool { // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data // close the snapshot scanner - snapshot.getScanner().close(); + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -207,8 +211,10 @@ 
public class TestMemStoreChunkPool { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the snapshot scanner - snapshot.getScanner().close(); + // close the snapshot scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 00e8f0a..cb3ebe8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -114,7 +114,7 @@ public class TestReversibleScanners { LOG.info("Setting read point to " + readPoint); scanners = StoreFileScanner.getScannersForStoreFiles( Collections.singletonList(sf), false, true, false, false, readPoint); - seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); + seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); } } @@ -129,7 +129,7 @@ public class TestReversibleScanners { for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { LOG.info("Setting read point to " + readPoint); scanners = memstore.getScanners(readPoint); - seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); + seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); } } @@ -555,38 +555,69 @@ public class TestReversibleScanners { } private void seekTestOfReversibleKeyValueScannerWithMVCC( - KeyValueScanner scanner, int readPoint) throws IOException { - /** - * Test with MVCC - */ - // Test seek to last row - KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan( - ROWSIZE - 1, 0, readPoint); - assertEquals(expectedKey != null, scanner.seekToLastRow()); - assertEquals(expectedKey, scanner.peek()); + List scanners, int readPoint) throws IOException { + /** + * Test with MVCC + */ + // Test seek to last row + KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan( + ROWSIZE - 1, 0, readPoint); + boolean res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.seekToLastRow(); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); // Test backward seek in two cases // Case1: seek in the same row in backwardSeek expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint); - assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey)); - assertEquals(expectedKey, scanner.peek()); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.backwardSeek(expectedKey); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); // Case2: seek to the previous row in backwardSeek int seekRowNum = ROWSIZE - 3; KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]); expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint); - assertEquals(expectedKey != null, scanner.backwardSeek(seekKey)); - assertEquals(expectedKey, scanner.peek()); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.backwardSeek(seekKey); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners)
{ + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); // Test seek to previous row seekRowNum = ROWSIZE - 4; expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint); - assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil - .createFirstOnRow(ROWS[seekRowNum]))); - assertEquals(expectedKey, scanner.peek()); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); } private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index a9297de..cee1138 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanInfo; @@ -242,7 +240,7 @@ public class TestCoprocessorScanPolicy { @Override public InternalScanner preFlushScannerOpen( final ObserverContext c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List scanners, InternalScanner s) throws IOException { Long newTtl = ttls.get(store.getTableName()); if (newTtl != null) { System.out.println("PreFlush:" + newTtl); @@ -257,7 +255,7 @@ public class TestCoprocessorScanPolicy { oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } -- 2.10.1 (Apple Git-78)
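
Editor's notes — the text below sits under the mail signature delimiter ("-- "), so `git am` ignores it; it is review commentary, not part of the patch.

1. Coprocessor migration. The deprecated single-scanner preFlushScannerOpen now forwards to the List-based overload, so third-party observers migrate by taking the scanner list and passing it straight to StoreScanner, exactly as ZooKeeperScanPolicyObserver does above. A minimal sketch of a migrated hook; the class name MyScanPolicyObserver is hypothetical, everything else mirrors code in this patch:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;

public class MyScanPolicyObserver extends BaseRegionObserver {
  @Override
  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
    // The list of per-segment scanners is passed through unchanged; the old hook's single
    // memstore scanner had to be wrapped in Collections.singletonList(...) instead.
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
        HConstants.OLDEST_TIMESTAMP);
  }
}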
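2. Scanner ordering. With MemStoreScanner gone, its "always wins" scanner order of Long.MAX_VALUE is replaced by per-segment order values: CompactingMemStore.getScanners and CompositeImmutableSegment.getScanners count the order down so newer segments rank higher in the KeyValueHeap built downstream. A sketch of that convention; the helper and its name are hypothetical, and the segment list is assumed to be sorted newest-first as in the pipeline:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Segment;

public class ScannerOrderSketch {
  /** Hands out one scanner per segment with descending order, newest segment highest. */
  static List<KeyValueScanner> scannersNewestFirst(List<? extends Segment> segments, long readPt) {
    List<KeyValueScanner> list = new ArrayList<>(segments.size());
    long order = segments.size(); // the newest segment gets the largest order value
    for (Segment item : segments) {
      list.add(item.getScanner(readPt, order));
      order--; // older segments get progressively smaller order values
    }
    return list;
  }
}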
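3. Snapshot scanners. MemStoreSnapshot.getScanner() becomes getScanners(), one scanner per segment, and as the test changes above show, every one of them must be closed before clearSnapshot() so chunk-pool chunks can be reclaimed. A self-contained sketch of that contract, assuming a populated DefaultMemStore; drainSnapshot is an illustrative helper, not patch code:

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;

public class SnapshotDrainSketch {
  /** Drains a memstore snapshot through the new List-based API, then releases it. */
  static void drainSnapshot(DefaultMemStore memstore) throws IOException {
    MemStoreSnapshot snapshot = memstore.snapshot();
    try {
      for (KeyValueScanner scanner : snapshot.getScanners()) {
        scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
        for (Cell cell = scanner.next(); cell != null; cell = scanner.next()) {
          // consume the snapshot's cells, e.g. hand them to a store file writer
        }
      }
    } finally {
      // the old single snapshot.getScanner().close() is gone: close every scanner
      for (KeyValueScanner scanner : snapshot.getScanners()) {
        scanner.close();
      }
      memstore.clearSnapshot(snapshot.getId());
    }
  }
}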
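4. Merger iterator. MemStoreMergerSegmentsIterator now builds its KeyValueHeap directly over the per-segment scanners instead of going through a MemStoreScanner, and close() tears down the heap and, through it, every segment scanner. Usage stays iterate-then-close, mirroring the TestCompactingToCellArrayMapMemStore change; a brief sketch, with the compactionKVMax batch limit of 10 borrowed from that test:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.ImmutableSegment;
import org.apache.hadoop.hbase.regionserver.MemStoreMergerSegmentsIterator;

public class MergerIteratorSketch {
  /** Counts the cells across a pipeline's immutable segments via the merger iterator. */
  static int countCells(List<ImmutableSegment> segments) throws IOException {
    MemStoreMergerSegmentsIterator itr =
        new MemStoreMergerSegmentsIterator(segments, CellComparator.COMPARATOR, 10);
    int cnt = 0;
    try {
      // the heap merges the per-segment scanners, so cells arrive in comparator order;
      // next() returns null once all segments are exhausted
      while (itr.next() != null) {
        cnt++;
      }
    } finally {
      itr.close(); // closes the KeyValueHeap and with it every segment scanner
    }
    return cnt;
  }
}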