diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c62bd4f24..8074a54e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5970,10 +5970,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   class RegionScannerImpl implements RegionScanner {
     // Package local for testability
-    KeyValueHeap storeHeap = null;
+    StoreHeap storeHeap = null;
     /** Heap of key-values that are not essential for the provided filters and are thus read
      * on demand, if on-demand column family loading is enabled.*/
-    KeyValueHeap joinedHeap = null;
+    StoreHeap joinedHeap = null;
     /**
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
@@ -6072,9 +6072,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     protected void initializeKVHeap(List<KeyValueScanner> scanners,
         List<KeyValueScanner> joinedScanners, HRegion region)
         throws IOException {
-      this.storeHeap = new KeyValueHeap(scanners, region.comparator);
+      this.storeHeap = new StoreHeap(scanners, region.comparator);
       if (!joinedScanners.isEmpty()) {
-        this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
+        this.joinedHeap = new StoreHeap(joinedScanners, region.comparator);
       }
     }
 
@@ -6219,7 +6219,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * @param length length for currentRow
      * @return state of last call to {@link KeyValueHeap#next()}
      */
-    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
+    private boolean populateResult(List<Cell> results, StoreHeap heap,
         ScannerContext scannerContext, byte[] currentRow, int offset, short length)
         throws IOException {
       Cell nextKv;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index c0ba844d4..bb14c7e06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -36,16 +36,12 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
  * <p>
  * Implements KeyValueScanner itself.
  * <p>
- * This class is used at the Region level to merge across Stores
- * and at the Store level to merge across the memstore and StoreFiles.
- * <p>
- * In the Region case, we also need InternalScanner.next(List), so this class
- * also implements InternalScanner.  WARNING: As is, if you try to use this
- * as an InternalScanner at the Store level, you will get runtime exceptions.
+ * This class is used at the Store level to merge across the memstore and StoreFiles.
+ * For merging across Stores at the Region level, see {@link StoreHeap}.
  */
 @InterfaceAudience.Private
 public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
-    implements KeyValueScanner, InternalScanner {
+    implements KeyValueScanner {
   private static final Log LOG = LogFactory.getLog(KeyValueHeap.class);
   protected PriorityQueue<KeyValueScanner> heap = null;
 
@@ -128,51 +124,6 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     return kvReturn;
   }
 
-  /**
-   * Gets the next row of keys from the top-most scanner.
-   * <p>
-   * This method takes care of updating the heap.
-   * <p>
-   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
-   * KeyValueScanner (a {@link StoreScanner}).
-   * @param result
-   * @return true if more rows exist after this one, false if scanner is done
-   */
-  @Override
-  public boolean next(List<Cell> result) throws IOException {
-    return next(result, NoLimitScannerContext.getInstance());
-  }
-
-  @Override
-  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-    if (this.current == null) {
-      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-    }
-    InternalScanner currentAsInternal = (InternalScanner)this.current;
-    boolean moreCells = currentAsInternal.next(result, scannerContext);
-    Cell pee = this.current.peek();
-
-    /*
-     * By definition, any InternalScanner must return false only when it has no
-     * further rows to be fetched. So, we can close a scanner if it returns
-     * false. All existing implementations seem to be fine with this. It is much
-     * more efficient to close scanners which are not needed than keep them in
-     * the heap. This is also required for certain optimizations.
-     */
-
-    if (pee == null || !moreCells) {
-      this.current.close();
-    } else {
-      this.heap.add(this.current);
-    }
-    this.current = null;
-    this.current = pollRealKV();
-    if (this.current == null) {
-      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-    }
-    return moreCells;
-  }
-
   protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
     protected KVComparator kvComparator;
     /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
index d50131375..481e3e896 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator;
  * if it is already at the end of one row when calling next().
  */
 @InterfaceAudience.Private
-public class ReversedKeyValueHeap extends KeyValueHeap {
+public class ReversedKeyValueHeap extends StoreHeap {
 
   /**
    * @param scanners
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreHeap.java
new file mode 100644
index 000000000..0df3f387f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreHeap.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
+
+/**
+ * Implements a heap merge across any number of KeyValueScanners.
+ * <p>
+ * Implements KeyValueScanner itself.
+ * <p>
+ * This class is used at the Region level to merge across Stores.
+ * <p>
+ * In the Region case, we also need InternalScanner.next(List), so this class
+ * also implements InternalScanner. Every scanner placed in this heap must therefore
+ * also implement InternalScanner (e.g. a {@link StoreScanner}); otherwise
+ * {@link #next(List, ScannerContext)} fails with a ClassCastException.
+ */
+@InterfaceAudience.Private
+public class StoreHeap extends KeyValueHeap implements InternalScanner {
+  /**
+   * Constructor. This StoreHeap will handle closing of passed in
+   * KeyValueScanners.
+   * @param scanners the scanners to merge; each must also implement InternalScanner
+   * @param comparator comparator used to order cells across the scanners
+   * @throws IOException propagated from the KeyValueHeap constructor
+   */
+  public StoreHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) throws IOException {
+    super(scanners, comparator);
+  }
+
+  /**
+   * Package-private constructor that accepts a ready-made scanner comparator.
+   * @param scanners the scanners to merge; each must also implement InternalScanner
+   * @param comparator comparator used to order the scanners within the heap
+   * @throws IOException propagated from the KeyValueHeap constructor
+   */
+  StoreHeap(List<? extends KeyValueScanner> scanners,
+      KVScannerComparator comparator) throws IOException {
+    super(scanners, comparator);
+  }
+
+  /**
+   * Gets the next row of keys from the top-most scanner.
+   * <p>
+   * This method takes care of updating the heap.
+   * <p>
+   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
+   * KeyValueScanner (a {@link StoreScanner}).
+   * @param result output list, passed through to the current scanner's next()
+   * @return true if more rows exist after this one, false if scanner is done
+   */
+  @Override
+  public boolean next(List<Cell> result) throws IOException {
+    return next(result, NoLimitScannerContext.getInstance());
+  }
+
+  /**
+   * Same as {@link #next(List)}, but passes {@code scannerContext} through to the current
+   * scanner.
+   * <p>
+   * The scanner that produced the row is closed if it reports no more cells or has nothing
+   * left to peek; otherwise it is put back into the heap. Once the heap is exhausted the
+   * context state is set to {@link NextState#NO_MORE_VALUES}.
+   * @param result output list, passed through to the current scanner's next()
+   * @param scannerContext context passed through to the current scanner's next()
+   * @return true if more rows exist after this one, false if scanner is done
+   */
+  @Override
+  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+    if (this.current == null) {
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+    }
+    InternalScanner currentAsInternal = (InternalScanner)this.current;
+    boolean moreCells = currentAsInternal.next(result, scannerContext);
+    Cell pee = this.current.peek();
+
+    /*
+     * By definition, any InternalScanner must return false only when it has no
+     * further rows to be fetched. So, we can close a scanner if it returns
+     * false. All existing implementations seem to be fine with this. It is much
+     * more efficient to close scanners which are not needed than keep them in
+     * the heap. This is also required for certain optimizations.
+     */
+
+    if (pee == null || !moreCells) {
+      this.current.close();
+    } else {
+      this.heap.add(this.current);
+    }
+    this.current = null;
+    this.current = pollRealKV();
+    if (this.current == null) {
+      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+    }
+    return moreCells;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index fe938b375..5f79f61b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -542,7 +542,7 @@ public class TestScannerHeartbeatMessages {
    * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
    * cells. Useful for testing
    */
-  private static final class HeartbeatKVHeap extends KeyValueHeap {
+  private static final class HeartbeatKVHeap extends StoreHeap {
     public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator)
         throws IOException {
       super(scanners, comparator);