From 52562f6a58517e6e8033f410406dfc3a95b6529c Mon Sep 17 00:00:00 2001 From: eshcar Date: Mon, 27 Feb 2017 19:18:25 +0200 Subject: [PATCH] HBASE-17655: Removing MemStoreScanner and SnapshotScanner --- .../example/ZooKeeperScanPolicyObserver.java | 4 +- .../hbase/coprocessor/BaseRegionObserver.java | 559 +++++++++++++++++++++ .../hadoop/hbase/coprocessor/RegionObserver.java | 18 +- .../hadoop/hbase/mob/DefaultMobStoreFlusher.java | 2 +- .../hbase/regionserver/AbstractMemStore.java | 14 + .../hbase/regionserver/CompactingMemStore.java | 20 +- .../regionserver/CompositeImmutableSegment.java | 33 +- .../hadoop/hbase/regionserver/DefaultMemStore.java | 15 +- .../hbase/regionserver/DefaultStoreFlusher.java | 2 +- .../hbase/regionserver/ImmutableSegment.java | 12 +- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../MemStoreCompactorSegmentsIterator.java | 15 +- .../MemStoreMergerSegmentsIterator.java | 50 +- .../hadoop/hbase/regionserver/MemStoreScanner.java | 334 ------------ .../regionserver/MemStoreSegmentsIterator.java | 17 +- .../hbase/regionserver/MemStoreSnapshot.java | 13 +- .../hbase/regionserver/RegionCoprocessorHost.java | 7 +- .../apache/hadoop/hbase/regionserver/Segment.java | 6 +- .../hadoop/hbase/regionserver/SegmentScanner.java | 8 +- .../hadoop/hbase/regionserver/SnapshotScanner.java | 105 ---- .../hadoop/hbase/regionserver/StoreFlusher.java | 8 +- .../hbase/regionserver/StripeStoreFlusher.java | 2 +- .../hbase/coprocessor/SimpleRegionObserver.java | 2 +- .../TestRegionObserverScannerOpenHook.java | 6 +- .../hbase/regionserver/NoOpScanPolicyObserver.java | 4 +- .../hbase/regionserver/TestCompactingMemStore.java | 30 +- .../TestCompactingToCellArrayMapMemStore.java | 32 +- .../hbase/regionserver/TestDefaultMemStore.java | 20 +- .../hbase/regionserver/TestMemStoreChunkPool.java | 14 +- .../hbase/regionserver/TestReversibleScanners.java | 66 ++- .../hbase/util/TestCoprocessorScanPolicy.java | 9 +- 31 files changed, 799 insertions(+), 630 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 2343c1d..b7df9b4 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -188,7 +188,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { @Override public InternalScanner preFlushScannerOpen(final ObserverContext c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List scanners, InternalScanner s) throws IOException { ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); if (scanInfo == null) { // take default action @@ -196,7 +196,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { } Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, 
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java new file mode 100644 index 0000000..8f6d743 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import com.google.common.collect.ImmutableList; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.Operation; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALKey; + +/** + * An abstract class that implements RegionObserver. 
+ * By extending it, you can create your own region observer without + * overriding all abstract methods of RegionObserver. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public class BaseRegionObserver implements RegionObserver { + @Override + public void start(CoprocessorEnvironment e) throws IOException { } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { } + + @Override + public void preOpen(ObserverContext e) throws IOException { } + + @Override + public void postOpen(ObserverContext e) { } + + @Override + public void postLogReplay(ObserverContext e) { } + + @Override + public void preClose(ObserverContext c, boolean abortRequested) + throws IOException { } + + @Override + public void postClose(ObserverContext e, + boolean abortRequested) { } + + @Override + public InternalScanner preFlushScannerOpen(final ObserverContext c, + final Store store, final List scanners, final InternalScanner s) + throws IOException { + return s; + } + + @Override + public InternalScanner preFlushScannerOpen(final ObserverContext c, + final Store store, final List scanners, final InternalScanner s, + final long readPoint) throws IOException { + return preFlushScannerOpen(c, store, scanners, s); + } + + @Override + public void preFlush(ObserverContext e) throws IOException { + } + + @Override + public void postFlush(ObserverContext e) throws IOException { + } + + @Override + public InternalScanner preFlush(ObserverContext e, Store store, + InternalScanner scanner) throws IOException { + return scanner; + } + + @Override + public void postFlush(ObserverContext e, Store store, + StoreFile resultFile) throws IOException { + } + + @Override + public void preSplit(ObserverContext e) throws IOException { + } + + @Override + public void preSplit(ObserverContext c, + byte[] splitRow) throws IOException { + } + + @Override + public void preSplitBeforePONR(ObserverContext ctx, + byte[] splitKey, List metaEntries) throws IOException { + } + + @Override + public void preSplitAfterPONR( + ObserverContext ctx) throws IOException { + } + + @Override + public void preRollBackSplit(ObserverContext ctx) + throws IOException { + } + + @Override + public void postRollBackSplit( + ObserverContext ctx) throws IOException { + } + + @Override + public void postCompleteSplit( + ObserverContext ctx) throws IOException { + } + + @Override + public void postSplit(ObserverContext e, Region l, Region r) + throws IOException { + } + + @Override + public void preCompactSelection(final ObserverContext c, + final Store store, final List candidates) throws IOException { } + + @Override + public void preCompactSelection(final ObserverContext c, + final Store store, final List candidates, final CompactionRequest request) + throws IOException { + preCompactSelection(c, store, candidates); + } + + @Override + public void postCompactSelection(final ObserverContext c, + final Store store, final ImmutableList selected) { } + + @Override + public void postCompactSelection(final ObserverContext c, + final Store store, final ImmutableList selected, CompactionRequest request) { + postCompactSelection(c, store, selected); + } + + @Override + public InternalScanner preCompact(ObserverContext e, + final Store store, final InternalScanner scanner, final ScanType scanType) + throws IOException { + return scanner; + } + + @Override + public InternalScanner preCompact(ObserverContext e, + final Store store, final InternalScanner scanner, final ScanType scanType, + 
CompactionRequest request) throws IOException { + return preCompact(e, store, scanner, scanType); + } + + @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s) throws IOException { + return s; + } + + @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s, CompactionRequest request) throws IOException { + return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + } + + @Override + public InternalScanner preCompactScannerOpen(ObserverContext c, + Store store, List scanners, ScanType scanType, long earliestPutTs, + InternalScanner s, CompactionRequest request, long readPoint) throws IOException { + return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request); + } + + @Override + public void postCompact(ObserverContext e, final Store store, + final StoreFile resultFile) throws IOException { + } + +@Override + public void postCompact(ObserverContext e, final Store store, + final StoreFile resultFile, CompactionRequest request) throws IOException { + postCompact(e, store, resultFile); + } + + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + } + + @Override + public void postGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + } + + @Override + public boolean preExists(final ObserverContext e, + final Get get, final boolean exists) throws IOException { + return exists; + } + + @Override + public boolean postExists(final ObserverContext e, + final Get get, boolean exists) throws IOException { + return exists; + } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + } + + @Override + public void postPut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + } + + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + } + + @Override + public void prePrepareTimeStampForDeleteVersion( + final ObserverContext e, final Mutation delete, + final Cell cell, final byte[] byteNow, final Get get) throws IOException { + } + + @Override + public void postDelete(final ObserverContext e, + final Delete delete, final WALEdit edit, final Durability durability) + throws IOException { + } + + @Override + public void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + } + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + } + + @Override + public void postBatchMutateIndispensably(final ObserverContext ctx, + MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { + } + + @Override + public boolean preCheckAndPut(final ObserverContext e, + final byte [] row, final byte [] family, final byte [] qualifier, + final CompareOp compareOp, final ByteArrayComparable comparator, + final Put put, final boolean result) throws IOException { + return result; + } + + @Override + public boolean preCheckAndPutAfterRowLock( + final ObserverContext e, + final byte[] row, final 
byte[] family, final byte[] qualifier, final CompareOp compareOp, + final ByteArrayComparable comparator, final Put put, + final boolean result) throws IOException { + return result; + } + + @Override + public boolean postCheckAndPut(final ObserverContext e, + final byte [] row, final byte [] family, final byte [] qualifier, + final CompareOp compareOp, final ByteArrayComparable comparator, + final Put put, final boolean result) throws IOException { + return result; + } + + @Override + public boolean preCheckAndDelete(final ObserverContext e, + final byte [] row, final byte [] family, final byte [] qualifier, + final CompareOp compareOp, final ByteArrayComparable comparator, + final Delete delete, final boolean result) throws IOException { + return result; + } + + @Override + public boolean preCheckAndDeleteAfterRowLock( + final ObserverContext e, + final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, + final ByteArrayComparable comparator, final Delete delete, + final boolean result) throws IOException { + return result; + } + + @Override + public boolean postCheckAndDelete(final ObserverContext e, + final byte [] row, final byte [] family, final byte [] qualifier, + final CompareOp compareOp, final ByteArrayComparable comparator, + final Delete delete, final boolean result) throws IOException { + return result; + } + + @Override + public Result preAppend(final ObserverContext e, + final Append append) throws IOException { + return null; + } + + @Override + public Result preAppendAfterRowLock(final ObserverContext e, + final Append append) throws IOException { + return null; + } + + @Override + public Result postAppend(final ObserverContext e, + final Append append, final Result result) throws IOException { + return result; + } + + @Override + public long preIncrementColumnValue(final ObserverContext e, + final byte [] row, final byte [] family, final byte [] qualifier, + final long amount, final boolean writeToWAL) throws IOException { + return amount; + } + + @Override + public long postIncrementColumnValue(final ObserverContext e, + final byte [] row, final byte [] family, final byte [] qualifier, + final long amount, final boolean writeToWAL, long result) + throws IOException { + return result; + } + + @Override + public Result preIncrement(final ObserverContext e, + final Increment increment) throws IOException { + return null; + } + + @Override + public Result preIncrementAfterRowLock(final ObserverContext e, + final Increment increment) throws IOException { + return null; + } + + @Override + public Result postIncrement(final ObserverContext e, + final Increment increment, final Result result) throws IOException { + return result; + } + + @Override + public RegionScanner preScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { + return s; + } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext c, + final Store store, final Scan scan, final NavigableSet targetCols, + final KeyValueScanner s) throws IOException { + return s; + } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext c, + final Store store, final Scan scan, final NavigableSet targetCols, + final KeyValueScanner s, final long readPt) throws IOException { + return preStoreScannerOpen(c, store, scan, targetCols, s); + } + + @Override + public RegionScanner postScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { + return s; + } + + 
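  // Illustrative usage sketch, not part of this patch: a subclass overrides
  // only the hooks it needs and inherits the pass-through defaults defined in
  // this class. The class name and counter below are hypothetical.
  //
  //   public class FlushCountingObserver extends BaseRegionObserver {
  //     private final java.util.concurrent.atomic.AtomicLong flushes =
  //         new java.util.concurrent.atomic.AtomicLong();
  //     @Override
  //     public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
  //       flushes.incrementAndGet(); // count completed memstore flushes
  //     }
  //   }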
@Override + public boolean preScannerNext(final ObserverContext e, + final InternalScanner s, final List results, + final int limit, final boolean hasMore) throws IOException { + return hasMore; + } + + @Override + public boolean postScannerNext(final ObserverContext e, + final InternalScanner s, final List results, final int limit, + final boolean hasMore) throws IOException { + return hasMore; + } + + @Override + @Deprecated + public boolean postScannerFilterRow(final ObserverContext e, + final InternalScanner s, final byte[] currentRow, final int offset, final short length, + final boolean hasMore) throws IOException { + return hasMore; + } + + @Override + public boolean postScannerFilterRow(final ObserverContext e, + final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException { + return postScannerFilterRow(e, s, curRowCell.getRowArray(), curRowCell.getRowOffset(), + curRowCell.getRowLength(), hasMore); + } + + @Override + public void preScannerClose(final ObserverContext e, + final InternalScanner s) throws IOException { + } + + @Override + public void postScannerClose(final ObserverContext e, + final InternalScanner s) throws IOException { + } + + @Override + public void preReplayWALs(ObserverContext env, + HRegionInfo info, Path edits) throws IOException { + } + + @Override + public void postReplayWALs(ObserverContext env, + HRegionInfo info, Path edits) throws IOException { + } + + /** + * Implementers should override this version of the method and leave the deprecated one as-is. + */ + @Override + public void preWALRestore(ObserverContext env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + } + + /** + * Implementers should override this version of the method and leave the deprecated one as-is. 
+ */ + @Override + public void postWALRestore(ObserverContext env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + } + + @Override + public void preBulkLoadHFile(final ObserverContext ctx, + List> familyPaths) throws IOException { + } + + @Override + public void preCommitStoreFile(final ObserverContext ctx, + final byte[] family, final List> pairs) throws IOException { + } + + @Override + public void postCommitStoreFile(final ObserverContext ctx, + final byte[] family, Path srcPath, Path dstPath) throws IOException { + } + + @Override + public boolean postBulkLoadHFile(ObserverContext ctx, + List> stagingFamilyPaths, Map> finalPaths, + boolean hasLoaded) throws IOException { + return postBulkLoadHFile(ctx, stagingFamilyPaths, hasLoaded); + } + + @Override + public boolean postBulkLoadHFile(ObserverContext ctx, + List> stagingFamilyPaths, boolean hasLoaded) throws IOException { + return hasLoaded; + } + + @Override + public StoreFileReader preStoreFileReaderOpen(ObserverContext ctx, + FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, + Reference r, StoreFileReader reader) throws IOException { + return reader; + } + + @Override + public StoreFileReader postStoreFileReaderOpen(ObserverContext ctx, + FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, + Reference r, StoreFileReader reader) throws IOException { + return reader; + } + + @Override + public Cell postMutationBeforeWAL(ObserverContext ctx, + MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { + return newCell; + } + + @Override + public void postStartRegionOperation(final ObserverContext ctx, + Operation op) throws IOException { + } + + @Override + public void postCloseRegionOperation(final ObserverContext ctx, + Operation op) throws IOException { + } + + @Override + public DeleteTracker postInstantiateDeleteTracker( + final ObserverContext ctx, DeleteTracker delTracker) + throws IOException { + return delTracker; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index a3db3b1..c154283 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -128,16 +128,16 @@ public interface RegionObserver extends Coprocessor { * effect in this hook. * @param c the environment provided by the region server * @param store the store being flushed - * @param memstoreScanner the scanner for the memstore that is flushed + * @param scanners the scanners for the memstore that is flushed * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @return the scanner to use during the flush. {@code null} if the default implementation * is to be used. - * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, + * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, List, * InternalScanner, long)} */ @Deprecated default InternalScanner preFlushScannerOpen(final ObserverContext c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) + final Store store, final List scanners, final InternalScanner s) throws IOException { return s; } @@ -151,16 +151,16 @@ public interface RegionObserver extends Coprocessor { * effect in this hook. 
* @param c the environment provided by the region server * @param store the store being flushed - * @param memstoreScanner the scanner for the memstore that is flushed + * @param scanners the scanners for the memstore that is flushed * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @param readPoint the readpoint to create scanner * @return the scanner to use during the flush. {@code null} if the default implementation * is to be used. */ default InternalScanner preFlushScannerOpen(final ObserverContext c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s, + final Store store, final List scanners, final InternalScanner s, final long readPoint) throws IOException { - return preFlushScannerOpen(c, store, memstoreScanner, s); + return preFlushScannerOpen(c, store, scanners, s); } /** @@ -1113,8 +1113,7 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, - * long)} and {@link #preCompactScannerOpen(ObserverContext, + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. *

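[Editor's sketch] A minimal migration sketch for third-party observers affected by this signature change (illustrative only; generic type parameters are written out here even though this flattened diff strips them): the single memstore scanner argument becomes a list, which is handed straight to the StoreScanner constructor instead of being wrapped in Collections.singletonList, exactly as the ZooKeeperScanPolicyObserver hunk at the top of this patch does.

    @Override
    public InternalScanner preFlushScannerOpen(
        ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        List<KeyValueScanner> scanners, InternalScanner s, long readPoint)
        throws IOException {
      Scan scan = new Scan();
      scan.setMaxVersions(store.getScanInfo().getMaxVersions());
      // the scanner list is passed through as-is; merging across segments
      // happens inside the StoreScanner's internal KeyValueHeap
      return new StoreScanner(store, store.getScanInfo(), scan, scanners,
          ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
          HConstants.OLDEST_TIMESTAMP);
    }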
@@ -1145,8 +1144,7 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, - * long)} and {@link #preCompactScannerOpen(ObserverContext, + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. *

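[Editor's sketch] The flusher hunks below all make the same mechanical substitution: the snapshot no longer exposes one aggregated MemStoreScanner but a list with one scanner per immutable segment. A hedged caller-side sketch (variable names assumed, mirroring the DefaultStoreFlusher change):

    // before: one merged scanner over the whole snapshot
    //   InternalScanner scanner =
    //       createScanner(snapshot.getScanner(), smallestReadPoint);
    // after: the per-segment scanners are passed as a list and merged once,
    // inside the StoreScanner built by createScanner(...)
    InternalScanner scanner =
        createScanner(snapshot.getScanners(), smallestReadPoint);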
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 77f167e..ecab5d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -101,7 +101,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 225dd73..b438194 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -59,6 +59,20 @@ public abstract class AbstractMemStore implements MemStore { public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; + public static long addToScanners(List segments, long readPt, long order, + List scanners) { + for (Segment item : segments) { + order = addToScanners(item, readPt, order, scanners); + } + return order; + } + + protected static long addToScanners(Segment segment, long readPt, long order, + List scanners) { + scanners.add(segment.getScanner(readPt, order)); + return order-1; + } + protected AbstractMemStore(final Configuration conf, final CellComparator c) { this.conf = conf; this.comparator = c; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e7f4a67..9c2c2d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -294,21 +293,14 @@ public class CompactingMemStore extends AbstractMemStore { */ public List getScanners(long readPt) throws IOException { List pipelineList = pipeline.getSegments(); - int order = pipelineList.size() + snapshot.getNumOfSegments(); + long order = pipelineList.size() + snapshot.getNumOfSegments() + 1; // The list of elements in pipeline + the active element + the snapshot segment - // TODO : This will change when the snapshot is made of more than one element // The order is the Segment ordinal - List list = new ArrayList(order+1); - list.add(this.active.getScanner(readPt, order + 1)); - for (Segment item : pipelineList) { - list.add(item.getScanner(readPt, order)); - order--; - } - for (Segment item : snapshot.getAllSegments()) { - list.add(item.getScanner(readPt, order)); - order--; - } - return Collections. 
singletonList(new MemStoreScanner(getComparator(), list)); + List list = new ArrayList((int) order); + order = addToScanners(active, readPt, order, list); + order = addToScanners(pipelineList, readPt, order, list); + order = addToScanners(snapshot.getAllSegments(), readPt, order, list); + return list; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index 30d17fb..5f53638 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -22,11 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -73,15 +71,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { } /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. - * @return a special scanner for the MemStoreSnapshot object - */ - public KeyValueScanner getSnapshotScanner() { - return getScanner(Long.MAX_VALUE, Long.MAX_VALUE); - } - - /** * @return whether the segment has any cells */ public boolean isEmpty() { @@ -140,8 +129,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { * @return a scanner for the given read point */ public KeyValueScanner getScanner(long readPoint) { - // Long.MAX_VALUE is DEFAULT_SCANNER_ORDER - return getScanner(readPoint,Long.MAX_VALUE); + throw new IllegalStateException("Not supported by CompositeImmutableSegment"); } /** @@ -149,19 +137,14 @@ public class CompositeImmutableSegment extends ImmutableSegment { * @return a scanner for the given read point */ public KeyValueScanner getScanner(long readPoint, long order) { - KeyValueScanner resultScanner; - List list = new ArrayList(segments.size()); - for (ImmutableSegment s : segments) { - list.add(s.getScanner(readPoint, order)); - } - - try { - resultScanner = new MemStoreScanner(getComparator(), list); - } catch (IOException ie) { - throw new IllegalStateException(ie); - } + throw new IllegalStateException("Not supported by CompositeImmutableSegment"); + } - return resultScanner; + @Override + public List getScanners(long readPoint, long order) { + List list = new ArrayList<>(segments.size()); + AbstractMemStore.addToScanners(segments, readPoint, order, list); + return list; } public boolean isTagsPresent() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 63af570..73a4602 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@
-72,10 +71,6 @@ public class DefaultMemStore extends AbstractMemStore { super(conf, c); } - void dump() { - super.dump(LOG); - } - /** * Creates a snapshot of the current memstore. * Snapshot must be cleared by call to {@link #clearSnapshot(long)} @@ -126,11 +121,11 @@ public class DefaultMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { - List list = new ArrayList(2); - list.add(this.active.getScanner(readPt, 1)); - list.add(this.snapshot.getScanner(readPt, 0)); - return Collections. singletonList( - new MemStoreScanner(getComparator(), list)); + List list = new ArrayList<>(); + long order = snapshot.getNumOfSegments(); + order = addToScanners(active, readPt, order, list); + order = addToScanners(snapshot.getAllSegments(), readPt, order, list); + return list; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 93837b7..66f310e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -52,7 +52,7 @@ public class DefaultStoreFlusher extends StoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index faa9b67..d103a58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -35,9 +35,7 @@ import java.util.List; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, - * and is not needed for a {@link MutableSegment}. Specifically, the method - * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the - * {@link MemStoreSnapshot} object. + * and is not needed for a {@link MutableSegment}. */ @InterfaceAudience.Private public class ImmutableSegment extends Segment { @@ -132,14 +130,6 @@ public class ImmutableSegment extends Segment { } ///////////////////// PUBLIC METHODS ///////////////////// - /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. 
- * @return a special scanner for the MemStoreSnapshot object - */ - public KeyValueScanner getSnapshotScanner() { - return new SnapshotScanner(this); - } @Override public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index c435098..dfa7d18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -252,7 +252,7 @@ public class MemStoreCompactor { iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + compactionKVMax); result = SegmentFactory.instance().createImmutableSegmentByMerge( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index f31c973..4514c7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Scan; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -50,11 +49,16 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator List segments, CellComparator comparator, int compactionKVMax, Store store ) throws IOException { - super(segments,comparator,compactionKVMax,store); + super(compactionKVMax); + List scanners = new ArrayList(); + // create the list of scanners to traverse over all the data + // no dirty reads here as these are immutable segments + int order = segments.size(); + AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners); // build the scanner based on Query Matcher // reinitialize the compacting scanner for each instance of iterator - compactingScanner = createScanner(store, scanner); + compactingScanner = createScanner(store, scanners); hasMore = compactingScanner.next(kvs, scannerContext); @@ -93,7 +97,6 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator public void close() { compactingScanner.close(); compactingScanner = null; - scanner = null; } @Override @@ -106,13 +109,13 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator * * @return the scanner */ - private StoreScanner createScanner(Store store, KeyValueScanner scanner) + private StoreScanner createScanner(Store store, List scanners) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(); //Get all available versions StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), + new StoreScanner(store, store.getScanInfo(), scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java index 625fc76..3cb3163 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -33,32 +34,63 @@ import java.util.List; @InterfaceAudience.Private public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator { + // heap of scanners, built in the constructor over one scanner per segment + private KeyValueHeap heap = null; + // remember the initial version of the scanners list + List scanners = new ArrayList(); + + private boolean closed = false; + // C-tor public MemStoreMergerSegmentsIterator(List segments, CellComparator comparator, - int compactionKVMax, Store store - ) throws IOException { - super(segments,comparator,compactionKVMax,store); + int compactionKVMax) throws IOException { + super(compactionKVMax); + // create the list of scanners to traverse over all the data + // no dirty reads here as these are immutable segments + int order = segments.size(); + AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners); + heap = new KeyValueHeap(scanners, comparator); } @Override public boolean hasNext() { - return (scanner.peek()!=null); + if (closed) { + return false; + } + if (this.heap != null) { + return (this.heap.peek() != null); + } + // Done this way in case some test case tries to peek directly + return false; } @Override public Cell next() { - Cell result = null; try { // try to get next - result = scanner.next(); + if (!closed && heap != null) { + return heap.next(); + } } catch (IOException ie) { throw new IllegalStateException(ie); } - return result; + return null; } public void close() { - scanner.close(); - scanner = null; + if (closed) { + return; + } + // Ensuring that all the segment scanners are closed + if (heap != null) { + heap.close(); + // It is safe to do close as no new calls will be made to this scanner. + heap = null; + } else { + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + } + closed = true; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java deleted file mode 100644 index 2ccdf68..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.htrace.Trace; - -/** - * This is the scanner for any MemStore implementation, derived from MemStore. - * The MemStoreScanner combines KeyValueScanner from different Segments and - * uses the key-value heap and the reversed key-value heap for the aggregated key-values set. - * It is assumed that only traversing forward or backward is used (without zigzagging in between) - */ -@InterfaceAudience.Private -public class MemStoreScanner extends NonLazyKeyValueScanner { - - // heap of scanners, lazily initialized - private KeyValueHeap heap; - - // indicates if the scanner is created for inmemoryCompaction - private boolean inmemoryCompaction; - - // remember the initial version of the scanners list - List scanners; - - private final CellComparator comparator; - - private boolean closed; - - /** - * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan - * and the heap is lazily initialized - * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - * @param inmemoryCompaction true if used for inmemoryCompaction. - * In this case, creates a forward heap always. - */ - public MemStoreScanner(CellComparator comparator, List scanners, - boolean inmemoryCompaction) throws IOException { - super(); - this.comparator = comparator; - this.scanners = scanners; - if (Trace.isTracing() && Trace.currentSpan() != null) { - Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); - } - this.inmemoryCompaction = inmemoryCompaction; - if (inmemoryCompaction) { - // init the forward scanner in case of inmemoryCompaction - initForwardKVHeapIfNeeded(comparator, scanners); - } - } - - /** - * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan - * and the heap is lazily initialized - * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - */ - public MemStoreScanner(CellComparator comparator, List scanners) - throws IOException { - this(comparator, scanners, false); - } - - private void initForwardKVHeapIfNeeded(CellComparator comparator, List scanners) - throws IOException { - if (heap == null) { - // lazy init - // In a normal scan case, at the StoreScanner level before the KVHeap is - // created we do a seek or reseek. So that will happen - // on all the scanners that the StoreScanner is - // made of. So when we get any of those call to this scanner we init the - // heap here with normal forward KVHeap. - this.heap = new KeyValueHeap(scanners, comparator); - } - } - - private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator, - List scanners) throws IOException { - boolean res = false; - if (heap == null) { - // lazy init - // In a normal reverse scan case, at the ReversedStoreScanner level before the - // ReverseKeyValueheap is - // created we do a seekToLastRow or backwardSeek. So that will happen - // on all the scanners that the ReversedStoreSCanner is - // made of. 
So when we get any of those call to this scanner we init the - // heap here with ReversedKVHeap. - if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) { - for (KeyValueScanner scanner : scanners) { - res |= scanner.seekToLastRow(); - } - } else { - for (KeyValueScanner scanner : scanners) { - res |= scanner.backwardSeek(seekKey); - } - } - this.heap = new ReversedKeyValueHeap(scanners, comparator); - } - return res; - } - - /** - * Returns the cell from the top-most scanner without advancing the iterator. - * The backward traversal is assumed, only if specified explicitly - */ - @Override - public Cell peek() { - if (closed) { - return null; - } - if (this.heap != null) { - return this.heap.peek(); - } - // Doing this way in case some test cases tries to peek directly to avoid NPE - return null; - } - - /** - * Gets the next cell from the top-most scanner. Assumed forward scanning. - */ - @Override - public Cell next() throws IOException { - if (closed) { - return null; - } - if(this.heap != null) { - // loop over till the next suitable value - // take next value from the heap - for (Cell currentCell = heap.next(); - currentCell != null; - currentCell = heap.next()) { - // all the logic of presenting cells is inside the internal KeyValueScanners - // located inside the heap - return currentCell; - } - } - return null; - } - - /** - * Set the scanner at the seek key. Assumed forward scanning. - * Must be called only once: there is no thread safety between the scanner - * and the memStore. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean seek(Cell cell) throws IOException { - if (closed) { - return false; - } - initForwardKVHeapIfNeeded(comparator, scanners); - - if (cell == null) { - close(); - return false; - } - - return heap.seek(cell); - } - - /** - * Move forward on the sub-lists set previously by seek. Assumed forward scanning. - * - * @param cell seek value (should be non-null) - * @return true if there is at least one KV to read, false otherwise - */ - @Override - public boolean reseek(Cell cell) throws IOException { - /* - * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. - * This code is executed concurrently with flush and puts, without locks. - * Two points must be known when working on this code: - * 1) It's not possible to use the 'kvTail' and 'snapshot' - * variables, as they are modified during a flush. - * 2) The ideal implementation for performance would use the sub skip list - * implicitly pointed by the iterators 'kvsetIt' and - * 'snapshotIt'. Unfortunately the Java API does not offer a method to - * get it. So we remember the last keys we iterated to and restore - * the reseeked set to at least that point. - * - * TODO: The above comment copied from the original MemStoreScanner - */ - if (closed) { - return false; - } - initForwardKVHeapIfNeeded(comparator, scanners); - return heap.reseek(cell); - } - - /** - * MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all - * scanners. - * @see KeyValueScanner#getScannerOrder() - */ - @Override - public long getScannerOrder() { - return Long.MAX_VALUE; - } - - @Override - public void close() { - if (closed) { - return; - } - // Ensuring that all the segment scanners are closed - if (heap != null) { - heap.close(); - // It is safe to do close as no new calls will be made to this scanner. 
- heap = null; - } else { - for (KeyValueScanner scanner : scanners) { - scanner.close(); - } - } - closed = true; - } - - /** - * Set the scanner at the seek key. Assumed backward scanning. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean backwardSeek(Cell cell) throws IOException { - // The first time when this happens it sets the scanners to the seek key - // passed by the incoming scan's start row - if (closed) { - return false; - } - initReverseKVHeapIfNeeded(cell, comparator, scanners); - return heap.backwardSeek(cell); - } - - /** - * Assumed backward scanning. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean seekToPreviousRow(Cell cell) throws IOException { - if (closed) { - return false; - } - initReverseKVHeapIfNeeded(cell, comparator, scanners); - if (heap.peek() == null) { - restartBackwardHeap(cell); - } - return heap.seekToPreviousRow(cell); - } - - @Override - public boolean seekToLastRow() throws IOException { - if (closed) { - return false; - } - return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners); - } - - /** - * Check if this memstore may contain the required keys - * @return False if the key definitely does not exist in this Memstore - */ - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - // TODO : Check if this can be removed. - if (inmemoryCompaction) { - return true; - } - - for (KeyValueScanner sc : scanners) { - if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) { - return true; - } - } - return false; - } - - // debug method - @Override - public String toString() { - StringBuffer buf = new StringBuffer(); - int i = 1; - for (KeyValueScanner scanner : scanners) { - buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); - i++; - } - return buf.toString(); - } - /****************** Private methods ******************/ - /** - * Restructure the ended backward heap after rerunning a seekToPreviousRow() - * on each scanner - * @return false if given Cell does not exist in any scanner - */ - private boolean restartBackwardHeap(Cell cell) throws IOException { - boolean res = false; - for (KeyValueScanner scan : scanners) { - res |= scan.seekToPreviousRow(cell); - } - this.heap = - new ReversedKeyValueHeap(scanners, comparator); - return res; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java index e2f4ebb..dd6f28f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java @@ -35,29 +35,14 @@ import java.util.*; @InterfaceAudience.Private public abstract class MemStoreSegmentsIterator implements Iterator { - // scanner for full or partial pipeline (heap of segment scanners) - // we need to keep those scanners in order to close them at the end - protected KeyValueScanner scanner; - protected final ScannerContext scannerContext; // C-tor - public MemStoreSegmentsIterator(List segments, CellComparator comparator, - int compactionKVMax, Store store) throws IOException { + public MemStoreSegmentsIterator(int compactionKVMax) throws IOException { this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - // 
list of Scanners of segments in the pipeline, when compaction starts - List scanners = new ArrayList(); - - // create the list of scanners to traverse over all the data - // no dirty reads here as these are immutable segments - for (ImmutableSegment segment : segments) { - scanners.add(segment.getScanner(Integer.MAX_VALUE)); - } - - scanner = new MemStoreScanner(comparator, scanners, true); } public abstract void close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 61e7876..82fec2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.util.List; /** * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier, * count of cells in it and total memory size occupied by all the cells, timestamp information of @@ -31,7 +32,7 @@ public class MemStoreSnapshot { private final long dataSize; private final long heapOverhead; private final TimeRangeTracker timeRangeTracker; - private final KeyValueScanner scanner; + private final List scanners; private final boolean tagsPresent; public MemStoreSnapshot(long id, ImmutableSegment snapshot) { @@ -40,7 +41,7 @@ public class MemStoreSnapshot { this.dataSize = snapshot.keySize(); this.heapOverhead = snapshot.heapOverhead(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getSnapshotScanner(); + this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE); this.tagsPresent = snapshot.isTagsPresent(); } @@ -66,21 +67,21 @@ public class MemStoreSnapshot { } public long getHeapOverhead() { - return this.heapOverhead; + return heapOverhead; } /** * @return {@link TimeRangeTracker} for all the Cells in the snapshot. */ public TimeRangeTracker getTimeRangeTracker() { - return this.timeRangeTracker; + return timeRangeTracker; } /** - * @return {@link KeyValueScanner} for iterating over the snapshot + * @return the list of {@link KeyValueScanner}s for iterating over the snapshot */ - public KeyValueScanner getScanner() { - return this.scanner; + public List getScanners() { + return scanners; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 649273d..91351d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -629,17 +629,16 @@ public class RegionCoprocessorHost /** * See - * {@link RegionObserver#preFlushScannerOpen(ObserverContext, - * Store, KeyValueScanner, InternalScanner, long)} + * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} */ public InternalScanner preFlushScannerOpen(final Store store, - final KeyValueScanner memstoreScanner, final long readPoint) throws IOException { + final List scanners, final long readPoint) throws IOException { return execOperationWithResult(null, coprocessors.isEmpty() ?
null : new RegionOperationWithResult() { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { - setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint)); + setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint)); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 8581517..2faf1dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -18,7 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.SortedSet; @@ -115,9 +115,7 @@ public abstract class Segment { } public List getScanners(long readPoint, long order) { - List scanners = new ArrayList(1); - scanners.add(getScanner(readPoint, order)); - return scanners; + return Collections.singletonList(new SegmentScanner(this, readPoint, order)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 5e2e36f..6f622d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.SortedSet; +import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -287,9 +288,7 @@ public class SegmentScanner implements KeyValueScanner { @Override public boolean requestSeek(Cell c, boolean forward, boolean useBloom) throws IOException { - - throw new IllegalStateException( - "requestSeek cannot be called on MutableCellSetSegmentScanner"); + return NonLazyKeyValueScanner.doRealSeek(this, c, forward); } /** @@ -309,8 +308,7 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public void enforceSeek() throws IOException { - throw new IllegalStateException( - "enforceSeek cannot be called on MutableCellSetSegmentScanner"); + throw new NotImplementedException("enforceSeek must not be called on a SegmentScanner"); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java deleted file mode 100644 index 6300e00..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 5e2e36f..6f622d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.SortedSet;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -287,9 +288,7 @@ public class SegmentScanner implements KeyValueScanner {
 
   @Override
   public boolean requestSeek(Cell c, boolean forward, boolean useBloom) throws IOException {
-
-    throw new IllegalStateException(
-        "requestSeek cannot be called on MutableCellSetSegmentScanner");
+    return NonLazyKeyValueScanner.doRealSeek(this, c, forward);
   }
 
   /**
@@ -309,8 +308,7 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public void enforceSeek() throws IOException {
-    throw new IllegalStateException(
-        "enforceSeek cannot be called on MutableCellSetSegmentScanner");
+    throw new NotImplementedException("enforceSeek must not be called on a SegmentScanner");
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
deleted file mode 100644
index 6300e00..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells
- * in the segment
- */
-@InterfaceAudience.Private
-public class SnapshotScanner extends SegmentScanner {
-
-  public SnapshotScanner(Segment immutableSegment) {
-    // Snapshot scanner does not need readpoint. It should read all the cells in the
-    // segment
-    super(immutableSegment, Long.MAX_VALUE);
-  }
-
-  @Override
-  public Cell peek() { // sanity check, the current should be always valid
-    if (closed) {
-      return null;
-    }
-    return current;
-  }
-
-  @Override
-  public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
-    return true;
-  }
-
-  @Override
-  public boolean backwardSeek(Cell key) throws IOException {
-    throw new NotImplementedException(
-        "backwardSeek must not be called on a " + "non-reversed scanner");
-  }
-
-  @Override
-  public boolean seekToPreviousRow(Cell key) throws IOException {
-    throw new NotImplementedException(
-        "seekToPreviousRow must not be called on a " + "non-reversed scanner");
-  }
-
-  @Override
-  public boolean seekToLastRow() throws IOException {
-    throw new NotImplementedException(
-        "seekToLastRow must not be called on a " + "non-reversed scanner");
-  }
-
-  @Override
-  protected Iterator<Cell> getIterator(Cell cell) {
-    return segment.iterator();
-  }
-
-  @Override
-  protected void updateCurrent() {
-    if (iter.hasNext()) {
-      current = iter.next();
-    } else {
-      current = null;
-    }
-  }
-
-  @Override
-  public boolean seek(Cell seekCell) {
-    // restart iterator
-    iter = getIterator(seekCell);
-    return reseek(seekCell);
-  }
-
-  @Override
-  public boolean reseek(Cell seekCell) {
-    while (iter.hasNext()) {
-      Cell next = iter.next();
-      int ret = segment.getComparator().compare(next, seekCell);
-      if (ret >= 0) {
-        current = next;
-        return true;
-      }
-    }
-    return false;
-  }
-}
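With SnapshotScanner deleted, snapshot reads reuse the ordinary SegmentScanner path; a read point of Long.MAX_VALUE, as the new MemStoreSnapshot constructor above passes, reproduces the old behavior of exposing every cell regardless of MVCC. A two-line sketch (segment here stands for the immutable snapshot segment):

    // Equivalent of the deleted SnapshotScanner, expressed via SegmentScanner:
    // Long.MAX_VALUE as read point (and order) makes all cells visible.
    List<KeyValueScanner> snapshotScanners =
        segment.getScanners(Long.MAX_VALUE, Long.MAX_VALUE);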
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index abfd3fc..7cc5821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -74,22 +74,22 @@ abstract class StoreFlusher {
 
   /**
    * Creates the scanner for flushing snapshot. Also calls coprocessors.
-   * @param snapshotScanner
+   * @param snapshotScanners
    * @param smallestReadPoint
    * @return The scanner; null if coprocessor is canceling the flush.
    */
-  protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
+  protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
       long smallestReadPoint) throws IOException {
     InternalScanner scanner = null;
     if (store.getCoprocessorHost() != null) {
-      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner,
+      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners,
           smallestReadPoint);
     }
     if (scanner == null) {
       Scan scan = new Scan();
       scan.setMaxVersions(store.getScanInfo().getMaxVersions());
       scanner = new StoreScanner(store, store.getScanInfo(), scan,
-          Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
+          snapshotScanners, ScanType.COMPACT_RETAIN_DELETES,
           smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
     }
     assert scanner != null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 22c3ce7..ee157ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -62,7 +62,7 @@ public class StripeStoreFlusher extends StoreFlusher {
     if (cellsCount == 0) return result; // don't flush if there are no entries
 
     long smallestReadPoint = store.getSmallestReadPoint();
-    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+    InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
    if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means skip normal processing
     }
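For context, the flush path that calls createScanner now looks roughly like the following (patterned on DefaultStoreFlusher after this patch; writer creation and error handling are elided, so treat it as a sketch rather than the actual implementation):

    long smallestReadPoint = store.getSmallestReadPoint();
    InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
    if (scanner != null) {
      try {
        // drain the merged snapshot scanners into the new store file writer
        performFlush(scanner, writer, smallestReadPoint);
      } finally {
        scanner.close();
      }
    }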
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index ec4601c..24b5051 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -187,7 +187,7 @@ public class SimpleRegionObserver implements RegionObserver {
 
   @Override
   public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+      Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
     ctPreFlushScannerOpen.incrementAndGet();
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index ce36af8..80d0e3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -122,11 +120,11 @@ public class TestRegionObserverScannerOpenHook {
   public static class NoDataFromFlush implements RegionObserver {
     @Override
     public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
-        Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+        Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
       Scan scan = new Scan();
       scan.setFilter(new NoDataFilter());
       return new StoreScanner(store, store.getScanInfo(), scan,
-          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+          scanners, ScanType.COMPACT_RETAIN_DELETES,
           store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
index 2d096fa..c47ed68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
@@ -43,13 +43,13 @@ public class NoOpScanPolicyObserver implements RegionObserver {
    */
   @Override
   public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+      Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
     ScanInfo oldSI = store.getScanInfo();
     ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
         oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
     Scan scan = new Scan();
     scan.setMaxVersions(oldSI.getMaxVersions());
-    return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+    return new StoreScanner(store, scanInfo, scan, scanners,
         ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
   }
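Third-party observers need the same mechanical migration as the test observers above: the memstore scanner argument becomes a List<KeyValueScanner>, which is forwarded to StoreScanner instead of being wrapped in a singleton list. A minimal sketch, mirroring this patch's own call sites:

    @Override
    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
      Scan scan = new Scan();
      scan.setMaxVersions(store.getScanInfo().getMaxVersions());
      return new StoreScanner(store, store.getScanInfo(), scan, scanners,
          ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
          HConstants.OLDEST_TIMESTAMP);
    }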
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 65ad956..30470d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -385,8 +385,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.add(new KeyValue(row, fam, qf4, val), null);
     memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
@@ -427,8 +429,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     List<KeyValueScanner> scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -456,8 +460,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }
@@ -525,8 +531,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Creating another snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     snapshot = memstore.snapshot();
@@ -541,8 +549,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }
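The same close loop now appears in several places across these tests; if it keeps spreading, a small helper would keep the call sites uniform (hypothetical, not part of the patch):

    // Hypothetical test helper: closes every scanner handed out by a
    // MemStoreSnapshot before the snapshot is cleared.
    private static void closeSnapshotScanners(MemStoreSnapshot snapshot) {
      for (KeyValueScanner scanner : snapshot.getScanners()) {
        scanner.close();
      }
    }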
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 748576c..d60151e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -338,13 +338,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
     // seek
-    scanners.get(0).seek(KeyValue.LOWESTKEY);
     int count = 0;
-    while (scanners.get(0).next() != null) {
-      count++;
+    for(int i = 0; i < scanners.size(); i++) {
+      scanners.get(i).seek(KeyValue.LOWESTKEY);
+      while (scanners.get(i).next() != null) {
+        count++;
+      }
     }
     assertEquals("the count should be ", count, 150);
-    scanners.get(0).close();
+    for(int i = 0; i < scanners.size(); i++) {
+      scanners.get(i).close();
+    }
   }
 
   @Test
@@ -359,7 +363,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     // Just doing the cnt operation here
     MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
         ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
-        CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore());
+        CellComparator.COMPARATOR, 10);
     int cnt = 0;
     try {
       while (itr.next() != null) {
@@ -420,8 +424,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     List<KeyValueScanner> scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -449,8 +455,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }
@@ -480,8 +488,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     memstore.add(new KeyValue(row, fam, qf4, val), null);
     memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
-    // close the scanner
-    snapshot.getScanner().close();
+    // close the scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 43c185a..65ce7e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -266,12 +266,20 @@ public class TestDefaultMemStore {
 
   protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException {
     List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
-    assertEquals(1, memstorescanners.size());
-    final KeyValueScanner scanner = memstorescanners.get(0);
-    scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
-    assertEquals(kv1, scanner.next());
-    assertEquals(kv2, scanner.next());
-    assertNull(scanner.next());
+    assertEquals(2, memstorescanners.size());
+    final KeyValueScanner scanner0 = memstorescanners.get(0);
+    final KeyValueScanner scanner1 = memstorescanners.get(1);
+    scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    scanner1.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    Cell n0 = scanner0.next();
+    Cell n1 = scanner1.next();
+    assertTrue(kv1.equals(n0) || kv1.equals(n1));
+    assertTrue(kv2.equals(n0)
+        || kv2.equals(n1)
+        || kv2.equals(scanner0.next())
+        || kv2.equals(scanner1.next()));
+    assertNull(scanner0.next());
+    assertNull(scanner1.next());
   }
 
   protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 42aad5c..37a7664 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -138,7 +138,9 @@ public class TestMemStoreChunkPool {
     memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
     // close the scanner - this is how the snapshot will be used
-    snapshot.getScanner().close();
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
@@ -182,7 +184,9 @@ public class TestMemStoreChunkPool {
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
     // close the snapshot scanner
-    snapshot.getScanner().close();
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -209,8 +213,10 @@ public class TestMemStoreChunkPool {
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
-    // close the snapshot scanner
-    snapshot.getScanner().close();
+    // close the snapshot scanners
+    for(KeyValueScanner scanner : snapshot.getScanners()) {
+      scanner.close();
+    }
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }
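The rewritten verifyScanAcrossSnapshot2 encodes the new contract: DefaultMemStore.getScanners returns one scanner per segment (active and snapshot), and a given cell may surface from either of them. Callers that need a single ordered stream merge the scanners by comparator, conceptually like this (a simplified two-way merge for illustration; the real code relies on StoreScanner's KeyValueHeap rather than doing this by hand):

    // Conceptual two-way merge by cell comparator, illustration only.
    static Cell nextSmallest(KeyValueScanner s0, KeyValueScanner s1) throws IOException {
      Cell a = s0.peek();
      Cell b = s1.peek();
      if (a == null) {
        return s1.next();
      }
      if (b == null || CellComparator.COMPARATOR.compare(a, b) <= 0) {
        return s0.next();
      }
      return s1.next();
    }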
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index ea16edf..e60735d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -120,7 +120,7 @@ public class TestReversibleScanners {
       LOG.info("Setting read point to " + readPoint);
       scanners = StoreFileScanner.getScannersForStoreFiles(
           Collections.singletonList(sf), false, true, false, false, readPoint);
-      seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
+      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
     }
   }
 
@@ -135,7 +135,7 @@ public class TestReversibleScanners {
     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
       LOG.info("Setting read point to " + readPoint);
       scanners = memstore.getScanners(readPoint);
-      seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
+      seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
     }
   }
 
@@ -561,38 +561,68 @@ public class TestReversibleScanners {
   }
 
   private void seekTestOfReversibleKeyValueScannerWithMVCC(
-      KeyValueScanner scanner, int readPoint) throws IOException {
-    /**
-     * Test with MVCC
-     */
-    // Test seek to last row
-    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
-        ROWSIZE - 1, 0, readPoint);
-    assertEquals(expectedKey != null, scanner.seekToLastRow());
-    assertEquals(expectedKey, scanner.peek());
+      List<KeyValueScanner> scanners, int readPoint) throws IOException {
+    /**
+     * Test with MVCC
+     */
+    // Test seek to last row
+    KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
+        ROWSIZE - 1, 0, readPoint);
+    boolean res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= scanner.seekToLastRow();
+    }
+    assertEquals(expectedKey != null, res);
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= (expectedKey.equals(scanner.peek()));
+    }
+    assertTrue(res);
 
     // Test backward seek in two cases
     // Case1: seek in the same row in backwardSeek
     expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
         QUALSIZE - 2, readPoint);
-    assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
-    assertEquals(expectedKey, scanner.peek());
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= scanner.backwardSeek(expectedKey);
+    }
+    assertEquals(expectedKey != null, res);
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= (expectedKey.equals(scanner.peek()));
+    }
+    assertTrue(res);
 
     // Case2: seek to the previous row in backwardSeek
     int seekRowNum = ROWSIZE - 3;
     KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
     expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
         readPoint);
-    assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
-    assertEquals(expectedKey, scanner.peek());
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= scanner.backwardSeek(seekKey);
+    }
+    assertEquals(expectedKey != null, res);
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= (expectedKey.equals(scanner.peek()));
+    }
+    assertTrue(res);
 
     // Test seek to previous row
     seekRowNum = ROWSIZE - 4;
     expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
         readPoint);
-    assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil
-        .createFirstOnRow(ROWS[seekRowNum])));
-    assertEquals(expectedKey, scanner.peek());
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
+    }
+    assertEquals(expectedKey != null, res);
+    res = false;
+    for (KeyValueScanner scanner : scanners) {
+      res |= (expectedKey.equals(scanner.peek()));
+    }
+    assertTrue(res);
   }
 
   private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index 304717a..9f062c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,10 +44,10 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
@@ -209,7 +208,7 @@ public class TestCoprocessorScanPolicy {
     EnvironmentEdgeManager.reset();
   }
 
-  public static class ScanObserver implements RegionObserver {
+  public static class ScanObserver extends BaseRegionObserver {
    private Map<TableName, Long> ttls = new HashMap<TableName, Long>();
    private Map<TableName, Integer> versions =
        new HashMap<TableName, Integer>();
@@ -240,7 +239,7 @@ public class TestCoprocessorScanPolicy {
     @Override
     public InternalScanner preFlushScannerOpen(
         final ObserverContext<RegionCoprocessorEnvironment> c,
-        Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+        Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
       Long newTtl = ttls.get(store.getTableName());
       if (newTtl != null) {
         System.out.println("PreFlush:" + newTtl);
@@ -255,7 +254,7 @@ public class TestCoprocessorScanPolicy {
           oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
       Scan scan = new Scan();
       scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
-      return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+      return new StoreScanner(store, scanInfo, scan, scanners,
          ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
          HConstants.OLDEST_TIMESTAMP);
    }
-- 
2.10.1 (Apple Git-78)