Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(revision 1345677)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(working copy)
@@ -68,6 +68,7 @@
 import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -2675,6 +2676,158 @@
     }
   }
 
+  /**
+   * Added for HBASE-5416.
+   *
+   * Here we test the scan optimization when only a subset of the CFs is used
+   * in the filter conditions.
+   */
+  public void testScanner_JoinedScanners() throws IOException {
+    byte [] tableName = Bytes.toBytes("testTable");
+    byte [] cf_essential = Bytes.toBytes("essential");
+    byte [] cf_joined = Bytes.toBytes("joined");
+    byte [] cf_alpha = Bytes.toBytes("alpha");
+    this.region = initHRegion(tableName, getName(), cf_essential, cf_joined, cf_alpha);
+    try {
+      byte [] row1 = Bytes.toBytes("row1");
+      byte [] row2 = Bytes.toBytes("row2");
+      byte [] row3 = Bytes.toBytes("row3");
+
+      byte [] col_normal = Bytes.toBytes("d");
+      byte [] col_alpha = Bytes.toBytes("a");
+
+      byte [] filtered_val = Bytes.toBytes(3);
+
+      Put put = new Put(row1);
+      put.add(cf_essential, col_normal, Bytes.toBytes(1));
+      put.add(cf_joined, col_alpha, Bytes.toBytes(1));
+      region.put(put);
+
+      put = new Put(row2);
+      put.add(cf_essential, col_alpha, Bytes.toBytes(2));
+      put.add(cf_joined, col_normal, Bytes.toBytes(2));
+      put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
+      region.put(put);
+
+      put = new Put(row3);
+      put.add(cf_essential, col_normal, filtered_val);
+      put.add(cf_joined, col_normal, filtered_val);
+      region.put(put);
+
+      // Check two things:
+      // 1. The result list contains the expected values.
+      // 2. The result list is properly sorted.
+
+      Scan scan = new Scan();
+      Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
+        CompareOp.NOT_EQUAL, filtered_val);
+      scan.setFilter(filter);
+      InternalScanner s = region.getScanner(scan);
+
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      assertTrue(s.next(results));
+      assertEquals(1, results.size());
+      results.clear();
+
+      assertTrue(s.next(results));
+      assertEquals(3, results.size());
+      assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha));
+      assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential));
+      assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined));
+      results.clear();
+
+      assertFalse(s.next(results));
+      assertEquals(0, results.size());
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  /**
+   * HBASE-5416.
+   *
+   * Test case when the scan limits the amount of KVs returned on each next() call.
+   */
+  public void testScanner_JoinedScannersAndLimits() throws IOException {
+    byte [] tableName = Bytes.toBytes("testTable");
+    byte [] cf_first = Bytes.toBytes("first");
+    byte [] cf_second = Bytes.toBytes("second");
+
+    this.region = initHRegion(tableName, getName(), cf_first, cf_second);
+    try {
+      byte [] col_a = Bytes.toBytes("a");
+      byte [] col_b = Bytes.toBytes("b");
+
+      Put put;
+
+      for (int i = 0; i < 10; i++) {
+        put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
+        put.add(cf_first, col_a, Bytes.toBytes(i));
+        if (i < 5) {
+          put.add(cf_first, col_b, Bytes.toBytes(i));
+          put.add(cf_second, col_a, Bytes.toBytes(i));
+          put.add(cf_second, col_b, Bytes.toBytes(i));
+        }
+        region.put(put);
+      }
+
+      Scan scan = new Scan();
+      Filter filter = new SingleColumnValueFilter(cf_first, col_a,
+        CompareOp.NOT_EQUAL, Bytes.toBytes("bogus"));
+      scan.setFilter(filter);
+      InternalScanner s = region.getScanner(scan);
+
+      // Our data looks like this:
+      // r0: first:a, first:b, second:a, second:b
+      // r1: first:a, first:b, second:a, second:b
+      // r2: first:a, first:b, second:a, second:b
+      // r3: first:a, first:b, second:a, second:b
+      // r4: first:a, first:b, second:a, second:b
+      // r5: first:a
+      // r6: first:a
+      // r7: first:a
+      // r8: first:a
+      // r9: first:a
+
+      // But due to next's limit being set to 3, we should get this:
+      // r0: first:a, first:b, second:a
+      // r0: second:b
+      // r1: first:a, first:b, second:a
+      // r1: second:b
+      // r2: first:a, first:b, second:a
+      // r2: second:b
+      // r3: first:a, first:b, second:a
+      // r3: second:b
+      // r4: first:a, first:b, second:a
+      // r4: second:b
+      // r5: first:a
+      // r6: first:a
+      // r7: first:a
+      // r8: first:a
+      // r9: first:a
+
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      int index = 0;
+      while (true) {
+        boolean more = s.next(results, 3);
+        if ((index >> 1) < 5) {
+          if (index % 2 == 0) {
+            assertEquals(3, results.size());
+          } else {
+            assertEquals(1, results.size());
+          }
+        } else {
+          assertEquals(1, results.size());
+        }
+        results.clear();
+        index++;
+        if (!more) break;
+      }
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Split test
   //////////////////////////////////////////////////////////////////////////////
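For context, here is a minimal client-side sketch of the scan shape these two tests exercise. This is not part of the patch; the table and family names are illustrative and the 0.94-era client API is assumed. Only the family checked by the filter is essential, so the other selected family is read lazily, for matching rows only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LazyCfScanSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testTable"); // illustrative table name
        try {
          Scan scan = new Scan();
          scan.addFamily(Bytes.toBytes("essential")); // the family the filter reads
          scan.addFamily(Bytes.toBytes("joined"));    // wide family, loaded lazily
          // Only "essential" is needed to evaluate the filter, so the region
          // reads "joined" just for the rows that pass it.
          scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("essential"),
              Bytes.toBytes("d"), CompareOp.EQUAL, Bytes.toBytes(1)));
          ResultScanner rs = table.getScanner(scan);
          try {
            for (Result r : rs) {
              System.out.println(Bytes.toString(r.getRow()));
            }
          } finally {
            rs.close();
          }
        } finally {
          table.close();
        }
      }
    }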
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java	(revision 1345677)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java	(working copy)
@@ -28,6 +28,9 @@
 
 import static org.junit.Assert.*;
 
+import java.util.List;
+import java.util.ArrayList;
+
 /**
  * Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
  * extends {@link SingleColumnValueFilter}, only the added functionality is
@@ -53,16 +56,18 @@
       CompareOp.EQUAL, VAL_1);
 
     // A 'match' situation
-    KeyValue kv;
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    // INCLUDE expected because test column has not yet passed
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
-    // Test column will pass (will match), will SKIP because test columns are excluded
-    assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
-    // Test column has already passed and matched, all subsequent columns are INCLUDE
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+    kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+    kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+    kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+    filter.filterRow(kvs);
+
+    assertEquals("resultSize", 2, kvs.size());
+    assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+    assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
 
     assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
 
     // A 'mismatch' situation
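A minimal standalone sketch of the contract this test now verifies (family, qualifier, and value names are made up, not taken from the patch): after the change, the exclusion of the tested column happens in filterRow(List), not via a SKIP return code from filterKeyValue().

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ExcludeFilterSketch {
      public static void main(String[] args) {
        byte[] fam = Bytes.toBytes("fam");
        byte[] tested = Bytes.toBytes("tested");
        byte[] other = Bytes.toBytes("other");
        byte[] val = Bytes.toBytes("v");

        SingleColumnValueExcludeFilter f =
            new SingleColumnValueExcludeFilter(fam, tested, CompareOp.EQUAL, val);

        List<KeyValue> row = new ArrayList<KeyValue>();
        row.add(new KeyValue(Bytes.toBytes("r"), fam, other, val));
        row.add(new KeyValue(Bytes.toBytes("r"), fam, tested, val));

        f.filterRow(row); // strips every KeyValue of the tested column
        // row now holds a single KeyValue: fam:other
        System.out.println(row.size()); // prints 1
      }
    }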
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3347,6 +3347,9 @@
   class RegionScannerImpl implements RegionScanner {
     // Package local for testability
     KeyValueHeap storeHeap = null;
+    KeyValueHeap joinedHeap = null;
+    // State flag indicating that gathering of joined-heap data was interrupted by the scan limit.
+    private boolean joinedHeapHasMoreData = false;
     private final byte [] stopRow;
     private Filter filter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
@@ -3388,7 +3391,11 @@
       scannerReadPoints.put(this, this.readPt);
     }
 
+      // Here we separate all scanners into two lists: those that provide the
+      // data required by the filter to operate (the scanners list), and all
+      // others (the joinedScanners list).
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }
@@ -3396,10 +3403,17 @@
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        StoreScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
+        KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
+        if (this.filter == null || this.filter.isFamilyEssential(entry.getKey())) {
+          scanners.add(scanner);
+        } else {
+          joinedScanners.add(scanner);
+        }
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
+      if (!joinedScanners.isEmpty()) {
+        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+      }
     }
 
     RegionScannerImpl(Scan scan) throws IOException {
@@ -3470,6 +3484,32 @@
       return next(outResults, batch, metric);
     }
 
+    /**
+     * Fetches records with the current row into the result list, until the
+     * next row or the limit is reached (if the limit is not -1).
+     * @param heap KeyValueHeap to fetch data from. It must be positioned on
+     *   the correct row before this call.
+     * @param limit Max amount of KVs to place in the result list; -1 means no limit.
+     * @param currentRow Byte array with the key we are fetching.
+     * @param metric Metric key to be passed into KeyValueHeap#next().
+     * @return true if the limit was reached, false otherwise.
+     */
+    private boolean populateResult(KeyValueHeap heap, int limit, byte[] currentRow,
+        String metric) throws IOException {
+      KeyValue nextKV = heap.peek();
+
+      if (nextKV == null || !Bytes.equals(currentRow, nextKV.getRow())) {
+        return false;
+      }
+
+      do {
+        heap.next(results, limit - results.size(), metric);
+        if (limit > 0 && results.size() == limit) {
+          return true;
+        }
+        nextKV = heap.peek();
+      } while (nextKV != null && Bytes.equals(currentRow, nextKV.getRow()));
+
+      return false;
+    }
+
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -3500,20 +3540,22 @@
           return false;
         } else if (filterRowKey(currentRow)) {
           nextRow(currentRow);
+        } else if (joinedHeapHasMoreData) {
+          // Resume the joined-heap fetch that was interrupted by the limit.
+          joinedHeapHasMoreData = populateResult(this.joinedHeap, limit,
+            currentRow, metric);
+          return true;
         } else {
-          byte [] nextRow;
-          do {
-            this.storeHeap.next(results, limit - results.size(), metric);
-            if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) {
-                throw new IncompatibleFilterException(
-                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
-              }
-              return true; // we are expecting more yes, but also limited to how many we can return.
-            }
-          } while (Bytes.equals(currentRow, nextRow = peekRow()));
-
-          final boolean stopRow = isStopRow(nextRow);
+          if (populateResult(this.storeHeap, limit, currentRow, metric)) {
+            if (this.filter != null && filter.hasFilterRow()) {
+              throw new IncompatibleFilterException(
+                "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+            }
+            return true; // we are expecting more yes, but also limited to how many we can return.
+          }
+          byte [] nextRow = peekRow();
+
+          final boolean stopRow = isStopRow(nextRow);
+          // Save whether the row was empty before the filters were applied to it.
+          final boolean isEmptyRow = results.isEmpty();
 
           // now that we have an entire row, let's process it with the filters:
@@ -3522,7 +3564,7 @@
           filter.filterRow(results);
         }
 
-        if (results.isEmpty() || filterRow()) {
+        if (isEmptyRow || filterRow()) {
           // this seems like a redundant step - we already consumed the row
           // there're no left overs.
           // the reasons for calling this method are:
@@ -3535,6 +3577,41 @@
             if (!stopRow) continue;
           }
+          else {
+            // Here we need to fetch additional, non-essential data for the row.
+            // These values are not needed for the filter to work, so we postpone
+            // fetching them to (possibly) reduce the amount of data loaded from disk.
+            KeyValue nextKV;
+            if (this.joinedHeap != null && (nextKV = this.joinedHeap.peek()) != null) {
+              boolean correct_row = true;
+              // do the seek only when it's needed
+              if (!Bytes.equals(currentRow, nextKV.getRow())) {
+                correct_row = this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow));
+                if (correct_row) {
+                  correct_row = (nextKV = this.joinedHeap.peek()) != null
+                    && Bytes.equals(currentRow, nextKV.getRow());
+                }
+              }
+              if (correct_row) {
+                this.joinedHeapHasMoreData = populateResult(this.joinedHeap, limit,
+                  currentRow, metric);
+                // As the data is obtained from two independent heaps, we need to
+                // ensure that the result list is sorted, because Result relies
+                // on that.
+                Collections.sort(results, comparator);
+
+                // Result list population was interrupted by the limit; we need to
+                // resume it on the next next() invocation.
+                if (this.joinedHeapHasMoreData) {
+                  return true;
+                }
+              }
+            }
+
+            // Double check to prevent empty rows from appearing in the result.
+            // This can happen when SingleColumnValueExcludeFilter is used.
+            if (results.isEmpty()) {
+              nextRow(currentRow);
+              if (!stopRow) continue;
+            }
+          }
           return !stopRow;
         }
       }
@@ -3551,14 +3628,18 @@
     protected void nextRow(byte [] currentRow) throws IOException {
       while (Bytes.equals(currentRow, peekRow())) {
-        this.storeHeap.next(MOCKED_LIST);
+        if (this.joinedHeapHasMoreData) {
+          this.joinedHeap.next(MOCKED_LIST);
+        } else {
+          this.storeHeap.next(MOCKED_LIST);
+        }
       }
       results.clear();
       resetFilters();
     }
 
     private byte[] peekRow() {
-      KeyValue kv = this.storeHeap.peek();
+      KeyValue kv = this.joinedHeapHasMoreData ? this.joinedHeap.peek() : this.storeHeap.peek();
       return kv == null ? null : kv.getRow();
     }
@@ -3575,6 +3656,10 @@
         storeHeap.close();
         storeHeap = null;
       }
+      if (joinedHeap != null) {
+        joinedHeap.close();
+        joinedHeap = null;
+      }
       // no need to synchronize here.
       scannerReadPoints.remove(this);
       this.filterClosed = true;
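The control flow above is dense, so here is a self-contained toy model of the joined-heap idea (plain Java, not HBase code; all names are hypothetical): essential cells are read unconditionally and drive the filter decision, joined cells are fetched only for rows that pass, and the merged row is then sorted, mirroring the Collections.sort(results, comparator) call in nextInternal().

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class TwoHeapModel {
      // A "cell" is just a string "family:qualifier" in this toy model.
      // Returns the assembled row, or an empty list when the row is filtered out.
      static List<String> assembleRow(List<String> essentialCells,
          List<String> joinedCells, boolean rowMatches) {
        List<String> results = new ArrayList<String>(essentialCells);
        if (!rowMatches) {
          // Row filtered out: the joined cells are never touched (no extra I/O).
          return new ArrayList<String>();
        }
        // Row passed the filter: only now pay for the non-essential data.
        results.addAll(joinedCells);
        // The data comes from two independent heaps, so sort the merged row.
        Collections.sort(results);
        return results;
      }

      public static void main(String[] args) {
        List<String> essential = Arrays.asList("essential:d");
        List<String> joined = Arrays.asList("joined:a", "alpha:a");
        System.out.println(assembleRow(essential, joined, true));
        // prints [alpha:a, essential:d, joined:a]
      }
    }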
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java	(working copy)
@@ -108,6 +108,10 @@
     }
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java	(working copy)
@@ -26,6 +26,8 @@
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 
 /**
  * A {@link Filter} that checks a single column value, but does not emit the
@@ -80,16 +82,22 @@
     super(family, qualifier, compareOp, comparator);
   }
 
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    ReturnCode superRetCode = super.filterKeyValue(keyValue);
-    if (superRetCode == ReturnCode.INCLUDE) {
-      // If the current column is actually the tested column,
-      // we will skip it instead.
-      if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-        return ReturnCode.SKIP;
-      }
-    }
-    return superRetCode;
+  // We clean the result row in filterRow() to stay consistent with the scanning process.
+  public boolean hasFilterRow() {
+    return true;
+  }
+
+  // Here we remove from the row all key values belonging to the tested column.
+  public void filterRow(List<KeyValue> kvs) {
+    Iterator<KeyValue> it = kvs.iterator();
+    while (it.hasNext()) {
+      // If the current column is actually the tested column, we remove it
+      // from the result instead of emitting it.
+      if (it.next().matchingColumn(this.columnFamily, this.columnQualifier)) {
+        it.remove();
+      }
+    }
   }
 
   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
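One behavioral consequence, sketched below with hypothetical family and qualifier names: since SingleColumnValueExcludeFilter now declares hasFilterRow() == true, it can no longer be combined with a per-next() KV limit. On the client side such a limit comes from Scan.setBatch(), and the scan fails with the IncompatibleFilterException raised in the nextInternal() hunk above.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchVsFilterRow {
      public static Scan buildScan() {
        Scan scan = new Scan();
        scan.setBatch(3); // caps KVs returned per internal next() call
        scan.setFilter(new SingleColumnValueExcludeFilter(Bytes.toBytes("fam"),
            Bytes.toBytes("qual"), CompareOp.EQUAL, Bytes.toBytes("v")));
        // Because the filter implements filterRow(List<KeyValue>), running this
        // scan fails with "Filter with filterRow(List<KeyValue>) incompatible
        // with scan with limit!" once the limit interrupts a row.
        return scan;
      }
    }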
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java	(working copy)
@@ -135,6 +135,16 @@
   }
 
   /**
+   * By default, we require all of the scan's column families to be present.
+   * Subclasses may be more precise.
+   *
+   * {@inheritDoc}
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return true;
+  }
+
+  /**
    * Given the filter's arguments it constructs the filter
    * <p>
    * @param filterArguments the filter's arguments
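Because FilterBase supplies this conservative default, existing filters keep working unchanged; they simply opt out of the optimization. A quick sketch (not part of the patch):

    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DefaultEssentialCheck {
      public static void main(String[] args) {
        // PrefixFilter does not override isFamilyEssential(), so it inherits
        // the FilterBase default: every family is essential, no joined heap is
        // created, and the scan behaves exactly as before this patch.
        PrefixFilter filter = new PrefixFilter(Bytes.toBytes("row"));
        System.out.println(filter.isFamilyEssential(Bytes.toBytes("anyFamily"))); // true
      }
    }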

Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java	(working copy)
@@ -326,6 +326,16 @@
   }
 
   @Override
+  public boolean isFamilyEssential(byte[] name) {
+    // Conservative: a family is essential as soon as any filter in the list needs it.
+    for (Filter filter : filters) {
+      if (filter.isFamilyEssential(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public String toString() {
     return toString(MAX_LOG_FILTERS);
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java	(working copy)
@@ -167,4 +167,12 @@
    * not sure which key to seek to next.
    */
  public KeyValue getNextKeyHint(final KeyValue currentKV);
+
+  /**
+   * Check whether the given column family is essential for the filter to check
+   * the row. Most filters always return true here. But some could have more
+   * sophisticated logic which could significantly speed up scanning by not
+   * even touching a column family until we are 100% sure that its data is
+   * needed in the result.
+   */
+  public boolean isFamilyEssential(byte[] name);
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java	(working copy)
@@ -109,6 +109,10 @@
     }
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java	(revision 1345677)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java	(working copy)
@@ -311,6 +311,14 @@
     out.writeBoolean(latestVersionOnly);
   }
 
+  /**
+   * The only CF this filter needs is the given column family. So, it's the
+   * only essential column family in the whole scan.
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return Bytes.equals(name, this.columnFamily);
+  }
+
   @Override
   public String toString() {
     return String.format("%s (%s, %s, %s, %s)",
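To close, a hedged sketch of what a user-defined filter (hypothetical class, not part of this patch) has to do to opt into lazy family loading: extend FilterBase, evaluate only one family, and report only that family as essential. Note that FilterList above is conservative and treats a family as essential as soon as any wrapped filter needs it.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.hbase.filter.FilterBase;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical user-defined filter, shown only to illustrate the new contract.
    public class MetaOnlyFilter extends FilterBase {
      private static final byte[] META = Bytes.toBytes("meta");

      @Override
      public boolean isFamilyEssential(byte[] name) {
        // Only "meta" is needed to evaluate this filter; every other selected
        // family is non-essential and is read through the joined heap only for
        // rows this filter lets through.
        return Bytes.equals(name, META);
      }

      // The actual matching logic over the "meta" family would go in
      // filterKeyValue()/filterRow(); FilterBase defaults keep this sketch minimal.

      // Writable plumbing, required by the 0.94-era Filter interface.
      public void write(DataOutput out) throws IOException {}
      public void readFields(DataInput in) throws IOException {}
    }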