diff --git src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java
index 0cb6ff7..6ff7b0c 100644
--- src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java
+++ src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java
@@ -287,6 +287,17 @@ public class IdxRegion extends HRegion {
     return result;
   }
 
+  /**
+   * {@inheritDoc}
+   * <p/>
+   * Fast forwards the scanner by calling {@link #seekNext()}.
+   */
+  @Override
+  protected void nextRow(byte[] currentRow) throws IOException {
+    seekNext();
+    super.nextRow(currentRow);
+  }
+
   protected void seekNext() throws IOException {
     KeyValue keyValue;
     do {
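The override above relies on a new protected hook in HRegion's scanner, introduced by the HRegion.java changes below: nextRow(byte[]) runs whenever the scanner is done with a row. Stripped of the indexing specifics, the pattern available to any HRegion subclass is sketched here; seekToUsefulRow() is a hypothetical helper standing in for IdxRegion's index lookup and is not part of the patch:

    // Sketch of the subclass hook: fast forward before HRegion's default
    // row cleanup runs.
    @Override
    protected void nextRow(byte[] currentRow) throws IOException {
      seekToUsefulRow();          // hypothetical: jump past uninteresting rows
      super.nextRow(currentRow);  // drain leftovers, clear results, reset filters
    }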
diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 50904f4..8b58f5d 100644
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -53,7 +53,10 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
 
+import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -64,7 +67,6 @@ package org.apache.hadoop.hbase.regionserver;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.lang.reflect.Constructor;
 
 /**
  * HRegion stores data for a certain region of a table. It stores all columns
@@ -1747,7 +1749,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       this(scan, null);
     }
 
-    private void resetFilters() {
+    /**
+     * Reset both the filter and the old filter.
+     */
+    protected void resetFilters() {
       if (filter != null) {
         filter.reset();
       }
@@ -1764,7 +1769,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       }
       results.clear();
       boolean returnResult = nextInternal();
-      if (!returnResult && filter != null && filter.filterRow()) {
+      if (!returnResult && filterRow()) {
         results.clear();
       }
       outResults.addAll(results);
@@ -1779,71 +1784,78 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
      * @return True if a filter rules the scanner is over, done.
      */
     boolean isFilterDone() {
-      return this.filter != null && this.filter.filterAllRemaining();
+      return
+        (this.filter != null && this.filter.filterAllRemaining()) ||
+        (this.oldFilter != null && oldFilter.filterAllRemaining());
     }
 
+
     /*
-     * @return true if there are more rows, false if scanner is done
-     * @throws IOException
-     */
+    * @return true if there are more rows, false if scanner is done
+    * @throws IOException
+    */
     private boolean nextInternal() throws IOException {
-      byte [] currentRow = null;
-      boolean filterCurrentRow = false;
       while (true) {
-        KeyValue kv = this.storeHeap.peek();
-        if (kv == null) return false;
-        byte [] row = kv.getRow();
-        boolean samerow = Bytes.equals(currentRow, row);
-        if (samerow && filterCurrentRow) {
-          // Filter all columns until row changes
-          readAndDumpCurrentResult();
-          continue;
-        }
-        if (!samerow) {
-          // Continue on the next row:
-          currentRow = row;
-          filterCurrentRow = false;
-          // See if we passed stopRow
-          if (this.stopRow != null &&
-            comparator.compareRows(this.stopRow, 0, this.stopRow.length,
-              currentRow, 0, currentRow.length) <= 0) {
-            return false;
+        byte[] currentRow = peekRow();
+        if (isStopRow(currentRow)) {
+          return false;
+        } else if (filterRowKey(currentRow)) {
+          nextRow(currentRow);
+        } else {
+          byte[] nextRow;
+          do {
+            this.storeHeap.next(results);
+          } while (Bytes.equals(currentRow, nextRow = peekRow()));
+
+          final boolean stopRow = isStopRow(nextRow);
+          if (!stopRow && (results.isEmpty() || filterRow())) {
+            // this seems like a redundant step - we already consumed the row,
+            // there are no leftovers.
+            // the reasons for calling this method are:
+            // 1. reset the filters.
+            // 2. provide a hook to fast forward the row (used by subclasses)
+            nextRow(currentRow);
+            continue;
           }
-          if (hasResults()) return true;
-        }
-        // See if current row should be filtered based on row key
-        if ((this.filter != null && this.filter.filterRowKey(row, 0, row.length)) ||
-          (oldFilter != null && this.oldFilter.filterRowKey(row, 0, row.length))) {
-          readAndDumpCurrentResult();
-          resetFilters();
-          filterCurrentRow = true;
-          currentRow = row;
-          continue;
+          return !stopRow;
         }
-        this.storeHeap.next(results);
       }
     }
 
-    private void readAndDumpCurrentResult() throws IOException {
-      this.storeHeap.next(this.results);
-      this.results.clear();
-    }
-
-    /*
-     * Do we have results to return or should we continue. Call when we get to
-     * the end of a row. Does house cleaning -- clearing results and resetting
-     * filters -- if we are to continue.
-     * @return True if we should return else false if need to keep going.
+    /**
+     * Reset state and move to the next row.
+     *
+     * @param currentRow the current row
+     * @throws IOException by store heap
      */
-    private boolean hasResults() {
-      if (this.results.isEmpty() ||
-        this.filter != null && this.filter.filterRow()) {
-        // Make sure results is empty, reset filters
-        this.results.clear();
-        resetFilters();
-        return false;
+    protected void nextRow(byte[] currentRow) throws IOException {
+      while (Bytes.equals(currentRow, peekRow())) {
+        this.storeHeap.next(MOCKED_LIST);
       }
-      return true;
+      results.clear();
+      resetFilters();
+    }
+
+    private boolean isStopRow(byte[] currentRow) {
+      return currentRow == null ||
+        (this.stopRow != null &&
+          comparator.compareRows(this.stopRow, 0, this.stopRow.length,
+            currentRow, 0, currentRow.length) <= 0);
+    }
+
+    private boolean filterRow() {
+      return (filter != null && filter.filterRow()) ||
+        oldFilter != null && oldFilter.filterRow(results);
+    }
+
+    private byte[] peekRow() {
+      KeyValue kv = this.storeHeap.peek();
+      return kv == null ? null : kv.getRow();
+    }
+
+    private boolean filterRowKey(byte[] row) {
+      return (this.filter != null && this.filter.filterRowKey(row, 0, row.length)) ||
+        (oldFilter != null && this.oldFilter.filterRowKey(row, 0, row.length));
     }
 
     public void close() {
@@ -2617,4 +2629,31 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       if (bc != null) bc.shutdown();
     }
   }
-}
+
+  /**
+   * A mocked list implementation - discards all updates.
+   */
+  private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
+
+    @Override
+    public void add(int index, KeyValue element) {
+      // do nothing
+    }
+
+    @Override
+    public boolean addAll(int index, Collection<? extends KeyValue> c) {
+      return false; // this list is never changed as a result of an update
+    }
+
+    @Override
+    public KeyValue get(int index) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+  };
+
+}
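The MOCKED_LIST added above is a write-only sink: nextRow() uses it to drain the store heap past the current row without accumulating KeyValues that are about to be thrown away. Here is the same trick in isolation as a self-contained sketch; the demo class and its String element type are illustrative and not part of the patch:

    import java.util.AbstractList;
    import java.util.Collection;
    import java.util.List;

    public class DiscardingListDemo {
      // A list that drops every element handed to it, so a producer can be
      // driven for its side effects without paying for the copies.
      static final List<String> DISCARD = new AbstractList<String>() {
        @Override
        public void add(int index, String element) {
          // do nothing - discard the element
        }
        @Override
        public boolean addAll(int index, Collection<? extends String> c) {
          return false; // never changed as a result of an update
        }
        @Override
        public String get(int index) {
          throw new UnsupportedOperationException();
        }
        @Override
        public int size() {
          return 0;
        }
      };

      public static void main(String[] args) {
        DISCARD.add("ignored");             // routed to add(int, String): a no-op
        System.out.println(DISCARD.size()); // always prints 0
      }
    }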
diff --git src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
index f00aea5..dd659df 100644
--- src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
+++ src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
@@ -19,12 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestCase;
@@ -38,12 +32,14 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
 import org.apache.hadoop.hbase.filter.InclusiveStopRowFilter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.PrefixRowFilter;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
 import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -51,6 +47,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Test of a long-lived scanner validating as we go.
  */
@@ -118,6 +120,7 @@ public class TestScanner extends HBaseTestCase {
     }
     s.close();
     assertEquals(0, count);
+    assertEquals(1, results.size());
     // Now do something a bit more imvolved.
     scan = new Scan(startrow, stoprow);
     scan.addFamily(HConstants.CATALOG_FAMILY);
@@ -525,4 +528,43 @@ public class TestScanner extends HBaseTestCase {
     LOG.info("Found " + count + " items");
     return count;
   }
+
+
+  /**
+   * When there's more than one column it changes the configuration of the
+   * KeyValueHeap and triggers a different execution path in the RegionScanner.
+   */
+  public void testScanWithTwoColumns() throws IOException {
+    this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+    final byte[] row1 = Bytes.toBytes("row1");
+    final byte[] row2 = Bytes.toBytes("row2");
+    final byte[] qual1 = Bytes.toBytes("a");
+    final byte[] qual2 = Bytes.toBytes("b");
+    final byte[] val1 = Bytes.toBytes(1);
+    final byte[] val2 = Bytes.toBytes(-1);
+    /**
+     * prime the region.
+     */
+    Put put1 = new Put(row1);
+    put1.add(HConstants.CATALOG_FAMILY, qual1, val1);
+    put1.add(HConstants.CATALOG_FAMILY, qual2, val1);
+    r.put(put1);
+    Put put2 = new Put(row2);
+    put2.add(HConstants.CATALOG_FAMILY, qual1, val2);
+    put2.add(HConstants.CATALOG_FAMILY, qual2, val2);
+    r.put(put2);
+    /**
+     * Scan for the second row.
+     */
+    Scan scan = new Scan();
+    scan.setFilter(new SingleColumnValueFilter(HConstants.CATALOG_FAMILY,
+      qual2, CompareFilter.CompareOp.EQUAL, val2));
+
+    InternalScanner scanner1 = r.getScanner(scan);
+    List<KeyValue> res = new ArrayList<KeyValue>();
+    assertFalse(scanner1.next(res));
+    assertEquals(2, res.size());
+    scanner1.close();
+  }
+
 }
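The new test exercises the refactored nextInternal() loop: with two qualifiers per row, the KeyValueHeap yields several KeyValues for the same row, and SingleColumnValueFilter only rejects a row in filterRow(), after all of its cells have been read. For comparison, the same filter issued through the client API would look roughly like this; scanForMinusOne is a hypothetical helper, and a reachable cluster plus an existing HTable handle are assumed (connection setup omitted):

    // Hypothetical client-side equivalent of the scan in testScanWithTwoColumns:
    // keep only rows whose column 'b' in the catalog family equals -1.
    static void scanForMinusOne(HTable table) throws IOException {
      Scan scan = new Scan();
      scan.setFilter(new SingleColumnValueFilter(HConstants.CATALOG_FAMILY,
        Bytes.toBytes("b"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(-1)));
      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result result : scanner) {
          // only 'row2' from the test fixture should show up here
        }
      } finally {
        scanner.close();
      }
    }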