From 176a431800db694e8c8340e2b724f4e694938453 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 24 Apr 2017 16:40:27 +0800 Subject: [PATCH] HBASE-17917 Use pread by default for all user scan --- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 70 ++++----- .../apache/hadoop/hbase/regionserver/ScanInfo.java | 17 ++- .../hadoop/hbase/regionserver/StoreFile.java | 14 +- .../hbase/regionserver/StoreFileScanner.java | 36 ++--- .../hadoop/hbase/regionserver/StoreScanner.java | 159 +++++++++++++-------- .../hadoop/hbase/regionserver/MockStoreFile.java | 13 +- .../hbase/regionserver/TestCompactingMemStore.java | 2 +- .../hadoop/hbase/regionserver/TestCompaction.java | 6 +- .../regionserver/TestDefaultCompactSelection.java | 4 +- .../hbase/regionserver/TestDefaultMemStore.java | 7 +- .../hbase/regionserver/TestMajorCompaction.java | 6 +- .../hbase/regionserver/TestReversibleScanners.java | 5 +- .../hbase/regionserver/TestStoreScanner.java | 13 +- .../TestCompactionScanQueryMatcher.java | 4 +- .../querymatcher/TestUserScanQueryMatcher.java | 23 +-- .../hbase/util/TestCoprocessorScanPolicy.java | 21 ++- 16 files changed, 222 insertions(+), 178 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 066a9fa..abd260d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -24,8 +27,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,9 +54,6 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.IOUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - /** * Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches. * Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since @@ -1398,7 +1396,7 @@ public class HFileBlock implements Cacheable { static class FSReaderImpl implements FSReader { /** The file system stream of the underlying {@link HFile} that * does or doesn't do checksum validations in the filesystem */ - protected FSDataInputStreamWrapper streamWrapper; + private FSDataInputStreamWrapper streamWrapper; private HFileBlockDecodingContext encodedBlockDecodingCtx; @@ -1414,22 +1412,18 @@ public class HFileBlock implements Cacheable { private AtomicReference prefetchedHeader = new AtomicReference<>(new PrefetchedHeader()); /** The size of the file we are reading from, or -1 if unknown. 
*/ - protected long fileSize; + private long fileSize; /** The size of the header */ + @VisibleForTesting protected final int hdrSize; /** The filesystem used to access data */ - protected HFileSystem hfs; - - private final Lock streamLock = new ReentrantLock(); + private HFileSystem hfs; - /** The default buffer size for our buffered streams */ - public static final int DEFAULT_BUFFER_SIZE = 1 << 20; - - protected HFileContext fileContext; + private HFileContext fileContext; // Cache the fileName - protected String pathName; + private String pathName; FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, HFileContext fileContext) throws IOException { @@ -1504,39 +1498,33 @@ public class HFileBlock implements Cacheable { * next header * @throws IOException */ - protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size, + @VisibleForTesting + protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size, boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) { // We are asked to read the next block's header as well, but there is // not enough room in the array. - throw new IOException("Attempted to read " + size + " bytes and " + - hdrSize + " bytes of next header into a " + dest.length + - "-byte array at offset " + destOffset); + throw new IOException("Attempted to read " + size + " bytes and " + hdrSize + + " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset); } - if (!pread && streamLock.tryLock()) { + if (!pread) { // Seek + read. Better for scanning. - try { - HFileUtil.seekOnMultipleSources(istream, fileOffset); - - long realOffset = istream.getPos(); - if (realOffset != fileOffset) { - throw new IOException("Tried to seek to " + fileOffset + " to " - + "read " + size + " bytes, but pos=" + realOffset - + " after seek"); - } + HFileUtil.seekOnMultipleSources(istream, fileOffset); + long realOffset = istream.getPos(); + if (realOffset != fileOffset) { + throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size + + " bytes, but pos=" + realOffset + " after seek"); + } - if (!peekIntoNextBlock) { - IOUtils.readFully(istream, dest, destOffset, size); - return -1; - } + if (!peekIntoNextBlock) { + IOUtils.readFully(istream, dest, destOffset, size); + return -1; + } - // Try to read the next block header. - if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { - return -1; - } - } finally { - streamLock.unlock(); + // Try to read the next block header. + if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { + return -1; } } else { // Positional read. Better for random reads; or when the streamLock is already locked. @@ -1545,7 +1533,6 @@ public class HFileBlock implements Cacheable { return -1; } } - assert peekIntoNextBlock; return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; } @@ -1699,6 +1686,7 @@ public class HFileBlock implements Cacheable { * If HBase checksum is switched off, then use HDFS checksum. * @return the HFileBlock or null if there is a HBase checksum mismatch */ + @VisibleForTesting protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException { @@ -1810,7 +1798,7 @@ public class HFileBlock implements Cacheable { * If the block doesn't uses checksum, returns false. 
* @return True if checksum matches, else false. */ - protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize) + private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize) throws IOException { // If this is an older version of the block that does not have checksums, then return false // indicating that checksum verification did not succeed. Actually, this method should never diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java index 349e166..2a66e55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java @@ -48,6 +48,7 @@ public class ScanInfo { private boolean usePread; private long cellsPerTimeoutCheck; private boolean parallelSeekEnabled; + private final long preadMaxBytes; private final Configuration conf; public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT @@ -58,14 +59,14 @@ public class ScanInfo { * @param conf * @param family {@link HColumnDescriptor} describing the column family * @param ttl Store's TTL (in ms) - * @param timeToPurgeDeletes duration in ms after which a delete marker can - * be purged during a major compaction. + * @param timeToPurgeDeletes duration in ms after which a delete marker can be purged during a + * major compaction. * @param comparator The store's comparator */ public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl, final long timeToPurgeDeletes, final CellComparator comparator) { - this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family - .getKeepDeletedCells(), timeToPurgeDeletes, comparator); + this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, + family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator); } /** @@ -74,6 +75,7 @@ public class ScanInfo { * @param minVersions Store's MIN_VERSIONS setting * @param maxVersions Store's VERSIONS setting * @param ttl Store's TTL (in ms) + * @param blockSize Store's block size * @param timeToPurgeDeletes duration in ms after which a delete marker can * be purged during a major compaction. 
* @param keepDeletedCells Store's keepDeletedCells setting @@ -81,7 +83,7 @@ public class ScanInfo { */ public ScanInfo(final Configuration conf, final byte[] family, final int minVersions, final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells, - final long timeToPurgeDeletes, final CellComparator comparator) { + final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator) { this.family = family; this.minVersions = minVersions; this.maxVersions = maxVersions; @@ -99,6 +101,7 @@ public class ScanInfo { perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; this.parallelSeekEnabled = conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false); + this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize); this.conf = conf; } @@ -149,4 +152,8 @@ public class ScanInfo { public CellComparator getComparator() { return comparator; } + + long getPreadMaxBytes() { + return preadMaxBytes; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index c53fbf08..91ff97a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -589,11 +589,17 @@ public class StoreFile { return reader; } + public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, + boolean canOptimizeForNonNullColumn) { + return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, + canOptimizeForNonNullColumn); + } + public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean pread, boolean isCompaction, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn) throws IOException { - return createStreamReader(canUseDropBehind).getStoreFileScanner( - cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) + throws IOException { + return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false, + isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index aa4f897..b727144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -95,25 +95,21 @@ public class StoreFileScanner implements KeyValueScanner { } /** - * Return an array of scanners corresponding to the given - * set of store files. + * Return an array of scanners corresponding to the given set of store files. 
   */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files,
-      boolean cacheBlocks,
-      boolean usePread, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks,
-        usePread, false, false, readPt);
+  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+      boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
+    return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
   }

   /**
    * Return an array of scanners corresponding to the given set of store files.
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        useDropBehind, null, readPt);
+  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind,
+      long readPt) throws IOException {
+    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
+        readPt);
   }

   /**
@@ -126,11 +122,17 @@
     List<StoreFileScanner> scanners = new ArrayList<>(files.size());
     List<StoreFile> sortedFiles = new ArrayList<>(files);
     Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
+    boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
     for (int i = 0, n = sortedFiles.size(); i < n; i++) {
       StoreFile sf = sortedFiles.get(i);
       sf.initReader();
-      StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
-          isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
+      StoreFileScanner scanner;
+      if (usePread) {
+        scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
+      } else {
+        scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
+            canOptimizeForNonNullColumn);
+      }
       scanners.add(scanner);
     }
     return scanners;
   }
@@ -148,8 +150,8 @@
     boolean succ = false;
     try {
       for (int i = 0, n = sortedFiles.size(); i < n; i++) {
-        scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
-            readPt, i, false));
+        scanners.add(
+          sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
       }
       succ = true;
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 3bc6a0f..8173168 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -63,36 +63,35 @@
   private static final Log LOG = LogFactory.getLog(StoreScanner.class);
   // In unit tests, the store could be null
   protected final Store store;
-  protected ScanQueryMatcher matcher;
+  private ScanQueryMatcher matcher;
   protected KeyValueHeap heap;
-  protected boolean cacheBlocks;
+  private boolean cacheBlocks;

-  protected long countPerRow = 0;
-  protected int storeLimit = -1;
-  protected int storeOffset = 0;
+  private long countPerRow = 0;
+  private int storeLimit = -1;
+  private int storeOffset = 0;

   // Used to indicate that the scanner has closed (see HBASE-1107)
-  // Doesnt need to be volatile because it's always accessed via synchronized methods
-  protected boolean closing = false;
-  protected final boolean get;
-  protected final boolean explicitColumnQuery;
-  protected final boolean useRowColBloom;
+  // Does not need to be volatile because it's always accessed via synchronized methods
+  private boolean closing = false;
+  private final boolean get;
+  private final boolean explicitColumnQuery;
+  private final boolean useRowColBloom;
   /**
    * A flag that enables StoreFileScanner parallel-seeking
    */
-  protected boolean parallelSeekEnabled = false;
-  protected ExecutorService executor;
-  protected final Scan scan;
-  protected final NavigableSet<byte[]> columns;
-  protected final long oldestUnexpiredTS;
-  protected final long now;
-  protected final int minVersions;
-  protected final long maxRowSize;
-  protected final long cellsPerHeartbeatCheck;
+  private boolean parallelSeekEnabled = false;
+  private ExecutorService executor;
+  private final Scan scan;
+  private final long oldestUnexpiredTS;
+  private final long now;
+  private final int minVersions;
+  private final long maxRowSize;
+  private final long cellsPerHeartbeatCheck;

   // Collects all the KVHeap that are eagerly getting closed during the
   // course of a scan
-  protected List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
+  private final List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();

   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@@ -101,14 +100,16 @@
   private long kvsScanned = 0;
   private Cell prevCell = null;

+  private final long preadMaxBytes;
+  private long bytesRead;
+
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;

   public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
       "hbase.storescanner.parallel.seek.enable";

   /** Used during unit testing to ensure that lazy seek does save seek ops */
-  protected static boolean lazySeekEnabledGlobally =
-      LAZY_SEEK_ENABLED_BY_DEFAULT;
+  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

   /**
    * The number of cells scanned in between timeout checks. Specifying a larger value means that
    * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
    * timeout checks.
    */
   public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
       "hbase.cells.scanned.per.heartbeat.check";

   /**
    * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}
    */
   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

+  /**
+   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the bytes we have
+   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
+   * the block size for this store.
+   */
+  public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
+
   // if heap == null and lastTop != null, you need to reseek given the key below
-  protected Cell lastTop = null;
+  private Cell lastTop = null;
+
+  private final Scan.ReadType readType;

   // A flag whether use pread for scan
-  private final boolean scanUsePread;
+  // it may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
+  private boolean scanUsePread;
   // Indicates whether there was flush during the course of the scan
-  protected volatile boolean flushed = false;
+  private volatile boolean flushed = false;
   // generally we get one file from a flush
-  protected List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
+  private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
   // The current list of scanners
-  protected List<KeyValueScanner> currentScanners = new ArrayList<>();
+  private final List<KeyValueScanner> currentScanners = new ArrayList<>();
   // flush update lock
-  private ReentrantLock flushLock = new ReentrantLock();
+  private final ReentrantLock flushLock = new ReentrantLock();

   protected final long readPt;

@@ -156,7 +167,6 @@
     int numCol = columns == null ? 0 : columns.size();
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
-    this.columns = columns;
     this.now = EnvironmentEdgeManager.currentTime();
     this.oldestUnexpiredTS = now - scanInfo.getTtl();
     this.minVersions = scanInfo.getMinVersions();
@@ -169,32 +179,31 @@
     this.maxRowSize = scanInfo.getTableMaxRowSize();

     if (get) {
+      this.readType = Scan.ReadType.PREAD;
       this.scanUsePread = true;
     } else {
-      switch (scan.getReadType()) {
-        case STREAM:
-          this.scanUsePread = false;
-          break;
-        case PREAD:
-          this.scanUsePread = true;
-          break;
-        default:
-          this.scanUsePread = scanInfo.isUsePread();
-          break;
+      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
+        this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
+      } else {
+        this.readType = scan.getReadType();
+      }
+      // Always start with pread unless the user specifically asks for stream. Will change to
+      // stream later if the readType is DEFAULT and the scan keeps running for a long time.
+      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
+    }
+    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
+    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
+    // Parallel seeking is on if the config allows it and there is more than one store file.
+    if (this.store != null && this.store.getStorefilesCount() > 1) {
+      RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices();
+      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
+        this.parallelSeekEnabled = true;
+        this.executor = rsService.getExecutorService();
+      }
     }
-    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
-    // Parallel seeking is on if the config allows and more there is more than one store file.
-    if (this.store != null && this.store.getStorefilesCount() > 1) {
-      RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
-      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
-        this.parallelSeekEnabled = true;
-        this.executor = rsService.getExecutorService();
-      }
-    }
   }

-  protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
     this.currentScanners.addAll(scanners);
   }

@@ -361,7 +370,7 @@
    * Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek */ - protected List getScannersNoCompaction() throws IOException { + private List getScannersNoCompaction() throws IOException { return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); } @@ -414,7 +423,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner /** * Filters the given list of scanners using Bloom filter, time range, and * TTL. + *
<p>
+   * Will be overridden by testcases so declared as protected.
    */
+  @VisibleForTesting
   protected List<KeyValueScanner> selectScannersFrom(
       final List<? extends KeyValueScanner> allScanners) {
     boolean memOnly;
@@ -521,7 +533,8 @@
       throw new IllegalArgumentException("Scanner context cannot be null");
     }
     boolean flushed = checkFlushed();
-    if (checkReseek(flushed)) {
+    boolean switchToStream = switchToStreamRead();
+    if (checkReseek(flushed || switchToStream)) {
       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
@@ -604,6 +617,7 @@
           count++;
           int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
           totalBytesRead += cellSize;
+          bytesRead += cellSize;

           // Update the progress of the scanner context
           scannerContext.incrementSizeProgress(cellSize, CellUtil.estimatedHeapSizeOf(cell));
@@ -638,7 +652,7 @@
         case DONE:
           // Optimization for Gets! If DONE, no more to get on this row, early exit!
-          if (this.scan.isGetScan()) {
+          if (get) {
             // Then no more to this row... exit.
             close(false);// Do all cleanup except heap.close()
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
@@ -764,7 +778,7 @@
     // rows beyond the current one: don't try to optimize. We are DONE. Return the *_NEXT_ROW
     // qcode as is. When the caller gets these flags on a Get Scan, it knows it can shut down the
     // Scan.
-    if (!this.scan.isGetScan()) {
+    if (!get) {
       Cell nextIndexedKey = getNextIndexedKey();
       if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY &&
           matcher.compareKeyForNextRow(nextIndexedKey, cell) > 0) {
@@ -793,18 +807,18 @@
   }

   /**
-   * @param flushed indicates if there was a flush
-   * @return true if top of heap has changed (and KeyValueHeap has to try the
-   *         next KV)
+   * @param reopen indicates that we need to reopen the scanner due to a flush or a switch from
+   *          pread to stream
+   * @return true if top of heap has changed (and KeyValueHeap has to try the next KV)
    * @throws IOException
    */
-  protected boolean checkReseek(boolean flushed) throws IOException {
-    if (flushed && this.lastTop != null) {
+  protected final boolean checkReseek(boolean reopen) throws IOException {
+    if (reopen && this.lastTop != null) {
       resetScannerStack(this.lastTop);
-      if (this.heap.peek() == null
-          || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
-        LOG.debug("Storescanner.peek() is changed where before = "
-            + this.lastTop.toString() + ",and after = " + this.heap.peek());
+      if (this.heap.peek() == null ||
+          store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
+        LOG.debug("Storescanner.peek() is changed where before = " + this.lastTop.toString() +
+            ",and after = " + this.heap.peek());
         this.lastTop = null;
         return true;
       }
@@ -814,7 +828,7 @@
     return false;
   }

-  protected void resetScannerStack(Cell lastTopKey) throws IOException {
+  private void resetScannerStack(Cell lastTopKey) throws IOException {
     // When we have the scan object, should we not pass it to getScanners() to get a limited set of
     // scanners?
We did so in the constructor and we could have done it now by storing the scan // object from the constructor @@ -899,7 +913,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return heap.reseek(kv); } - protected boolean checkFlushed() { + private boolean switchToStreamRead() { + if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing) { + return false; + } + if (bytesRead > preadMaxBytes) { + if (LOG.isDebugEnabled()) { + LOG.debug("Switch to stream read because we have already read " + bytesRead + + " bytes from this scanner"); + } + // we may already set this value when calling checkFlushed so check it first. + if (this.lastTop == null) { + this.lastTop = peek(); + } + scanUsePread = false; + return true; + } + return false; + } + + protected final boolean checkFlushed() { // check the var without any lock. Suppose even if we see the old // value here still it is ok to continue because we will not be resetting // the heap but will continue with the referenced memstore's snapshot. For compactions diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index d52c6c7..91b85d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -145,10 +145,17 @@ public class MockStoreFile extends StoreFile { } @Override + public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, + boolean canOptimizeForNonNullColumn) { + return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, + canOptimizeForNonNullColumn); + } + + @Override public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean pread, boolean isCompaction, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn) throws IOException { - return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder, + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) + throws IOException { + return getReader().getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 9e90f3e..04435db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -192,7 +192,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Configuration conf = HBaseConfiguration.create(); for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, - KeepDeletedCells.FALSE, 0, this.memstore.getComparator()); + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 
1bf6ea7..5f4c0aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -212,9 +212,9 @@ public class TestCompaction { for (Store hstore: this.r.stores.values()) { HStore store = (HStore)hstore; ScanInfo old = store.getScanInfo(); - ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), - old.getMinVersions(), old.getMaxVersions(), ttl, - old.getKeepDeletedCells(), 0, old.getComparator()); + ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(), + old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0, + old.getComparator()); store.setScanInfo(si); } Thread.sleep(ttl); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index 3c41fc5..584285b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -160,8 +160,8 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy { ScanInfo oldScanInfo = store.getScanInfo(); ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(), oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600, - oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(), - oldScanInfo.getComparator()); + oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(), + oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator()); store.setScanInfo(newScanInfo); // Do not compact empty store file List candidates = sfCreate(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 41b304b..e0bfa26 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -162,9 +162,8 @@ public class TestDefaultMemStore { Scan scan = new Scan(); List result = new ArrayList<>(); Configuration conf = HBaseConfiguration.create(); - ScanInfo scanInfo = - new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0, - this.memstore.getComparator()); + ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); int count = 0; @@ -602,7 +601,7 @@ public class TestDefaultMemStore { Configuration conf = HBaseConfiguration.create(); for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, - KeepDeletedCells.FALSE, 0, this.memstore.getComparator()); + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; try (InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index 9d00d38..0b35f95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -292,9 +292,9 @@ public class TestMajorCompaction { for (Store hstore : r.getStores()) { HStore store = ((HStore) hstore); ScanInfo old = store.getScanInfo(); - ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), - old.getMinVersions(), old.getMaxVersions(), ttl, - old.getKeepDeletedCells(), 0, old.getComparator()); + ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(), + old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0, + old.getComparator()); store.setScanInfo(si); } Thread.sleep(1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index c1fd6a3..2dfdf5b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -264,8 +264,9 @@ public class TestReversibleScanners { BloomType.NONE, true); ScanType scanType = ScanType.USER_SCAN; - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, - Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR); + ScanInfo scanInfo = + new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); // Case 1.Test a full reversed scan Scan scan = new Scan(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index ccbf067..c5428c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -67,8 +67,8 @@ public class TestStoreScanner { private static final String CF_STR = "cf"; private static final byte [] CF = Bytes.toBytes(CF_STR); static Configuration CONF = HBaseConfiguration.create(); - private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, - Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR); + private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); private ScanType scanType = ScanType.USER_SCAN; /** @@ -688,8 +688,8 @@ public class TestStoreScanner { List scanners = scanFixture(kvs); Scan scan = new Scan(); scan.setMaxVersions(1); - ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0, - CellComparator.COMPARATOR); + ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) { List results = new ArrayList<>(); @@ -761,8 +761,8 @@ public class TestStoreScanner { Scan scan = new Scan(); scan.setMaxVersions(1); // scanner with ttl equal to 500 - 
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0, - CellComparator.COMPARATOR); + ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) { @@ -827,6 +827,7 @@ public class TestStoreScanner { 0 /* minVersions */, 2 /* maxVersions */, 500 /* ttl */, KeepDeletedCells.FALSE /* keepDeletedCells */, + HConstants.DEFAULT_BLOCKSIZE /* block size */, 200, /* timeToPurgeDeletes */ CellComparator.COMPARATOR); try (StoreScanner scanner = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java index af8c27d..73c92e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java @@ -73,8 +73,8 @@ public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher throws IOException { long now = EnvironmentEdgeManager.currentTime(); // Set time to purge deletes to negative value to avoid it ever happening. - ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, - rowComparator); + ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, -1L, rowComparator); CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo, ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java index b4e4311..f3cf604 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java @@ -54,7 +54,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { long now = EnvironmentEdgeManager.currentTime(); // Do with fam2 which has a col2 qualifier. 
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), + new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), get.getFamilyMap().get(fam2), now - ttl, now, null); Cell kv = new KeyValue(row1, fam2, col2, 1, data); Cell cell = CellUtil.createLastOnRowCol(kv); @@ -79,8 +80,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { long now = EnvironmentEdgeManager.currentTime(); // 2,4,5 - UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), + UserScanQueryMatcher qm = UserScanQueryMatcher.create( + scan, new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), get.getFamilyMap().get(fam2), now - ttl, now, null); List memstore = new ArrayList<>(6); @@ -122,9 +124,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { expected.add(ScanQueryMatcher.MatchCode.DONE); long now = EnvironmentEdgeManager.currentTime(); - UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null, - now - ttl, now, null); + UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1, + ttl, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), + null, now - ttl, now, null); List memstore = new ArrayList<>(6); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -168,7 +170,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { long now = EnvironmentEdgeManager.currentTime(); UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), + new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now, null); KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data), @@ -209,9 +212,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { ScanQueryMatcher.MatchCode.DONE }; long now = EnvironmentEdgeManager.currentTime(); - UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null, - now - testTTL, now, null); + UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1, + testTTL, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), + null, now - testTTL, now, null); KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data), new KeyValue(row1, fam2, col2, now - 50, data), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index 27e93a0..720ad29 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -245,11 +245,10 @@ public class TestCoprocessorScanPolicy { Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); - ScanInfo 
scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), - family.getName(), family.getMinVersions(), - newVersions == null ? family.getMaxVersions() : newVersions, + ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), + family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, scanners, @@ -266,11 +265,10 @@ public class TestCoprocessorScanPolicy { Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), - family.getName(), family.getMinVersions(), - newVersions == null ? family.getMaxVersions() : newVersions, + ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), + family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, scanners, scanType, @@ -287,11 +285,10 @@ public class TestCoprocessorScanPolicy { Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), - family.getName(), family.getMinVersions(), - newVersions == null ? family.getMaxVersions() : newVersions, + ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), + family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); return new StoreScanner(store, scanInfo, scan, targetCols, readPt); } else { return s; -- 2.7.4
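
For anyone trying the patch out, the behavior is driven from the client side by Scan.ReadType,
which this diff references throughout. Below is a minimal sketch, not part of the patch: it
assumes a client of this era where Scan#setReadType is available, and the table name "t1" is
purely illustrative. With the patch applied, a DEFAULT scan starts with pread and is reopened
with stream once it has read more than hbase.storescanner.pread.max.bytes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class ReadTypeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("t1"))) {
      Scan scan = new Scan();
      // DEFAULT (what this patch changes): the region server starts the scan with
      // pread and reopens it with stream once it has read more than
      // hbase.storescanner.pread.max.bytes (4 times the store's block size by default).
      scan.setReadType(Scan.ReadType.DEFAULT);
      // To pin the read path instead, force pread for short scans, or stream for
      // long full-table scans:
      //   scan.setReadType(Scan.ReadType.PREAD);
      //   scan.setReadType(Scan.ReadType.STREAM);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println(result);
        }
      }
    }
  }
}

The switch-over point is read per store from the region server configuration (via ScanInfo),
so it is tuned in hbase-site.xml on the region servers rather than in the client Configuration:

  <property>
    <name>hbase.storescanner.pread.max.bytes</name>
    <value>262144</value>
  </property>

Gets always use pread, and compaction scanners always use stream, same as before the patch.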