diff --git src/main/java/org/apache/hadoop/hbase/filter/Filter.java src/main/java/org/apache/hadoop/hbase/filter/Filter.java
index 02ea5f5..5c2f555 100644
--- src/main/java/org/apache/hadoop/hbase/filter/Filter.java
+++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java
@@ -163,4 +163,12 @@ public interface Filter extends Writable {
    * not sure which key to seek to next.
    */
   public KeyValue getNextKeyHint(final KeyValue currentKV);
+
+  /**
+   * Check that the given column family is essential for the filter to check
+   * the row. Most filters always return true here, but some have more
+   * sophisticated logic that can significantly speed up a scan by not even
+   * touching a column's data until we are 100% sure it is needed in the result.
+   */
+  public boolean isFamilyEssential(byte[] name);
 }
diff --git src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
index 0d1b123..df541ea 100644
--- src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
+++ src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
@@ -139,4 +139,14 @@ public abstract class FilterBase implements Filter {
   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
     throw new IllegalArgumentException("This method has not been implemented");
   }
+
+  /**
+   * By default, we require all of the scan's column families to be present.
+   * Subclasses are free to be more precise.
+   *
+   * @inheritDoc
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return true;
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/filter/FilterList.java src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
index 216d0db..4433086 100644
--- src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
+++ src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
@@ -310,4 +310,14 @@ public class FilterList implements Filter {
     }
     return keyHint;
   }
+
+  @Override
+  public boolean isFamilyEssential(byte[] name) {
+    for (Filter filter : filters) {
+      if (filter.isFamilyEssential(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
\ No newline at end of file
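To make the new contract concrete, here is a small illustration (not part of the patch) of what the FilterList loop above means for a composed filter; the family and qualifier names are made up for the example, and the SingleColumnValueFilter behavior relies on its isFamilyEssential() added later in this patch:

import java.util.Arrays;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class EssentialFamilyExample {
  public static void main(String[] args) {
    byte [] famA = Bytes.toBytes("a");
    byte [] famB = Bytes.toBytes("b");
    // Each SingleColumnValueFilter marks exactly one family as essential.
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE, Arrays.asList(
        (Filter) new SingleColumnValueFilter(famA, Bytes.toBytes("q"), CompareOp.EQUAL, Bytes.toBytes(1L)),
        new SingleColumnValueFilter(famB, Bytes.toBytes("q"), CompareOp.EQUAL, Bytes.toBytes(1L))));

    // A family is essential to the list if ANY wrapped filter needs it.
    System.out.println(list.isFamilyEssential(famA));                 // true
    System.out.println(list.isFamilyEssential(famB));                 // true
    System.out.println(list.isFamilyEssential(Bytes.toBytes("fat"))); // false: can be loaded lazily
  }
}

Note that the OR over the wrapped filters is conservative: a family is loaded eagerly as soon as any filter in the list might read it.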
diff --git src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
index 7c7607f..7124078 100644
--- src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
+++ src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
@@ -25,6 +25,9 @@
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+
 /**
  * A {@link Filter} that checks a single column value, but does not emit the
  * tested column. This will enable a performance boost over
@@ -76,16 +79,23 @@ public class SingleColumnValueExcludeFilter extends SingleColumnValueFilter {
     super(family, qualifier, compareOp, comparator);
   }
 
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    ReturnCode superRetCode = super.filterKeyValue(keyValue);
-    if (superRetCode == ReturnCode.INCLUDE) {
-      // If the current column is actually the tested column,
-      // we will skip it instead.
-      if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-        return ReturnCode.SKIP;
-      }
-    }
-    return superRetCode;
-  }
+  // We clean the result row in filterRow() to stay consistent with the
+  // scanning process.
+  public boolean hasFilterRow() {
+    return true;
+  }
+
+  // Here we remove from the row all key values belonging to the tested column.
+  public void filterRow(List<KeyValue> kvs) {
+    Iterator<KeyValue> it = kvs.iterator();
+    while (it.hasNext()) {
+      KeyValue kv = it.next();
+      // If the current column is the tested column, remove it from the row
+      // instead of emitting it.
+      if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+        it.remove();
+      }
+    }
+  }
 
   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
diff --git src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
index 7e4b2ae..6849833 100644
--- src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
+++ src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
@@ -306,4 +306,12 @@ public class SingleColumnValueFilter extends FilterBase {
     out.writeBoolean(filterIfMissing);
     out.writeBoolean(latestVersionOnly);
   }
+
+  /**
+   * The only column family this filter needs to check is the one it was
+   * constructed with, so it is the only essential family in the whole scan.
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return Bytes.equals(name, this.columnFamily);
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
index 8be40ee..539cf78 100644
--- src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
+++ src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
@@ -103,4 +103,8 @@ public class SkipFilter extends FilterBase {
       throw new RuntimeException("Failed deserialize.", e);
     }
   }
+
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
index b9fa927..47f41cf 100644
--- src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
+++ src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
@@ -104,4 +104,8 @@ public class WhileMatchFilter extends FilterBase {
       throw new RuntimeException("Failed deserialize.", e);
     }
   }
+
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dfd29ee..1b5775f 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3250,6 +3250,7 @@ public class HRegion implements HeapSize { // , Writable{
   class RegionScannerImpl implements RegionScanner {
     // Package local for testability
     KeyValueHeap storeHeap = null;
+    KeyValueHeap joinedHeap = null;
     private final byte [] stopRow;
     private Filter filter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
@@ -3289,7 +3290,11 @@ public class HRegion implements HeapSize { // , Writable{
         scannerReadPoints.put(this, this.readPt);
       }
 
+      // Here we separate all scanners into two lists: those that provide data
+      // required by the filter to operate (the scanners list) and all others
+      // (the joinedScanners list).
      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }
@@ -3298,9 +3303,17 @@ public class HRegion implements HeapSize { // , Writable{
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
         StoreScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
+        if (this.filter == null || this.filter.isFamilyEssential(entry.getKey())) {
+          scanners.add(scanner);
+        }
+        else {
+          joinedScanners.add(scanner);
+        }
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
+      if (!joinedScanners.isEmpty()) {
+        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+      }
     }
 
     RegionScannerImpl(Scan scan) throws IOException {
@@ -3389,6 +3402,9 @@ public class HRegion implements HeapSize { // , Writable{
 
           final boolean stopRow = isStopRow(nextRow);
 
+          // Save whether the row was empty before any filters were applied to it.
+          final boolean isEmptyRow = results.isEmpty();
+
           // now that we have an entire row, lets process with a filters:
 
           // first filter with the filterRow(List)
@@ -3396,7 +3412,7 @@ public class HRegion implements HeapSize { // , Writable{
             filter.filterRow(results);
           }
 
-          if (results.isEmpty() || filterRow()) {
+          if (isEmptyRow || filterRow()) {
             // this seems like a redundant step - we already consumed the row
             // there're no left overs.
             // the reasons for calling this method are:
@@ -3409,6 +3425,36 @@ public class HRegion implements HeapSize { // , Writable{
 
             if (!stopRow) continue;
           }
+          else {
+            // Here we need to fetch additional, non-essential data into the row.
+            // The filter does not need these values to operate, so we postpone
+            // their fetch to (possibly) reduce the amount of data loaded from disk.
+            if (this.joinedHeap != null &&
+                this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow))) {
+              while (true) {
+                this.joinedHeap.next(results, limit - results.size());
+                KeyValue nextKV = this.joinedHeap.peek();
+                if (nextKV == null) {
+                  break;
+                }
+                if (!Bytes.equals(currentRow, nextKV.getRow())) {
+                  break;
+                }
+              }
+              // As the data was obtained from two independent heaps, we need to
+              // ensure that the result list is sorted, because Result relies
+              // on that.
+              Collections.sort(results, comparator);
+            }
+
+            // Double check to prevent empty rows from appearing in the result.
+            // This can happen when a SingleColumnValueExcludeFilter is used.
+            if (results.isEmpty()) {
+              nextRow(currentRow);
+              if (!stopRow) continue;
+            }
+          }
+
           return !stopRow;
         }
       }
@@ -3449,6 +3495,10 @@ public class HRegion implements HeapSize { // , Writable{
         storeHeap.close();
         storeHeap = null;
       }
+      if (joinedHeap != null) {
+        joinedHeap.close();
+        joinedHeap = null;
+      }
       // no need to sychronize here.
       scannerReadPoints.remove(this);
       this.filterClosed = true;
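The HRegion changes above are the core of the patch: scanners over essential families feed the storeHeap, all others feed the joinedHeap, and the joined heap is seeked only for rows the filter lets through. A client-side sketch of a scan that benefits (illustrative only; the table and family names are hypothetical, and 0.94-era client APIs are assumed):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class JoinedScanExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "docs"); // hypothetical table
    try {
      // "meta" is a small family holding the flag column; "blob" is fat.
      SingleColumnValueFilter flagSet = new SingleColumnValueFilter(
          Bytes.toBytes("meta"), Bytes.toBytes("flag"),
          CompareOp.EQUAL, Bytes.toBytes(1L));
      flagSet.setFilterIfMissing(true);

      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("meta"));
      scan.addFamily(Bytes.toBytes("blob"));
      scan.setFilter(flagSet);

      // With this patch, "meta" is essential and goes to the storeHeap, while
      // "blob" goes to the joinedHeap and is read only for rows that match.
      ResultScanner rs = table.getScanner(scan);
      try {
        for (Result r : rs) {
          // process r
        }
      } finally {
        rs.close();
      }
    } finally {
      table.close();
    }
  }
}

If most rows fail the flag check, the scan never reads the fat family for them, which is where the speedup comes from.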
diff --git src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
index b7cb6b5..4179615 100644
--- src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
+++ src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.experimental.categories.Category;
+import java.util.List;
+import java.util.ArrayList;
 
 /**
  * Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
@@ -53,17 +55,18 @@ public class TestSingleColumnValueExcludeFilter extends TestCase {
         CompareOp.EQUAL, VAL_1);
 
     // A 'match' situation
-    KeyValue kv;
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    // INCLUDE expected because test column has not yet passed
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
-    // Test column will pass (will match), will SKIP because test columns are excluded
-    assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
-    // Test column has already passed and matched, all subsequent columns are INCLUDE
-    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
-    assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
-    assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+    kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+    kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+    kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+    filter.filterRow(kvs);
+
+    assertEquals("resultSize", 2, kvs.size());
+    assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+    assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
 
     // A 'mismatch' situation
     filter.reset();
@@ -83,4 +86,3 @@ public class TestSingleColumnValueExcludeFilter extends TestCase {
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
 }
-
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 97c624d..d91e66b 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -2454,6 +2455,64 @@ public class TestHRegion extends HBaseTestCase {
     }
   }
 
+  public void testScanner_JoinedScanners() throws IOException {
+    byte [] tableName = Bytes.toBytes("testTable");
+    byte [] cf_essential = Bytes.toBytes("essential");
+    byte [] cf_joined = Bytes.toBytes("joined");
+    byte [] cf_alpha = Bytes.toBytes("alpha");
+    initHRegion(tableName, getName(), cf_essential, cf_joined, cf_alpha);
+
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] row2 = Bytes.toBytes("row2");
+    byte [] row3 = Bytes.toBytes("row3");
+
+    byte [] col_normal = Bytes.toBytes("d");
+    byte [] col_alpha = Bytes.toBytes("a");
+
+    byte [] filtered_val = Bytes.toBytes(3);
+
+    Put put = new Put(row1);
+    put.add(cf_essential, col_normal, Bytes.toBytes(1));
+    put.add(cf_joined, col_alpha, Bytes.toBytes(1));
+    region.put(put);
+
+    put = new Put(row2);
+    put.add(cf_essential, col_alpha, Bytes.toBytes(2));
+    put.add(cf_joined, col_normal, Bytes.toBytes(2));
+    put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
+    region.put(put);
+
+    put = new Put(row3);
+    put.add(cf_essential, col_normal, filtered_val);
+    put.add(cf_joined, col_normal, filtered_val);
+    region.put(put);
+
+    // Check two things:
+    // 1. the result list contains the expected values
+    // 2. the result list is properly sorted
+
+    Scan scan = new Scan();
+    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
+      CompareOp.NOT_EQUAL, filtered_val);
+    scan.setFilter(filter);
+    InternalScanner s = region.getScanner(scan);
+
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    assertTrue(s.next(results));
+    assertEquals(1, results.size());
+    results.clear();
+
+    assertTrue(s.next(results));
+    assertEquals(3, results.size());
+    assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha));
+    assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential));
+    assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined));
+    results.clear();
+
+    assertFalse(s.next(results));
+    assertEquals(0, results.size());
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Split test
   //////////////////////////////////////////////////////////////////////////////
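Beyond the built-in filters touched by this patch, custom filters can opt in as well. A minimal sketch (hypothetical class, not part of the patch) of a FilterBase subclass that declares only one family essential:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexOnlyFilter extends FilterBase {
  private static final byte [] INDEX_FAMILY = Bytes.toBytes("index");

  // Only the "index" family is needed to decide a row's fate; all other
  // families are left to the joinedHeap and fetched lazily.
  @Override
  public boolean isFamilyEssential(byte[] name) {
    return Bytes.equals(name, INDEX_FAMILY);
  }

  // Filter extends Writable; a real filter would (de)serialize its state here.
  public void readFields(DataInput in) throws IOException {
  }

  public void write(DataOutput out) throws IOException {
  }
}

Any filter that keeps the FilterBase default (returning true for every family) simply gets the old behavior: all of its scanners stay in the storeHeap.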