Index: src/main/java/org/apache/hadoop/hbase/KeyValue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy)
@@ -1751,6 +1751,19 @@
   }
 
   /**
+   * Create a KeyValue that is smaller than all other possible KeyValues
+   * for the given row. That is, any (valid) KeyValue on 'row' would sort
+   * _after_ the result.
+   *
+   * @param row - row key (arbitrary byte array)
+   * @return First possible KeyValue on passed row
+   */
+  public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
+    return new KeyValue(row, roffset, rlength,
+        null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
+  }
+
+  /**
    * Creates a KeyValue that is smaller than all other KeyValues that
    * are older than the passed timestamp.
    * @param row - row key (arbitrary byte array)
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy)
@@ -82,6 +82,7 @@
  */
 public class Scan extends OperationWithAttributes implements Writable {
   private static final String RAW_ATTR = "_raw_";
+  private static final String ONDEMAND_ATTR = "_ondemand_";
   private static final String ISOLATION_LEVEL = "_isolationlevel_";
   private static final byte SCAN_VERSION = (byte)2;
 
@@ -460,6 +461,34 @@
   }
 
   /**
+   * Set the value indicating whether loading CFs on demand should be allowed (cluster
+   * default is false). On-demand CF loading does not load column families until necessary, e.g.
+   * if you filter on one column family, data for the other column families is loaded only for
+   * the rows that are included in the result, not for all rows as in the normal case.
+   * With column-specific filters, like SingleColumnValueFilter with filterIfMissing == true,
+   * this can deliver huge performance gains when there is a CF with lots of data; however, it
+   * can also lead to some inconsistent results, as follows:
+   * - if someone does a concurrent update to both column families in question you may get a row
+   *   that never existed, e.g. for { rowKey = 5, { cat_videos => 1 }, { video => "my cat" } }
+   *   someone puts rowKey 5 with { cat_videos => 0 }, { video => "my dog" }, and a concurrent
+   *   scan filtering on "cat_videos == 1" can get { rowKey = 5, { cat_videos => 1 },
+   *   { video => "my dog" } }.
+   * - if there's a concurrent split and you have more than 2 column families, some rows may be
+   *   missing some column families.
+   */
+  public void setLoadColumnFamiliesOnDemand(boolean value) {
+    setAttribute(ONDEMAND_ATTR, Bytes.toBytes(value));
+  }
+
+  /**
+   * Get the logical value indicating whether on-demand CF loading should be allowed.
+   */
+  public boolean doLoadColumnFamiliesOnDemand() {
+    byte[] attr = getAttribute(ONDEMAND_ATTR);
+    return attr == null ? false : Bytes.toBoolean(attr);
+  }
+
+  /**
    * Compile the table and column family (i.e. schema) information
    * into a String. Useful for parsing and aggregation by debugging,
    * logging, and administration tools.
@@ -488,7 +517,7 @@
    * Useful for debugging, logging, and administration tools.
   * @param maxCols a limit on the number of columns output prior to truncation
   * @return Map
-  */ 
+  */
  @Override
  public Map toMap(int maxCols) {
    // start with the fingerpring map and build on top of it
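To make the client-side usage concrete, here is a minimal sketch (not part of the patch; table, family and qualifier names are illustrative) of a scan that enables on-demand column family loading, so that the bulky family is fetched only for rows that actually pass the filter:

    // Filter on the small "essential" family; the large "payload" family is then
    // loaded lazily, only for rows that are included in the result.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("essential"));
    scan.addFamily(Bytes.toBytes("payload"));
    SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("essential"), Bytes.toBytes("flag"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(1));
    // Without filterIfMissing, every family stays essential (see
    // SingleColumnValueFilter.isFamilyEssential further down in this patch).
    filter.setFilterIfMissing(true);
    scan.setFilter(filter);
    scan.setLoadColumnFamiliesOnDemand(true);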
Index: src/main/java/org/apache/hadoop/hbase/filter/Filter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy)
@@ -167,4 +167,12 @@
    * not sure which key to seek to next.
    */
   public KeyValue getNextKeyHint(final KeyValue currentKV);
+
+  /**
+   * Check whether the given column family is essential for the filter to check the row. Most
+   * filters always return true here. But some could have more sophisticated
+   * logic which could significantly speed up the scanning process by not even
+   * touching non-essential column families until we are sure their data is needed in the result.
+   */
+  public boolean isFamilyEssential(byte[] name);
 }
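For illustration only (not part of the patch), a custom filter built on FilterBase could declare that just one family is essential, in the same way the test filter in TestHRegion below does; the "meta" family name is hypothetical:

    // Hypothetical filter: only the "meta" family is needed to decide whether a row
    // matches, so all other families can go to the lazily loaded (joined) heap.
    public class MetaOnlyFilter extends FilterBase {
      private static final byte[] ESSENTIAL_CF = Bytes.toBytes("meta");

      @Override
      public boolean isFamilyEssential(byte[] name) {
        return Bytes.equals(name, ESSENTIAL_CF);
      }

      // Filter extends Writable in this code base, so serialization must be implemented.
      @Override
      public void readFields(DataInput in) throws IOException {
      }

      @Override
      public void write(DataOutput out) throws IOException {
      }
    }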

Index: src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (working copy)
@@ -131,6 +131,16 @@
   }
 
   /**
+   * By default, we require all of the scan's column families to be present. Our
+   * subclasses may be more precise.
+   *
+   * @inheritDoc
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return true;
+  }
+
+  /**
    * Given the filter's arguments it constructs the filter
    *
    * @param filterArguments the filter's arguments
Index: src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (working copy)
@@ -318,6 +318,16 @@
   }
 
   @Override
+  public boolean isFamilyEssential(byte[] name) {
+    for (Filter filter : filters) {
+      if (filter.isFamilyEssential(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public String toString() {
     return toString(MAX_LOG_FILTERS);
   }
Index: src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (working copy)
@@ -24,6 +24,8 @@
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 
 /**
  * A {@link Filter} that checks a single column value, but does not emit the
@@ -76,16 +78,22 @@
     super(family, qualifier, compareOp, comparator);
   }
 
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    ReturnCode superRetCode = super.filterKeyValue(keyValue);
-    if (superRetCode == ReturnCode.INCLUDE) {
+  // We clean the result row in filterRow() to be consistent with the scanning process.
+  public boolean hasFilterRow() {
+    return true;
+  }
+
+  // Here we remove from the row all the key values belonging to the tested column.
+  public void filterRow(List kvs) {
+    Iterator it = kvs.iterator();
+    while (it.hasNext()) {
+      KeyValue kv = (KeyValue)it.next();
       // If the current column is actually the tested column,
       // we will skip it instead.
-      if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-        return ReturnCode.SKIP;
+      if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+        it.remove();
       }
     }
-    return superRetCode;
   }
 
   public static Filter createFilterFromArguments(ArrayList filterArguments) {
Index: src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (working copy)
@@ -307,6 +307,15 @@
     out.writeBoolean(latestVersionOnly);
   }
 
+  /**
+   * The only CF this filter needs is the given column family, so it is the only essential
+   * family for the whole scan. If filterIfMissing == false, all families become essential,
+   * because of the possibility of skipping rows that have no data at all in the filtered CF.
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+  }
+
   @Override
   public String toString() {
     return String.format("%s (%s, %s, %s, %s)",
Index: src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (working copy)
@@ -104,6 +104,10 @@
     }
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();
Index: src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (working copy)
@@ -105,6 +105,10 @@
     }
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1427046)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -178,6 +178,8 @@
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   private static final String MERGEDIR = ".merges";
 
+  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -263,8 +265,13 @@
   KeyValue.KVComparator comparator;
 
   private ConcurrentHashMap scannerReadPoints;
+  /**
+   * The default setting for whether to enable on-demand CF loading for
+   * scan requests to this region. Requests can override it.
+   */
+  private boolean isLoadingCfsOnDemandDefault = false;
 
-  /*
+  /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included in every
    * read operation.
@@ -416,6 +423,8 @@
     this.conf = conf;
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
+
+    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
     this.regionInfo = regionInfo;
     this.htableDescriptor = htd;
     this.rsServices = rsServices;
@@ -857,6 +866,10 @@
     return mvcc;
   }
 
+  public boolean isLoadingCfsOnDemandDefault() {
+    return this.isLoadingCfsOnDemandDefault;
+  }
+
   /**
    * Close down this HRegion. Flush the cache, shut down each HStore, don't
    * service any more calls.
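The region-level default can also be switched on through configuration instead of per Scan; a small sketch of what a deployment might do (the Configuration instance here is illustrative):

    // HRegion reads this key in its constructor via
    // conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false), and HRegionServer ORs the
    // region default with the per-scan attribute before the scanner is opened.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HRegion.LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    // equivalent hbase-site.xml entry: hbase.hregion.scan.loadColumnFamiliesOnDemand = true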
@@ -3467,6 +3480,15 @@
   class RegionScannerImpl implements RegionScanner {
     // Package local for testability
     KeyValueHeap storeHeap = null;
+    /** Heap of key-values that are not essential for the provided filters and are thus read
+     * on demand, if on-demand column family loading is enabled.*/
+    KeyValueHeap joinedHeap = null;
+    /**
+     * If the joined heap data gathering is interrupted due to scan limits, this will
+     * contain the row for which we are populating the values.*/
+    private KeyValue joinedContinuationRow = null;
+    // KeyValue indicating that limit is reached when scanning
+    private final KeyValue KV_LIMIT = new KeyValue();
     private final byte [] stopRow;
     private Filter filter;
     private List results = new ArrayList();
@@ -3506,7 +3528,10 @@
       scannerReadPoints.put(this, this.readPt);
     }
 
+      // Here we separate all scanners into two lists - scanners that provide data required
+      // by the filter to operate (the "scanners" list) and all others (the "joinedScanners" list).
       List scanners = new ArrayList();
+      List joinedScanners = new ArrayList();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }
@@ -3515,9 +3540,17 @@
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
         KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
+        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
+            || this.filter.isFamilyEssential(entry.getKey())) {
+          scanners.add(scanner);
+        } else {
+          joinedScanners.add(scanner);
+        }
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
+      if (!joinedScanners.isEmpty()) {
+        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+      }
     }
 
     RegionScannerImpl(Scan scan) throws IOException {
@@ -3599,6 +3632,42 @@
       return next(outResults, batch, metric);
     }
 
+    private void populateFromJoinedHeap(int limit, String metric) throws IOException {
+      assert joinedContinuationRow != null;
+      KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(),
+          joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric);
+      if (kv != KV_LIMIT) {
+        // We are done with this row, reset the continuation.
+        joinedContinuationRow = null;
+      }
+      // As the data is obtained from two independent heaps, we need to
+      // ensure that result list is sorted, because Result relies on that.
+      Collections.sort(results, comparator);
+    }
+
+    /**
+     * Fetches records for this row into the result list, until the next row or the limit (if not -1).
+     * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
+     * @param limit Max amount of KVs to place in the result list, -1 means no limit.
+     * @param currentRow Byte array with the key we are fetching.
+     * @param offset offset for currentRow
+     * @param length length for currentRow
+     * @param metric Metric key to be passed into KeyValueHeap::next().
+     * @return KV_LIMIT if the limit was reached, otherwise the next KeyValue in the heap (possibly null).
+     */
+    private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset,
+        short length, String metric) throws IOException {
+      KeyValue nextKv;
+      do {
+        heap.next(results, limit - results.size(), metric);
+        if (limit > 0 && results.size() == limit) {
+          return KV_LIMIT;
+        }
+        nextKv = heap.peek();
+      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+      return nextKv;
+    }
+
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -3608,6 +3677,11 @@
     private boolean nextInternal(int limit, String metric) throws IOException {
       RpcCallContext rpcCall = HBaseServer.getCurrentCall();
 
+      // The loop here is used only when, at some point during next(), we determine
+      // that due to effects of filters or otherwise, we have an empty row in the result.
+      // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
+      // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
+      // and joinedHeap has no more data to read for the last row, if joinedContinuationRow is set).
       while (true) {
         if (rpcCall != null) {
           // If a user specifies a too-restrictive or too-slow scanner, the
@@ -3617,7 +3691,9 @@
           rpcCall.throwExceptionIfCallerDisconnected();
         }
 
+        // Let's see what we have in the storeHeap.
         KeyValue current = this.storeHeap.peek();
+
         byte[] currentRow = null;
         int offset = 0;
         short length = 0;
@@ -3626,41 +3702,48 @@
         if (current != null) {
           currentRow = current.getBuffer();
           offset = current.getRowOffset();
           length = current.getRowLength();
         }
-        if (isStopRow(currentRow, offset, length)) {
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
+        boolean stopRow = isStopRow(currentRow, offset, length);
+        // Check if we were getting data from the joinedHeap and hit the limit.
+        // If not, then it's the main path - getting results from the storeHeap.
+        if (joinedContinuationRow == null) {
+          // First, check if we are at a stop row. If so, there are no more results.
+          if (stopRow) {
+            if (filter != null && filter.hasFilterRow()) {
+              filter.filterRow(results);
+            }
+            if (filter != null && filter.filterRow()) {
+              results.clear();
+            }
+            return false;
           }
-          if (filter != null && filter.filterRow()) {
-            results.clear();
+
+          // Check if the rowkey filter wants to exclude this row. If so, loop to the next.
+          // Technically, if we hit limits before on this row, we don't need this call.
+          if (filterRowKey(currentRow, offset, length)) {
+            nextRow(currentRow, offset, length);
+            continue;
           }
-          return false;
-        } else if (filterRowKey(currentRow, offset, length)) {
-          nextRow(currentRow, offset, length);
-        } else {
-          KeyValue nextKv;
-          do {
-            this.storeHeap.next(results, limit - results.size(), metric);
-            if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) {
-                throw new IncompatibleFilterException(
-                  "Filter with filterRow(List) incompatible with scan with limit!");
-              }
-              return true; // we are expecting more yes, but also limited to how many we can return.
+          // Ok, we are good, let's try to get some results from the main heap.
+          KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length, metric);
+          if (nextKv == KV_LIMIT) {
+            if (this.filter != null && filter.hasFilterRow()) {
+              throw new IncompatibleFilterException(
+                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
            }
-            nextKv = this.storeHeap.peek();
-          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+            return true; // We hit the limit.
+          }
+          stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+          // save that the row was empty before filters were applied to it.
+          final boolean isEmptyRow = results.isEmpty();
 
-          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
-
-          // now that we have an entire row, lets process with a filters:
-
-          // first filter with the filterRow(List)
+          // We have the part of the row necessary for filtering (all of it, usually).
+          // First filter with the filterRow(List).
           if (filter != null && filter.hasFilterRow()) {
             filter.filterRow(results);
           }
 
-          if (results.isEmpty() || filterRow()) {
+          if (isEmptyRow || filterRow()) {
             // this seems like a redundant step - we already consumed the row
             // there're no left overs.
             // the reasons for calling this method are:
@@ -3669,12 +3752,48 @@
             nextRow(currentRow, offset, length);
 
             // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on.
-
+            // we should continue on. Otherwise, nothing else to do.
             if (!stopRow) continue;
+            return false;
           }
-          return !stopRow;
+
+          // Ok, we are done with the storeHeap for this row.
+          // Now we may need to fetch additional, non-essential data into the row.
+          // These values are not needed for the filter to work, so we postpone their
+          // fetch to (possibly) reduce the amount of data loaded from disk.
+          if (this.joinedHeap != null) {
+            KeyValue nextJoinedKv = joinedHeap.peek();
+            // If joinedHeap is pointing to some other row, try to seek to a correct one.
+            // We don't need to recheck that row here - populateResult will take care of that.
+            boolean mayHaveData =
+              (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
+              || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
+            if (mayHaveData) {
+              joinedContinuationRow = current;
+              populateFromJoinedHeap(limit, metric);
+            }
+          }
+        } else {
+          // Populating from the joined heap was stopped by limits, populate some more.
+          populateFromJoinedHeap(limit, metric);
         }
+
+        // We may have just called populateFromJoinedHeap and hit the limits. If that is
+        // the case, we need to call it again on the next next() invocation.
+        if (joinedContinuationRow != null) {
+          return true;
+        }
+
+        // Finally, we are done with both joinedHeap and storeHeap.
+        // Double check to prevent empty rows from appearing in the result. It could be
+        // the case when SingleColumnValueExcludeFilter is used.
+        if (results.isEmpty()) {
+          nextRow(currentRow, offset, length);
+          if (!stopRow) continue;
+        }
+
+        // We are done. Return the result.
+        return !stopRow;
       }
     }
 
@@ -3709,6 +3828,10 @@
         storeHeap.close();
         storeHeap = null;
       }
+      if (joinedHeap != null) {
+        joinedHeap.close();
+        joinedHeap = null;
+      }
       // no need to sychronize here.
       scannerReadPoints.remove(this);
       this.filterClosed = true;
@@ -3723,16 +3846,21 @@
       if (row == null) {
         throw new IllegalArgumentException("Row cannot be null.");
       }
+      boolean result = false;
       startRegionOperation();
       try {
         // This could be a new thread from the last time we called next().
        MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
        KeyValue kv = KeyValue.createFirstOnRow(row);
        // use request seek to make use of the lazy seek option.
See HBASE-5520 - return this.storeHeap.requestSeek(kv, true, true); + result = this.storeHeap.requestSeek(kv, true, true); + if (this.joinedHeap != null) { + result = this.joinedHeap.requestSeek(kv, true, true) || result; + } } finally { closeRegionOperation(); } + return result; } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1427046) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2361,6 +2361,8 @@ try { HRegion r = getRegion(regionName); r.checkRow(scan.getStartRow(), "Scan"); + scan.setLoadColumnFamiliesOnDemand(r.isLoadingCfsOnDemandDefault() + || scan.doLoadColumnFamiliesOnDemand()); r.prepareScanner(scan); RegionScanner s = null; if (r.getCoprocessorHost() != null) { Index: src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (revision 1427046) +++ src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (working copy) @@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import java.util.List; +import java.util.ArrayList; + /** * Tests for {@link SingleColumnValueExcludeFilter}. Because this filter * extends {@link SingleColumnValueFilter}, only the added functionality is @@ -53,16 +56,18 @@ CompareOp.EQUAL, VAL_1); // A 'match' situation - KeyValue kv; - kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1); - // INCLUDE expected because test column has not yet passed - assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE); - kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1); - // Test column will pass (will match), will SKIP because test columns are excluded - assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP); - // Test column has already passed and matched, all subsequent columns are INCLUDE - kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1); - assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE); + List kvs = new ArrayList(); + KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1); + + kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1)); + kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1)); + kvs.add (new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1)); + + filter.filterRow(kvs); + + assertEquals("resultSize", kvs.size(), 2); + assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0); + assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0); assertFalse("allRemainingWhenMatch", filter.filterAllRemaining()); // A 'mismatch' situation Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1427046) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ 
-66,9 +68,11 @@ import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.NullComparator; import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -200,7 +204,7 @@ System.out.println(results); assertEquals(0, results.size()); } - + @Test public void testToShowNPEOnRegionScannerReseek() throws Exception{ String method = "testToShowNPEOnRegionScannerReseek"; @@ -2816,6 +2820,173 @@ } } + /** + * Added for HBASE-5416 + * + * Here we test scan optimization when only subset of CFs are used in filter + * conditions. + */ + public void testScanner_JoinedScanners() throws IOException { + byte [] tableName = Bytes.toBytes("testTable"); + byte [] cf_essential = Bytes.toBytes("essential"); + byte [] cf_joined = Bytes.toBytes("joined"); + byte [] cf_alpha = Bytes.toBytes("alpha"); + this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha); + try { + byte [] row1 = Bytes.toBytes("row1"); + byte [] row2 = Bytes.toBytes("row2"); + byte [] row3 = Bytes.toBytes("row3"); + + byte [] col_normal = Bytes.toBytes("d"); + byte [] col_alpha = Bytes.toBytes("a"); + + byte [] filtered_val = Bytes.toBytes(3); + + Put put = new Put(row1); + put.add(cf_essential, col_normal, Bytes.toBytes(1)); + put.add(cf_joined, col_alpha, Bytes.toBytes(1)); + region.put(put); + + put = new Put(row2); + put.add(cf_essential, col_alpha, Bytes.toBytes(2)); + put.add(cf_joined, col_normal, Bytes.toBytes(2)); + put.add(cf_alpha, col_alpha, Bytes.toBytes(2)); + region.put(put); + + put = new Put(row3); + put.add(cf_essential, col_normal, filtered_val); + put.add(cf_joined, col_normal, filtered_val); + region.put(put); + + // Check two things: + // 1. result list contains expected values + // 2. result list is sorted properly + + Scan scan = new Scan(); + Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal, + CompareOp.NOT_EQUAL, filtered_val); + scan.setFilter(filter); + scan.setLoadColumnFamiliesOnDemand(true); + InternalScanner s = region.getScanner(scan); + + List results = new ArrayList(); + assertTrue(s.next(results)); + assertEquals(results.size(), 1); + results.clear(); + + assertTrue(s.next(results)); + assertEquals(results.size(), 3); + assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha)); + assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential)); + assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined)); + results.clear(); + + assertFalse(s.next(results)); + assertEquals(results.size(), 0); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + /** + * HBASE-5416 + * + * Test case when scan limits amount of KVs returned on each next() call. 
+ */ + public void testScanner_JoinedScannersWithLimits() throws IOException { + final byte [] tableName = Bytes.toBytes("testTable"); + final byte [] cf_first = Bytes.toBytes("first"); + final byte [] cf_second = Bytes.toBytes("second"); + + this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second); + try { + final byte [] col_a = Bytes.toBytes("a"); + final byte [] col_b = Bytes.toBytes("b"); + + Put put; + + for (int i = 0; i < 10; i++) { + put = new Put(Bytes.toBytes("r" + Integer.toString(i))); + put.add(cf_first, col_a, Bytes.toBytes(i)); + if (i < 5) { + put.add(cf_first, col_b, Bytes.toBytes(i)); + put.add(cf_second, col_a, Bytes.toBytes(i)); + put.add(cf_second, col_b, Bytes.toBytes(i)); + } + region.put(put); + } + + Scan scan = new Scan(); + scan.setLoadColumnFamiliesOnDemand(true); + Filter bogusFilter = new FilterBase() { + @Override + public boolean isFamilyEssential(byte[] name) { + return Bytes.equals(name, cf_first); + } + @Override + public void readFields(DataInput arg0) throws IOException { + } + + @Override + public void write(DataOutput arg0) throws IOException { + } + }; + + scan.setFilter(bogusFilter); + InternalScanner s = region.getScanner(scan); + + // Our data looks like this: + // r0: first:a, first:b, second:a, second:b + // r1: first:a, first:b, second:a, second:b + // r2: first:a, first:b, second:a, second:b + // r3: first:a, first:b, second:a, second:b + // r4: first:a, first:b, second:a, second:b + // r5: first:a + // r6: first:a + // r7: first:a + // r8: first:a + // r9: first:a + + // But due to next's limit set to 3, we should get this: + // r0: first:a, first:b, second:a + // r0: second:b + // r1: first:a, first:b, second:a + // r1: second:b + // r2: first:a, first:b, second:a + // r2: second:b + // r3: first:a, first:b, second:a + // r3: second:b + // r4: first:a, first:b, second:a + // r4: second:b + // r5: first:a + // r6: first:a + // r7: first:a + // r8: first:a + // r9: first:a + + List results = new ArrayList(); + int index = 0; + while (true) { + boolean more = s.next(results, 3); + if ((index >> 1) < 5) { + if (index % 2 == 0) + assertEquals(results.size(), 3); + else + assertEquals(results.size(), 1); + } + else + assertEquals(results.size(), 1); + results.clear(); + index++; + if (!more) break; + } + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + ////////////////////////////////////////////////////////////////////////////// // Split test //////////////////////////////////////////////////////////////////////////////
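As a closing illustration of how the FilterList change earlier in this patch composes essential families (a family is essential as soon as any filter in the list needs it), here is a hedged sketch with made-up family names:

    // cf_a and cf_b are essential because one of the filters inspects each of them;
    // an unrelated cf_c is not essential and would be read through the joined heap.
    SingleColumnValueFilter onA = new SingleColumnValueFilter(
        Bytes.toBytes("cf_a"), Bytes.toBytes("q"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(1));
    onA.setFilterIfMissing(true);
    SingleColumnValueFilter onB = new SingleColumnValueFilter(
        Bytes.toBytes("cf_b"), Bytes.toBytes("q"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(2));
    onB.setFilterIfMissing(true);
    List<Filter> filters = new ArrayList<Filter>();
    filters.add(onA);
    filters.add(onB);
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
    // list.isFamilyEssential(Bytes.toBytes("cf_c")) returns false, so cf_c is loaded on demand.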