diff --git src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java index 0cb6ff7..6ff7b0c 100644 --- src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java +++ src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java @@ -287,6 +287,17 @@ public class IdxRegion extends HRegion { return result; } + /** + * {@inheritDoc} + *
+ * Fast forwards the scanner by calling {@link #seekNext()}. + */ + @Override + protected void nextRow(byte[] currentRow) throws IOException { + seekNext(); + super.nextRow(currentRow); + } + protected void seekNext() throws IOException { KeyValue keyValue; do { diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 50904f4..8b58f5d 100644 --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -53,7 +53,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.UnsupportedEncodingException; + import java.lang.reflect.Constructor; + import java.util.AbstractList; import java.util.ArrayList; + import java.util.Collection; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -64,7 +67,6 @@ package org.apache.hadoop.hbase.regionserver; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; - import java.lang.reflect.Constructor; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -1747,7 +1749,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this(scan, null); } - private void resetFilters() { + /** + * Reset both the filter and the old filter. + */ + protected void resetFilters() { if (filter != null) { filter.reset(); } @@ -1764,7 +1769,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } results.clear(); boolean returnResult = nextInternal(); - if (!returnResult && filter != null && filter.filterRow()) { + if (!returnResult && filterRow()) { results.clear(); } outResults.addAll(results); @@ -1779,71 +1784,78 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @return True if a filter rules the scanner is over, done. */ boolean isFilterDone() { - return this.filter != null && this.filter.filterAllRemaining(); + return + (this.filter != null && this.filter.filterAllRemaining()) || + (this.oldFilter != null && oldFilter.filterAllRemaining()); } + /* - * @return true if there are more rows, false if scanner is done - * @throws IOException - */ + * @return true if there are more rows, false if scanner is done + * @throws IOException + */ private boolean nextInternal() throws IOException { - byte [] currentRow = null; - boolean filterCurrentRow = false; while (true) { - KeyValue kv = this.storeHeap.peek(); - if (kv == null) return false; - byte [] row = kv.getRow(); - boolean samerow = Bytes.equals(currentRow, row); - if (samerow && filterCurrentRow) { - // Filter all columns until row changes - readAndDumpCurrentResult(); - continue; - } - if (!samerow) { - // Continue on the next row: - currentRow = row; - filterCurrentRow = false; - // See if we passed stopRow - if (this.stopRow != null && - comparator.compareRows(this.stopRow, 0, this.stopRow.length, - currentRow, 0, currentRow.length) <= 0) { - return false; + byte[] currentRow = peekRow(); + if (isStopRow(currentRow)) { + return false; + } else if (filterRowKey(currentRow)) { + nextRow(currentRow); + } else { + byte[] nextRow; + do { + this.storeHeap.next(results); + } while (Bytes.equals(currentRow, nextRow = peekRow())); + + final boolean stopRow = isStopRow(nextRow); + if (!stopRow && (results.isEmpty() || filterRow())) { + // this seems like a redundant step - we already consumed the row + // there're no left overs. + // the reasons for calling this method are: + // 1. reset the filters. + // 2. provide a hook to fast forward the row (used by subclasses) + nextRow(currentRow); + continue; } - if (hasResults()) return true; - } - // See if current row should be filtered based on row key - if ((this.filter != null && this.filter.filterRowKey(row, 0, row.length)) || - (oldFilter != null && this.oldFilter.filterRowKey(row, 0, row.length))) { - readAndDumpCurrentResult(); - resetFilters(); - filterCurrentRow = true; - currentRow = row; - continue; + return !stopRow; } - this.storeHeap.next(results); } } - private void readAndDumpCurrentResult() throws IOException { - this.storeHeap.next(this.results); - this.results.clear(); - } - - /* - * Do we have results to return or should we continue. Call when we get to - * the end of a row. Does house cleaning -- clearing results and resetting - * filters -- if we are to continue. - * @return True if we should return else false if need to keep going. + /** + * Reset state and move to the next row. + * + * @param currentRow the current row + * @throws IOException by store heap */ - private boolean hasResults() { - if (this.results.isEmpty() || - this.filter != null && this.filter.filterRow()) { - // Make sure results is empty, reset filters - this.results.clear(); - resetFilters(); - return false; + protected void nextRow(byte[] currentRow) throws IOException { + while (Bytes.equals(currentRow, peekRow())) { + this.storeHeap.next(MOCKED_LIST); } - return true; + results.clear(); + resetFilters(); + } + + private boolean isStopRow(byte[] currentRow) { + return currentRow == null || + (this.stopRow != null && + comparator.compareRows(this.stopRow, 0, this.stopRow.length, + currentRow, 0, currentRow.length) <= 0); + } + + private boolean filterRow() { + return (filter != null && filter.filterRow()) || + oldFilter != null && oldFilter.filterRow(results); + } + + private byte[] peekRow() { + KeyValue kv = this.storeHeap.peek(); + return kv == null ? null : kv.getRow(); + } + + private boolean filterRowKey(byte[] row) { + return (this.filter != null && this.filter.filterRowKey(row, 0, row.length)) || + (oldFilter != null && this.oldFilter.filterRowKey(row, 0, row.length)); } public void close() { @@ -2617,4 +2629,31 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ if (bc != null) bc.shutdown(); } } -} + + /** + * A mocked list implementaion - discards all updates. + */ + private static final List