 .../apache/hadoop/hbase/regionserver/HStore.java   | 14 +++--
 .../hadoop/hbase/regionserver/KeyValueScanner.java |  6 +++
 .../hbase/regionserver/NonLazyKeyValueScanner.java |  5 ++
 .../hbase/regionserver/ScanQueryMatcher.java       |  5 +-
 .../apache/hadoop/hbase/regionserver/Store.java    |  2 +-
 .../hadoop/hbase/regionserver/StoreFile.java       |  5 +-
 .../hbase/regionserver/StoreFileScanner.java       | 26 +++++++--
 .../hadoop/hbase/regionserver/StoreFlusher.java    |  1 +
 .../hadoop/hbase/regionserver/StoreScanner.java    | 62 ++++++++++++++++------
 .../hbase/regionserver/compactions/Compactor.java  | 47 +++++++++++++++--
 .../regionserver/compactions/DefaultCompactor.java |  8 +--
 .../regionserver/compactions/StripeCompactor.java  | 17 +++---
 .../hbase/regionserver/TestFSErrorsExposed.java    |  2 +-
 .../hbase/regionserver/TestQueryMatcher.java       | 10 ++--
 .../hbase/regionserver/TestReversibleScanners.java |  6 +--
 15 files changed, 161 insertions(+), 55 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 3cebaf6..7659c4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1094,7 +1094,7 @@ public class HStore implements Store {
    * @return all scanners for this store
    */
   @Override
-  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
+  public Pair<List<KeyValueScanner>, Boolean> getScanners(boolean cacheBlocks, boolean isGet,
       boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
       byte[] stopRow, long readPt) throws IOException {
     Collection<StoreFile> storeFilesToScan;
@@ -1103,6 +1103,7 @@
     try {
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
+      // TODO : Implement hasTags for memstore - by avoiding tags length even during flush
      memStoreScanners = this.memstore.getScanners(readPt);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1113,15 +1114,18 @@
     // TODO this used to get the store files in descending order,
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
-    List<StoreFileScanner> sfScanners = StoreFileScanner
+    Pair<List<StoreFileScanner>, Boolean> sfScanners = StoreFileScanner
       .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
         readPt);
     List<KeyValueScanner> scanners =
-      new ArrayList<KeyValueScanner>(sfScanners.size()+1);
-    scanners.addAll(sfScanners);
+      new ArrayList<KeyValueScanner>(sfScanners.getFirst().size()+1);
+    scanners.addAll(sfScanners.getFirst());
     // Then the memstore scanners
     scanners.addAll(memStoreScanners);
-    return scanners;
+    Pair<List<KeyValueScanner>, Boolean> res = new Pair<List<KeyValueScanner>, Boolean>();
+    res.setFirst(scanners);
+    res.setSecond(sfScanners.getSecond());
+    return res;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index 76a9d0f..e001822 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -162,4 +162,10 @@
    * if known, or null otherwise
    */
   public Cell getNextIndexedKey();
+
+  /**
+   * Returns true if any of the HFiles or the memstore being scanned has cell tags in it.
+   * @return true if the scanned store files or memstore may contain cell tags
+   */
+  public boolean hasTag();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
index 957f417..dfc1a76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
@@ -71,4 +71,9 @@
   public Cell getNextIndexedKey() {
     return null;
   }
+
+  @Override
+  public boolean hasTag() {
+    return true;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 032b4ce..1f83d0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -270,11 +270,12 @@ public class ScanQueryMatcher {
    *    - got to the next row (MatchCode.DONE)
    *
    * @param cell KeyValue to check
+   * @param hasTag whether the matcher should check for tags in every cell
    * @return The match code instance.
    * @throws IOException in case there is an internal consistency problem
    *      caused by a data corruption.
    */
-  public MatchCode match(Cell cell) throws IOException {
+  public MatchCode match(Cell cell, boolean hasTag) throws IOException {
     if (filter != null && filter.filterAllRemaining()) {
       return MatchCode.DONE_SCAN;
     }
@@ -316,7 +317,7 @@
           qualifierLength);
     }
     // check if the cell is expired by cell TTL
-    if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+    if (hasTag && HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
       return MatchCode.SKIP;
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index a77fc0e..938166c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -95,7 +95,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
    * @param readPt
    * @return all scanners for this store
    */
-  List<KeyValueScanner> getScanners(
+  Pair<List<KeyValueScanner>, Boolean> getScanners(
     boolean cacheBlocks,
     boolean isGet,
     boolean usePread,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 8cb20cd..d399c38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -1090,9 +1090,8 @@ public class StoreFile {
 
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread,
         boolean isCompaction, long readPt) {
-      return new StoreFileScanner(this,
-          getScanner(cacheBlocks, pread, isCompaction),
-          !isCompaction, reader.hasMVCCInfo(), readPt);
+      return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction),
+          !isCompaction, reader.hasMVCCInfo(), readPt, reader.getFileContext().isIncludesTags());
     }
 
     /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 22fd46e..7155d83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * KeyValueScanner adaptor over the Reader.  It also provides hooks into
@@ -66,25 +67,27 @@
   private ScanQueryMatcher matcher;
 
   private long readPt;
+  private boolean hasTag;
 
   /**
    * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
    * @param hfs HFile scanner
    */
   public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
-      boolean hasMVCC, long readPt) {
+      boolean hasMVCC, long readPt, boolean hasTag) {
     this.readPt = readPt;
     this.reader = reader;
     this.hfs = hfs;
     this.enforceMVCC = useMVCC;
     this.hasMVCCInfo = hasMVCC;
+    this.hasTag = hasTag;
   }
 
   /**
    * Return an array of scanners corresponding to the given
    * set of store files.
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
+  public static Pair<List<StoreFileScanner>, Boolean> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread, long readPt)
       throws IOException {
@@ -95,7 +98,7 @@
   /**
    * Return an array of scanners corresponding to the given set of store files.
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
+  public static Pair<List<StoreFileScanner>, Boolean> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
       boolean isCompaction, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
@@ -107,19 +110,27 @@
    * And set the ScanQueryMatcher for each store file scanner for further
    * optimization
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
+  // Any better way to return the hasTag part here?
+  public static Pair<List<StoreFileScanner>, Boolean> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
       boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
+    boolean hasTag = false;
+    Pair<List<StoreFileScanner>, Boolean> res = new Pair<List<StoreFileScanner>, Boolean>();
     for (StoreFile file : files) {
       StoreFile.Reader r = file.createReader();
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
           isCompaction, readPt);
       scanner.setScanQueryMatcher(matcher);
+      if (!hasTag) {
+        hasTag = scanner.hasTag();
+      }
       scanners.add(scanner);
     }
-    return scanners;
+    res.setFirst(scanners);
+    res.setSecond(hasTag);
+    return res;
   }
 
   public String toString() {
@@ -488,4 +499,9 @@
   public Cell getNextIndexedKey() {
     return hfs.getNextIndexedKey();
   }
+
+  @Override
+  public boolean hasTag() {
+    return hasTag;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index bcc0a90..b9fd437 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -83,6 +83,7 @@
     if (scanner == null) {
       Scan scan = new Scan();
       scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+      // TODO : handle hasTags here; for now we pass true by default
       scanner = new StoreScanner(store, store.getScanInfo(), scan,
           Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
           smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index f7e06ef..47e8cbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
@@ -61,6 +62,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected ScanQueryMatcher matcher;
   protected KeyValueHeap heap;
   protected boolean cacheBlocks;
+  // Better to have it on by default?
+  protected boolean hasTag = true;
 
   protected int countPerRow = 0;
   protected int storeLimit = -1;
@@ -208,13 +211,13 @@
     this.store.addChangedReaderObserver(this);
 
     // Pass columns to try to filter out unnecessary StoreFiles.
-    List<KeyValueScanner> scanners = getScannersNoCompaction();
+    Pair<List<KeyValueScanner>, Boolean> scanners = getScannersNoCompaction();
 
     // Seek all scanners to the start of the Row (or if the exact matching row
     // key does not exist, then to the start of the next matching Row).
     // Always check bloom filter to optimize the top row seek for delete
     // family marker.
-    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
+    seekScanners(scanners.getFirst(), matcher.getStartKey(), explicitColumnQuery
         && lazySeekEnabledGlobally, isParallelSeekEnabled);
 
     // set storeLimit
@@ -224,7 +227,9 @@
     this.storeOffset = scan.getRowOffsetPerColumnFamily();
 
     // Combine all seeked scanners with a heap
-    resetKVHeap(scanners, store.getComparator());
+    resetKVHeap(scanners.getFirst(), store.getComparator());
+    this.hasTag = scanners.getSecond();
+    LOG.debug("The hasTag for this scanner is " + this.hasTag);
   }
 
   /**
@@ -238,9 +243,27 @@
    *          versions
    */
   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
-      List<? extends KeyValueScanner> scanners, ScanType scanType,
-      long smallestReadPoint, long earliestPutTs) throws IOException {
-    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+      long earliestPutTs) throws IOException {
+    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null,
+        true);
+  }
+
+  /**
+   * Used for compactions.<p>
+   *
+   * Opens a scanner across specified StoreFiles.
+   * @param store who we scan
+   * @param scan the spec
+   * @param scanners ancillary scanners
+   * @param smallestReadPoint the readPoint that we should use for tracking versions
+   * @param hasTag Indicates if the scanners should consider tags in cells
+   */
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+      long earliestPutTs, boolean hasTag) throws IOException {
+    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null,
+        hasTag);
   }
 
   /**
@@ -253,17 +276,19 @@
    * @param smallestReadPoint the readPoint that we should use for tracking versions
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
+   * @param hasTag Indicates if the scanners should consider tags in cells
    */
   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
-      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow, boolean hasTag) throws IOException {
     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
-        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+        earliestPutTs, dropDeletesFromRow, dropDeletesToRow, hasTag);
   }
 
   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
-      long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
+      long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow, boolean hasTag)
+      throws IOException {
     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
     if (dropDeletesFromRow == null) {
@@ -282,6 +307,8 @@
 
     // Combine all seeked scanners with a heap
     resetKVHeap(scanners, store.getComparator());
+    this.hasTag = hasTag;
+    LOG.debug("The hasTag for this compaction scanner is " + this.hasTag);
   }
 
   /** Constructor for testing. */
@@ -326,11 +353,13 @@
    * Get a filtered list of scanners. Assumes we are not in a compaction.
    * @return list of scanners to seek
    */
-  protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
+  protected Pair<List<KeyValueScanner>, Boolean> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
     boolean usePread = isGet || scanUsePread;
-    return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
-        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
+    Pair<List<KeyValueScanner>, Boolean> scanners = store.getScanners(cacheBlocks, isGet, usePread,
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt);
+    List<KeyValueScanner> res = selectScannersFrom(scanners.getFirst());
+    return new Pair<List<KeyValueScanner>, Boolean>(res, scanners.getSecond());
   }
 
   /**
@@ -542,7 +571,7 @@
       checkScanOrder(prevCell, cell, comparator);
       prevCell = cell;
 
-      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
+      ScanQueryMatcher.MatchCode qcode = matcher.match(cell, hasTag);
       qcode = optimize(qcode, cell);
       switch(qcode) {
         case INCLUDE:
@@ -747,13 +776,14 @@
     /* When we have the scan object, should we not pass it to getScanners()
      * to get a limited set of scanners? We did so in the constructor and we
      * could have done it now by storing the scan object from the constructor */
-    List<KeyValueScanner> scanners = getScannersNoCompaction();
+    Pair<List<KeyValueScanner>, Boolean> scanners = getScannersNoCompaction();
 
     // Seek all scanners to the initial key
-    seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
+    seekScanners(scanners.getFirst(), lastTopKey, false, isParallelSeekEnabled);
 
     // Combine all seeked scanners with a heap
-    resetKVHeap(scanners, store.getComparator());
+    resetKVHeap(scanners.getFirst(), store.getComparator());
+    hasTag = scanners.getSecond();
 
     // Reset the state of the Query Matcher and set to top row.
     // Only reset and call setRow if the row changes; avoids confusing the
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index d1bb657..25337c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
 /**
@@ -183,7 +184,7 @@
    * @param filesToCompact Files.
    * @return Scanners.
    */
-  protected List<StoreFileScanner> createFileScanners(
+  protected Pair<List<StoreFileScanner>, Boolean> createFileScanners(
       final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
     return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
       smallestReadPoint);
@@ -315,14 +316,33 @@
    * @param scanType Scan type.
    * @param smallestReadPoint Smallest MVCC read point.
    * @param earliestPutTs Earliest put across all files.
    * @return A compaction scanner.
    */
   protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
-      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+      ScanType scanType, long smallestReadPoint, long earliestPutTs)
+      throws IOException {
     Scan scan = new Scan();
     scan.setMaxVersions(store.getFamily().getMaxVersions());
     return new StoreScanner(store, store.getScanInfo(), scan, scanners,
-        scanType, smallestReadPoint, earliestPutTs);
+        scanType, smallestReadPoint, earliestPutTs, true);
+  }
+
+  /**
+   * @param store The store.
+   * @param scanners Store file scanners.
+   * @param scanType Scan type.
+   * @param smallestReadPoint Smallest MVCC read point.
+   * @param earliestPutTs Earliest put across all files.
+   * @param hasTag Indicates if tags have to be considered by the scanner
+   * @return A compaction scanner.
+   */
+  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+      ScanType scanType, long smallestReadPoint, long earliestPutTs, boolean hasTag)
+      throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getFamily().getMaxVersions());
+    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+        scanType, smallestReadPoint, earliestPutTs, hasTag);
   }
 
   /**
@@ -340,6 +360,25 @@
     Scan scan = new Scan();
     scan.setMaxVersions(store.getFamily().getMaxVersions());
     return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
-        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+        earliestPutTs, dropDeletesFromRow, dropDeletesToRow, true);
+  }
+
+  /**
+   * @param store The store.
+   * @param scanners Store file scanners.
+   * @param smallestReadPoint Smallest MVCC read point.
+   * @param earliestPutTs Earliest put across all files.
+   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
+   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
+   * @param hasTag Indicates if tags have to be considered by the scanner
+   * @return A compaction scanner.
+   */
+  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+      byte[] dropDeletesToRow, boolean hasTag) throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getFamily().getMaxVersions());
+    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
+        earliestPutTs, dropDeletesFromRow, dropDeletesToRow, hasTag);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 5d712c1..b425622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Compact passed set of files. Create an instance and then call
@@ -67,10 +68,10 @@
       for (StoreFile f : request.getFiles()) {
         readersToClose.add(new StoreFile(f));
       }
-      scanners = createFileScanners(readersToClose, smallestReadPoint);
+      scanners = createFileScanners(readersToClose, smallestReadPoint).getFirst();
     } else {
       readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint).getFirst();
     }
 
     StoreFile.Writer writer = null;
@@ -85,7 +86,8 @@
         request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
       scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
       if (scanner == null) {
-        scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
+        scanner = createScanner(store, scanners, scanType, smallestReadPoint,
+            fd.earliestPutTs, fd.maxTagsLength > 0);
       }
       scanner = postCreateCoprocScanner(request, scanType, scanner);
       if (scanner == null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index b957e16..190fc32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This is the placeholder for stripe compactor. The implementation,
@@ -89,7 +90,8 @@
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
     long smallestReadPoint = getSmallestReadPoint();
-    List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+    Pair<List<StoreFileScanner>, Boolean> scanners = createFileScanners(filesToCompact,
+        smallestReadPoint);
 
     boolean finished = false;
     InternalScanner scanner = null;
@@ -97,13 +99,14 @@
     try {
       // Get scanner to use.
       ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
+      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners.getFirst());
       if (scanner == null) {
-        scanner = (majorRangeFromRow == null)
-          ? createScanner(store, scanners,
-              ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
-          : createScanner(store, scanners,
-              smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
+        scanner = (majorRangeFromRow == null)
+          ? createScanner(store, scanners.getFirst(),
+              ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs,
+              scanners.getSecond())
+          : createScanner(store, scanners.getFirst(), smallestReadPoint,
+              fd.earliestPutTs, majorRangeFromRow, majorRangeToRow, scanners.getSecond());
       }
       scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
       if (scanner == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 0d3fa13..386ede3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -147,7 +147,7 @@
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
         Collections.singletonList(sf), false, true, false,
         // 0 is passed as readpoint because this test operates on StoreFile directly
-        0);
+        0).getFirst();
     KeyValueScanner scanner = scanners.get(0);
 
     FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 2df2f5a..5d4614a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -116,7 +116,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     qm.setRow(k.getRowArray(), k.getRowOffset(), k.getRowLength());
 
     for (KeyValue kv : memstore){
-      actual.add(qm.match(kv));
+      actual.add(qm.match(kv, true));
     }
 
     assertEquals(expected.size(), actual.size());
@@ -181,7 +181,7 @@
     qm.setRow(k.getRowArray(), k.getRowOffset(), k.getRowLength());
 
     for(KeyValue kv : memstore) {
-      actual.add(qm.match(kv));
+      actual.add(qm.match(kv, true));
     }
 
     assertEquals(expected.size(), actual.size());
@@ -236,7 +236,7 @@
 
     List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
     for (KeyValue kv : kvs) {
-      actual.add( qm.match(kv) );
+      actual.add(qm.match(kv, true));
     }
 
     assertEquals(expected.length, actual.size());
@@ -291,7 +291,7 @@
 
     List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
     for (KeyValue kv : kvs) {
-      actual.add( qm.match(kv) );
+      actual.add(qm.match(kv, true));
     }
 
     assertEquals(expected.length, actual.size());
@@ -343,7 +343,7 @@
         qm.setRow(row, 0, (short)row.length);
         prevRow = row;
       }
-      actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete)));
+      actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete), true));
     }
 
     assertEquals(expected.length, actual.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 8052d9c..e9a316e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -106,13 +106,13 @@
     List<StoreFileScanner> scanners = StoreFileScanner
         .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
-            false, Long.MAX_VALUE);
+            false, Long.MAX_VALUE).getFirst();
     StoreFileScanner scanner = scanners.get(0);
     seekTestOfReversibleKeyValueScanner(scanner);
 
     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
       LOG.info("Setting read point to " + readPoint);
       scanners = StoreFileScanner.getScannersForStoreFiles(
-          Collections.singletonList(sf), false, true, false, readPoint);
+          Collections.singletonList(sf), false, true, false, readPoint).getFirst();
       seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
     }
   }
@@ -493,7 +493,7 @@
       throws IOException {
     List<StoreFileScanner> fileScanners = StoreFileScanner
         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
-            false, readPoint);
+            false, readPoint).getFirst();
     List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
         fileScanners.size() + 1);