toMap(int maxCols) {
// start with the fingerprint map and build on top of it
Index: src/main/java/org/apache/hadoop/hbase/filter/Filter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy)
@@ -167,4 +167,12 @@
* not sure which key to seek to next.
*/
public KeyValue getNextKeyHint(final KeyValue currentKV);
+
+ /**
+ * Check whether the given column family is essential for the filter to check the row.
+ * Most filters always return true here, but some have more sophisticated logic
+ * that can significantly reduce the scanning work by not even touching a column
+ * family until we are sure its data is needed in the result.
+ */
+ public boolean isFamilyEssential(byte[] name);
}
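
For illustration only (not part of this patch), a concrete filter can opt into this optimization by overriding the new method. The class and family names below are hypothetical; the empty Writable methods follow the same pattern as the test filter added to TestHRegion further down.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.hbase.filter.FilterBase;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical filter that only needs the "meta" family to decide whether a row passes.
    public class MetaOnlyFilter extends FilterBase {
      private static final byte[] META_CF = Bytes.toBytes("meta");

      @Override
      public boolean isFamilyEssential(byte[] name) {
        // Only "meta" is read eagerly; all other families can be loaded on demand
        // once the filter has accepted the row.
        return Bytes.equals(name, META_CF);
      }

      // Writable plumbing, empty because this example filter has no state.
      @Override
      public void readFields(DataInput in) throws IOException {
      }

      @Override
      public void write(DataOutput out) throws IOException {
      }
    }
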
Index: src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (working copy)
@@ -131,6 +131,16 @@
}
/**
+ * By default, a filter requires all of the scan's column families to be present.
+ * Subclasses may be more precise.
+ *
+ * {@inheritDoc}
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return true;
+ }
+
+ /**
* Given the filter's arguments it constructs the filter
*
* @param filterArguments the filter's arguments
Index: src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (working copy)
@@ -318,6 +318,16 @@
}
@Override
+ public boolean isFamilyEssential(byte[] name) {
+ for (Filter filter : filters) {
+ if (filter.isFamilyEssential(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
public String toString() {
return toString(MAX_LOG_FILTERS);
}
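
A hypothetical illustration of the union semantics above: the essential set of a FilterList is the union of its members' essential sets, so combining a precise filter with one that keeps the FilterBase default makes every family essential again. All names below are made up.

    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FilterListEssentialExample {
      public static Filter build() {
        // Hypothetical family/qualifier/value names, for illustration only.
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(
            Bytes.toBytes("a"), Bytes.toBytes("q"), CompareOp.EQUAL, Bytes.toBytes("v"));
        scvf.setFilterIfMissing(true);   // "a" is the only family essential to scvf

        FilterList list = new FilterList();  // MUST_PASS_ALL by default
        list.addFilter(scvf);
        list.addFilter(new PrefixFilter(Bytes.toBytes("row")));  // FilterBase default: all families essential
        // list.isFamilyEssential(name) is now true for every family, because the
        // PrefixFilter member reports true for all of them.
        return list;
      }
    }
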
Index: src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (working copy)
@@ -24,6 +24,8 @@
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
/**
* A {@link Filter} that checks a single column value, but does not emit the
@@ -76,16 +78,22 @@
super(family, qualifier, compareOp, comparator);
}
- public ReturnCode filterKeyValue(KeyValue keyValue) {
- ReturnCode superRetCode = super.filterKeyValue(keyValue);
- if (superRetCode == ReturnCode.INCLUDE) {
+ // We clean the result row in filterRow() to be consistent with the scanning process.
+ public boolean hasFilterRow() {
+ return true;
+ }
+
+ // Here we remove from the row all key values belonging to the tested column.
+ public void filterRow(List kvs) {
+ Iterator it = kvs.iterator();
+ while (it.hasNext()) {
+ KeyValue kv = (KeyValue)it.next();
// If the current column is actually the tested column,
// we will skip it instead.
- if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
- return ReturnCode.SKIP;
+ if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+ it.remove();
}
}
- return superRetCode;
}
public static Filter createFilterFromArguments(ArrayList filterArguments) {
Index: src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (working copy)
@@ -307,6 +307,15 @@
out.writeBoolean(latestVersionOnly);
}
+ /**
+ * The only CF this filter needs is the given column family, so it is the only
+ * essential one in the whole scan. If filterIfMissing == false, all families become
+ * essential, because otherwise rows without any data in the filtered CF could be skipped.
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+ }
+
@Override
public String toString() {
return String.format("%s (%s, %s, %s, %s)",
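
A minimal client-side sketch of how this interacts with on-demand loading (the table layout and names are made up; setLoadColumnFamiliesOnDemand is the Scan flag referenced elsewhere in this patch):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class OnDemandScanExample {
      public static Scan build() {
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
            Bytes.toBytes("small"), Bytes.toBytes("flag"),
            CompareOp.EQUAL, Bytes.toBytes("yes"));
        filter.setFilterIfMissing(true);   // makes "small" the only essential family

        Scan scan = new Scan();
        scan.setFilter(filter);
        scan.setLoadColumnFamiliesOnDemand(true);
        // Store files of the other, presumably wider families are now only read
        // for rows where the filter has already passed on the "small" family.
        return scan;
      }
    }
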
Index: src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (working copy)
@@ -104,6 +104,10 @@
}
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Index: src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (working copy)
@@ -105,6 +105,10 @@
}
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -178,6 +178,8 @@
public static final Log LOG = LogFactory.getLog(HRegion.class);
private static final String MERGEDIR = ".merges";
+ public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
* want to do while in closing state; e.g. like offer this region up to the
@@ -263,8 +265,13 @@
KeyValue.KVComparator comparator;
private ConcurrentHashMap scannerReadPoints;
+ /**
+ * The default setting for whether to enable on-demand CF loading for
+ * scan requests to this region. Individual requests can additionally enable it.
+ */
+ private boolean isLoadingCfsOnDemandDefault = false;
- /*
+ /**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
* read operation.
@@ -416,6 +423,8 @@
this.conf = conf;
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
+
+ this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, false);
this.regionInfo = regionInfo;
this.htableDescriptor = htd;
this.rsServices = rsServices;
@@ -857,6 +866,10 @@
return mvcc;
}
+ public boolean isLoadingCfsOnDemandDefault() {
+ return this.isLoadingCfsOnDemandDefault;
+ }
+
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@@ -3467,6 +3480,15 @@
class RegionScannerImpl implements RegionScanner {
// Package local for testability
KeyValueHeap storeHeap = null;
+ /** Heap of key-values that are not essential for the provided filters and are thus read
+ * on demand, if on-demand column family loading is enabled.*/
+ KeyValueHeap joinedHeap = null;
+ /**
+ * If the joined heap data gathering is interrupted due to scan limits, this will
+ * contain the row for which we are populating the values.*/
+ private KeyValue joinedContinuationRow = null;
+ // KeyValue indicating that limit is reached when scanning
+ private final KeyValue KV_LIMIT = new KeyValue();
private final byte [] stopRow;
private Filter filter;
private List results = new ArrayList();
@@ -3506,7 +3528,10 @@
scannerReadPoints.put(this, this.readPt);
}
+ // Here we separate all scanners into two lists - scanners that provide data required
+ // by the filter to operate (scanners list) and all others (joinedScanners list).
List scanners = new ArrayList();
+ List joinedScanners = new ArrayList();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
@@ -3515,9 +3540,17 @@
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
- scanners.add(scanner);
+ if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
+ || this.filter.isFamilyEssential(entry.getKey())) {
+ scanners.add(scanner);
+ } else {
+ joinedScanners.add(scanner);
+ }
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
+ if (!joinedScanners.isEmpty()) {
+ this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
+ }
}
RegionScannerImpl(Scan scan) throws IOException {
@@ -3599,6 +3632,42 @@
return next(outResults, batch, metric);
}
+ private void populateFromJoinedHeap(int limit, String metric) throws IOException {
+ assert joinedContinuationRow != null;
+ KeyValue kv = populateResult(this.joinedHeap, limit, joinedContinuationRow.getBuffer(),
+ joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength(), metric);
+ if (kv != KV_LIMIT) {
+ // We are done with this row, reset the continuation.
+ joinedContinuationRow = null;
+ }
+ // As the data is obtained from two independent heaps, we need to
+ // ensure that the result list is sorted, because Result relies on that.
+ Collections.sort(results, comparator);
+ }
+
+ /**
+ * Fetches records for this row into the result list, until the next row or the limit (if not -1).
+ * @param heap KeyValueHeap to fetch data from. It must be positioned on correct row before call.
+ * @param limit Max amount of KVs to place in result list, -1 means no limit.
+ * @param currentRow Byte array with key we are fetching.
+ * @param offset offset for currentRow
+ * @param length length for currentRow
+ * @param metric Metric key to be passed into KeyValueHeap::next().
+ * @return KV_LIMIT if the limit was reached, otherwise the first KeyValue of the next row (or null if there is none).
+ */
+ private KeyValue populateResult(KeyValueHeap heap, int limit, byte[] currentRow, int offset,
+ short length, String metric) throws IOException {
+ KeyValue nextKv;
+ do {
+ heap.next(results, limit - results.size(), metric);
+ if (limit > 0 && results.size() == limit) {
+ return KV_LIMIT;
+ }
+ nextKv = heap.peek();
+ } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ return nextKv;
+ }
+
/*
* @return True if a filter rules the scanner is over, done.
*/
@@ -3608,6 +3677,11 @@
private boolean nextInternal(int limit, String metric) throws IOException {
RpcCallContext rpcCall = HBaseServer.getCurrentCall();
+ // The loop here is used only when, at some point during next(), we determine
+ // that due to the effects of filters or otherwise we have an empty row in the result.
+ // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
+ // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row
+ // and joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
while (true) {
if (rpcCall != null) {
// If a user specifies a too-restrictive or too-slow scanner, the
@@ -3617,7 +3691,9 @@
rpcCall.throwExceptionIfCallerDisconnected();
}
+ // Let's see what we have in the storeHeap.
KeyValue current = this.storeHeap.peek();
+
byte[] currentRow = null;
int offset = 0;
short length = 0;
@@ -3626,41 +3702,48 @@
offset = current.getRowOffset();
length = current.getRowLength();
}
- if (isStopRow(currentRow, offset, length)) {
- if (filter != null && filter.hasFilterRow()) {
- filter.filterRow(results);
+ boolean stopRow = isStopRow(currentRow, offset, length);
+ // Check if we were getting data from the joinedHeap and hit the limit.
+ // If not, then it's the main path - getting results from the storeHeap.
+ if (joinedContinuationRow == null) {
+ // First, check if we are at a stop row. If so, there are no more results.
+ if (stopRow) {
+ if (filter != null && filter.hasFilterRow()) {
+ filter.filterRow(results);
+ }
+ if (filter != null && filter.filterRow()) {
+ results.clear();
+ }
+ return false;
}
- if (filter != null && filter.filterRow()) {
- results.clear();
+
+ // Check if rowkey filter wants to exclude this row. If so, loop to next.
+ // Technically, if we previously hit the limit on this row, we don't need this call.
+ if (filterRowKey(currentRow, offset, length)) {
+ nextRow(currentRow, offset, length);
+ continue;
}
- return false;
- } else if (filterRowKey(currentRow, offset, length)) {
- nextRow(currentRow, offset, length);
- } else {
- KeyValue nextKv;
- do {
- this.storeHeap.next(results, limit - results.size(), metric);
- if (limit > 0 && results.size() == limit) {
- if (this.filter != null && filter.hasFilterRow()) {
- throw new IncompatibleFilterException(
- "Filter with filterRow(List) incompatible with scan with limit!");
- }
- return true; // we are expecting more yes, but also limited to how many we can return.
+ // Ok, we are good, let's try to get some results from the main heap.
+ KeyValue nextKv = populateResult(this.storeHeap, limit, currentRow, offset, length, metric);
+ if (nextKv == KV_LIMIT) {
+ if (this.filter != null && filter.hasFilterRow()) {
+ throw new IncompatibleFilterException(
+ "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
- nextKv = this.storeHeap.peek();
- } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+ return true; // We hit the limit.
+ }
+ stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
+ // Save whether the row was empty before filters were applied to it.
+ final boolean isEmptyRow = results.isEmpty();
- final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
-
- // now that we have an entire row, lets process with a filters:
-
- // first filter with the filterRow(List)
+ // We have the part of the row necessary for filtering (all of it, usually).
+ // First filter with the filterRow(List).
if (filter != null && filter.hasFilterRow()) {
filter.filterRow(results);
}
- if (results.isEmpty() || filterRow()) {
+ if (isEmptyRow || filterRow()) {
// this seems like a redundant step - we already consumed the row
// there're no left overs.
// the reasons for calling this method are:
@@ -3669,12 +3752,48 @@
nextRow(currentRow, offset, length);
// This row was totally filtered out, if this is NOT the last row,
- // we should continue on.
-
+ // we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
+ return false;
}
- return !stopRow;
+
+ // Ok, we are done with storeHeap for this row.
+ // Now we may need to fetch additional, non-essential data into the row.
+ // These values are not needed for the filter to work, so we postpone their
+ // fetch to (possibly) reduce the amount of data loaded from disk.
+ if (this.joinedHeap != null) {
+ KeyValue nextJoinedKv = joinedHeap.peek();
+ // If the joinedHeap is pointing to some other row, try to seek to the correct one.
+ // We don't need to recheck that row here - populateResult will take care of that.
+ boolean mayHaveData =
+ (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
+ || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
+ if (mayHaveData) {
+ joinedContinuationRow = current;
+ populateFromJoinedHeap(limit, metric);
+ }
+ }
+ } else {
+ // Populating from the joined heap was stopped by the limit; populate some more.
+ populateFromJoinedHeap(limit, metric);
}
+
+ // We may have just called populateFromJoinedHeap and hit the limit. If that is
+ // the case, we need to call it again on the next next() invocation.
+ if (joinedContinuationRow != null) {
+ return true;
+ }
+
+ // Finally, we are done with both joinedHeap and storeHeap.
+ // Double check to prevent empty rows from appearing in the result. This could
+ // happen, for example, when SingleColumnValueExcludeFilter is used.
+ if (results.isEmpty()) {
+ nextRow(currentRow, offset, length);
+ if (!stopRow) continue;
+ }
+
+ // We are done. Return the result.
+ return !stopRow;
}
}
@@ -3709,6 +3828,10 @@
storeHeap.close();
storeHeap = null;
}
+ if (joinedHeap != null) {
+ joinedHeap.close();
+ joinedHeap = null;
+ }
// no need to sychronize here.
scannerReadPoints.remove(this);
this.filterClosed = true;
@@ -3723,16 +3846,21 @@
if (row == null) {
throw new IllegalArgumentException("Row cannot be null.");
}
+ boolean result = false;
startRegionOperation();
try {
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
KeyValue kv = KeyValue.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520
- return this.storeHeap.requestSeek(kv, true, true);
+ result = this.storeHeap.requestSeek(kv, true, true);
+ if (this.joinedHeap != null) {
+ result = this.joinedHeap.requestSeek(kv, true, true) || result;
+ }
} finally {
closeRegionOperation();
}
+ return result;
}
}
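
To summarize the scanner changes above, here is a simplified paraphrase of the new control flow (comments only, not the literal code):

    // Per call to RegionScannerImpl.nextInternal(limit), roughly:
    //
    //  1. Peek the storeHeap (essential families only) and collect all KVs of the
    //     current row via populateResult(); return true early if the per-next()
    //     limit is hit (signalled by the KV_LIMIT sentinel).
    //  2. Apply filterRowKey()/filterRow(List)/filterRow() to that partial row; if
    //     it is filtered out, advance with nextRow() and loop to the next row.
    //  3. If the row survived and a joinedHeap exists, seek it to the same row,
    //     append the non-essential KVs, and sort the combined result list.
    //  4. If the limit interrupts step 3, the row is remembered in
    //     joinedContinuationRow and the next call resumes from it via
    //     populateFromJoinedHeap() before moving on.
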
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1431831)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -2382,6 +2382,8 @@
try {
HRegion r = getRegion(regionName);
r.checkRow(scan.getStartRow(), "Scan");
+ scan.setLoadColumnFamiliesOnDemand(r.isLoadingCfsOnDemandDefault()
+ || scan.doLoadColumnFamiliesOnDemand());
r.prepareScanner(scan);
RegionScanner s = null;
if (r.getCoprocessorHost() != null) {
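
The server-side default used here comes from the new configuration key; a minimal sketch of enabling it programmatically (the key string matches LOAD_CFS_ON_DEMAND_CONFIG_KEY introduced in HRegion above; because the region server ORs the default with the per-Scan flag, a Scan can still request on-demand loading when the default is off):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class OnDemandDefaultConfigExample {
      public static Configuration build() {
        Configuration conf = HBaseConfiguration.create();
        // Make on-demand column family loading the default for scans on regions
        // opened with this configuration.
        conf.setBoolean("hbase.hregion.scan.loadColumnFamiliesOnDemand", true);
        return conf;
      }
    }
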
Index: src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (revision 1431831)
+++ src/test/java/org/apache/hadoop/hbase/filter/TestSingleColumnValueExcludeFilter.java (working copy)
@@ -27,6 +27,9 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
+import java.util.List;
+import java.util.ArrayList;
+
/**
* Tests for {@link SingleColumnValueExcludeFilter}. Because this filter
* extends {@link SingleColumnValueFilter}, only the added functionality is
@@ -53,16 +56,18 @@
CompareOp.EQUAL, VAL_1);
// A 'match' situation
- KeyValue kv;
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
- // INCLUDE expected because test column has not yet passed
- assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1);
- // Test column will pass (will match), will SKIP because test columns are excluded
- assertTrue("testedMatch", filter.filterKeyValue(kv) == Filter.ReturnCode.SKIP);
- // Test column has already passed and matched, all subsequent columns are INCLUDE
- kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
- assertTrue("otherColumn", filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+ List kvs = new ArrayList();
+ KeyValue kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+
+ kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+ kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VAL_1));
+ kvs.add(new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1));
+
+ filter.filterRow(kvs);
+
+ assertEquals("resultSize", 2, kvs.size());
+ assertTrue("leftKV1", KeyValue.COMPARATOR.compare(kvs.get(0), kv) == 0);
+ assertTrue("leftKV2", KeyValue.COMPARATOR.compare(kvs.get(1), kv) == 0);
assertFalse("allRemainingWhenMatch", filter.filterAllRemaining());
// A 'mismatch' situation
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1431831)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy)
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -66,9 +68,11 @@
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -200,7 +204,7 @@
System.out.println(results);
assertEquals(0, results.size());
}
-
+
@Test
public void testToShowNPEOnRegionScannerReseek() throws Exception{
String method = "testToShowNPEOnRegionScannerReseek";
@@ -2816,6 +2820,173 @@
}
}
+ /**
+ * Added for HBASE-5416
+ *
+ * Here we test the scan optimization when only a subset of the CFs is used in
+ * the filter conditions.
+ */
+ public void testScanner_JoinedScanners() throws IOException {
+ byte [] tableName = Bytes.toBytes("testTable");
+ byte [] cf_essential = Bytes.toBytes("essential");
+ byte [] cf_joined = Bytes.toBytes("joined");
+ byte [] cf_alpha = Bytes.toBytes("alpha");
+ this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha);
+ try {
+ byte [] row1 = Bytes.toBytes("row1");
+ byte [] row2 = Bytes.toBytes("row2");
+ byte [] row3 = Bytes.toBytes("row3");
+
+ byte [] col_normal = Bytes.toBytes("d");
+ byte [] col_alpha = Bytes.toBytes("a");
+
+ byte [] filtered_val = Bytes.toBytes(3);
+
+ Put put = new Put(row1);
+ put.add(cf_essential, col_normal, Bytes.toBytes(1));
+ put.add(cf_joined, col_alpha, Bytes.toBytes(1));
+ region.put(put);
+
+ put = new Put(row2);
+ put.add(cf_essential, col_alpha, Bytes.toBytes(2));
+ put.add(cf_joined, col_normal, Bytes.toBytes(2));
+ put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
+ region.put(put);
+
+ put = new Put(row3);
+ put.add(cf_essential, col_normal, filtered_val);
+ put.add(cf_joined, col_normal, filtered_val);
+ region.put(put);
+
+ // Check two things:
+ // 1. result list contains expected values
+ // 2. result list is sorted properly
+
+ Scan scan = new Scan();
+ Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
+ CompareOp.NOT_EQUAL, filtered_val);
+ scan.setFilter(filter);
+ scan.setLoadColumnFamiliesOnDemand(true);
+ InternalScanner s = region.getScanner(scan);
+
+ List results = new ArrayList();
+ assertTrue(s.next(results));
+ assertEquals(1, results.size());
+ results.clear();
+
+ assertTrue(s.next(results));
+ assertEquals(3, results.size());
+ assertTrue("orderCheck", results.get(0).matchingFamily(cf_alpha));
+ assertTrue("orderCheck", results.get(1).matchingFamily(cf_essential));
+ assertTrue("orderCheck", results.get(2).matchingFamily(cf_joined));
+ results.clear();
+
+ assertFalse(s.next(results));
+ assertEquals(0, results.size());
+ } finally {
+ HRegion.closeHRegion(this.region);
+ this.region = null;
+ }
+ }
+
+ /**
+ * HBASE-5416
+ *
+ * Test case where the scan limits the number of KVs returned on each next() call.
+ */
+ public void testScanner_JoinedScannersWithLimits() throws IOException {
+ final byte [] tableName = Bytes.toBytes("testTable");
+ final byte [] cf_first = Bytes.toBytes("first");
+ final byte [] cf_second = Bytes.toBytes("second");
+
+ this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second);
+ try {
+ final byte [] col_a = Bytes.toBytes("a");
+ final byte [] col_b = Bytes.toBytes("b");
+
+ Put put;
+
+ for (int i = 0; i < 10; i++) {
+ put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
+ put.add(cf_first, col_a, Bytes.toBytes(i));
+ if (i < 5) {
+ put.add(cf_first, col_b, Bytes.toBytes(i));
+ put.add(cf_second, col_a, Bytes.toBytes(i));
+ put.add(cf_second, col_b, Bytes.toBytes(i));
+ }
+ region.put(put);
+ }
+
+ Scan scan = new Scan();
+ scan.setLoadColumnFamiliesOnDemand(true);
+ Filter bogusFilter = new FilterBase() {
+ @Override
+ public boolean isFamilyEssential(byte[] name) {
+ return Bytes.equals(name, cf_first);
+ }
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ }
+ };
+
+ scan.setFilter(bogusFilter);
+ InternalScanner s = region.getScanner(scan);
+
+ // Our data looks like this:
+ // r0: first:a, first:b, second:a, second:b
+ // r1: first:a, first:b, second:a, second:b
+ // r2: first:a, first:b, second:a, second:b
+ // r3: first:a, first:b, second:a, second:b
+ // r4: first:a, first:b, second:a, second:b
+ // r5: first:a
+ // r6: first:a
+ // r7: first:a
+ // r8: first:a
+ // r9: first:a
+
+ // But due to next's limit set to 3, we should get this:
+ // r0: first:a, first:b, second:a
+ // r0: second:b
+ // r1: first:a, first:b, second:a
+ // r1: second:b
+ // r2: first:a, first:b, second:a
+ // r2: second:b
+ // r3: first:a, first:b, second:a
+ // r3: second:b
+ // r4: first:a, first:b, second:a
+ // r4: second:b
+ // r5: first:a
+ // r6: first:a
+ // r7: first:a
+ // r8: first:a
+ // r9: first:a
+
+ List results = new ArrayList();
+ int index = 0;
+ while (true) {
+ boolean more = s.next(results, 3);
+ if ((index >> 1) < 5) {
+ if (index % 2 == 0)
+ assertEquals(3, results.size());
+ else
+ assertEquals(1, results.size());
+ }
+ else
+ assertEquals(1, results.size());
+ results.clear();
+ index++;
+ if (!more) break;
+ }
+ } finally {
+ HRegion.closeHRegion(this.region);
+ this.region = null;
+ }
+ }
+
//////////////////////////////////////////////////////////////////////////////
// Split test
//////////////////////////////////////////////////////////////////////////////