diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c62bd4f..806ee06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5970,10 +5970,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ class RegionScannerImpl implements RegionScanner { // Package local for testability - KeyValueHeap storeHeap = null; + StoreHeap storeHeap = null; /** Heap of key-values that are not essential for the provided filters and are thus read * on demand, if on-demand column family loading is enabled.*/ - KeyValueHeap joinedHeap = null; + StoreHeap joinedHeap = null; /** * If the joined heap data gathering is interrupted due to scan limits, this will * contain the row for which we are populating the values.*/ @@ -6072,9 +6072,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected void initializeKVHeap(List scanners, List joinedScanners, HRegion region) throws IOException { - this.storeHeap = new KeyValueHeap(scanners, region.comparator); + this.storeHeap = new StoreHeap(scanners, region.comparator); if (!joinedScanners.isEmpty()) { - this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator); + this.joinedHeap = new StoreHeap(joinedScanners, region.comparator); } } @@ -6217,9 +6217,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param currentRow Byte array with key we are fetching. * @param offset offset for currentRow * @param length length for currentRow - * @return state of last call to {@link KeyValueHeap#next()} + * @return state of last call to {@link StoreHeap#next()} */ - private boolean populateResult(List results, KeyValueHeap heap, + private boolean populateResult(List results, StoreHeap heap, ScannerContext scannerContext, byte[] currentRow, int offset, short length) throws IOException { Cell nextKv; @@ -6594,7 +6594,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.filterClosed = true; } - KeyValueHeap getStoreHeapForTesting() { + StoreHeap getStoreHeapForTesting() { return storeHeap; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java deleted file mode 100644 index c0ba844..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ /dev/null @@ -1,426 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; - -/** - * Implements a heap merge across any number of KeyValueScanners. - *

- * Implements KeyValueScanner itself. - *

- * This class is used at the Region level to merge across Stores - * and at the Store level to merge across the memstore and StoreFiles. - *

- * In the Region case, we also need InternalScanner.next(List), so this class - * also implements InternalScanner. WARNING: As is, if you try to use this - * as an InternalScanner at the Store level, you will get runtime exceptions. - */ -@InterfaceAudience.Private -public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner - implements KeyValueScanner, InternalScanner { - private static final Log LOG = LogFactory.getLog(KeyValueHeap.class); - protected PriorityQueue heap = null; - - /** - * The current sub-scanner, i.e. the one that contains the next key/value - * to return to the client. This scanner is NOT included in {@link #heap} - * (but we frequently add it back to the heap and pull the new winner out). - * We maintain an invariant that the current sub-scanner has already done - * a real seek, and that current.peek() is always a real key/value (or null) - * except for the fake last-key-on-row-column supplied by the multi-column - * Bloom filter optimization, which is OK to propagate to StoreScanner. In - * order to ensure that, always use {@link #pollRealKV()} to update current. - */ - protected KeyValueScanner current = null; - - protected KVScannerComparator comparator; - - /** - * Constructor. This KeyValueHeap will handle closing of passed in - * KeyValueScanners. - * @param scanners - * @param comparator - */ - public KeyValueHeap(List scanners, - KVComparator comparator) throws IOException { - this(scanners, new KVScannerComparator(comparator)); - } - - /** - * Constructor. - * @param scanners - * @param comparator - * @throws IOException - */ - KeyValueHeap(List scanners, - KVScannerComparator comparator) throws IOException { - this.comparator = comparator; - if (!scanners.isEmpty()) { - this.heap = new PriorityQueue(scanners.size(), - this.comparator); - for (KeyValueScanner scanner : scanners) { - if (scanner.peek() != null) { - this.heap.add(scanner); - } else { - scanner.close(); - } - } - this.current = pollRealKV(); - } - } - - @Override - public Cell peek() { - if (this.current == null) { - return null; - } - return this.current.peek(); - } - - @Override - public Cell next() throws IOException { - if(this.current == null) { - return null; - } - Cell kvReturn = this.current.next(); - Cell kvNext = this.current.peek(); - if (kvNext == null) { - this.current.close(); - this.current = null; - this.current = pollRealKV(); - } else { - KeyValueScanner topScanner = this.heap.peek(); - // no need to add current back to the heap if it is the only scanner left - if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) { - this.heap.add(this.current); - this.current = null; - this.current = pollRealKV(); - } - } - return kvReturn; - } - - /** - * Gets the next row of keys from the top-most scanner. - *

- * This method takes care of updating the heap. - *

- * This can ONLY be called when you are using Scanners that implement InternalScanner as well as - * KeyValueScanner (a {@link StoreScanner}). - * @param result - * @return true if more rows exist after this one, false if scanner is done - */ - @Override - public boolean next(List result) throws IOException { - return next(result, NoLimitScannerContext.getInstance()); - } - - @Override - public boolean next(List result, ScannerContext scannerContext) throws IOException { - if (this.current == null) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean moreCells = currentAsInternal.next(result, scannerContext); - Cell pee = this.current.peek(); - - /* - * By definition, any InternalScanner must return false only when it has no - * further rows to be fetched. So, we can close a scanner if it returns - * false. All existing implementations seem to be fine with this. It is much - * more efficient to close scanners which are not needed than keep them in - * the heap. This is also required for certain optimizations. - */ - - if (pee == null || !moreCells) { - this.current.close(); - } else { - this.heap.add(this.current); - } - this.current = null; - this.current = pollRealKV(); - if (this.current == null) { - moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - return moreCells; - } - - protected static class KVScannerComparator implements Comparator { - protected KVComparator kvComparator; - /** - * Constructor - * @param kvComparator - */ - public KVScannerComparator(KVComparator kvComparator) { - this.kvComparator = kvComparator; - } - - @Override - public int compare(KeyValueScanner left, KeyValueScanner right) { - int comparison = compare(left.peek(), right.peek()); - if (comparison != 0) { - return comparison; - } else { - // Since both the keys are exactly the same, we break the tie in favor of higher ordered - // scanner since it'll have newer data. Since higher value should come first, we reverse - // sort here. - return Long.compare(right.getScannerOrder(), left.getScannerOrder()); - } - } - /** - * Compares two KeyValue - * @param left - * @param right - * @return less than 0 if left is smaller, 0 if equal etc.. - */ - public int compare(Cell left, Cell right) { - return this.kvComparator.compare(left, right); - } - /** - * @return KVComparator - */ - public KVComparator getComparator() { - return this.kvComparator; - } - } - - @Override - public void close() { - if (this.current != null) { - this.current.close(); - } - if (this.heap != null) { - KeyValueScanner scanner; - while ((scanner = this.heap.poll()) != null) { - scanner.close(); - } - } - } - - /** - * Seeks all scanners at or below the specified seek key. If we earlied-out - * of a row, we may end up skipping values that were never reached yet. - * Rather than iterating down, we want to give the opportunity to re-seek. - *

- * As individual scanners may run past their ends, those scanners are - * automatically closed and removed from the heap. - *

- * This function (and {@link #reseek(Cell)}) does not do multi-column - * Bloom filter and lazy-seek optimizations. To enable those, call - * {@link #requestSeek(Cell, boolean, boolean)}. - * @param seekKey KeyValue to seek at or after - * @return true if KeyValues exist at or after specified key, false if not - * @throws IOException - */ - @Override - public boolean seek(Cell seekKey) throws IOException { - return generalizedSeek(false, // This is not a lazy seek - seekKey, - false, // forward (false: this is not a reseek) - false); // Not using Bloom filters - } - - /** - * This function is identical to the {@link #seek(Cell)} function except - * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey). - */ - @Override - public boolean reseek(Cell seekKey) throws IOException { - return generalizedSeek(false, // This is not a lazy seek - seekKey, - true, // forward (true because this is reseek) - false); // Not using Bloom filters - } - - /** - * {@inheritDoc} - */ - @Override - public boolean requestSeek(Cell key, boolean forward, - boolean useBloom) throws IOException { - return generalizedSeek(true, key, forward, useBloom); - } - - /** - * @param isLazy whether we are trying to seek to exactly the given row/col. - * Enables Bloom filter and most-recent-file-first optimizations for - * multi-column get/scan queries. - * @param seekKey key to seek to - * @param forward whether to seek forward (also known as reseek) - * @param useBloom whether to optimize seeks using Bloom filters - */ - private boolean generalizedSeek(boolean isLazy, Cell seekKey, - boolean forward, boolean useBloom) throws IOException { - if (!isLazy && useBloom) { - throw new IllegalArgumentException("Multi-column Bloom filter " + - "optimization requires a lazy seek"); - } - - if (current == null) { - return false; - } - heap.add(current); - current = null; - - KeyValueScanner scanner = null; - try { - while ((scanner = heap.poll()) != null) { - Cell topKey = scanner.peek(); - if (comparator.getComparator().compare(seekKey, topKey) <= 0) { - // Top KeyValue is at-or-after Seek KeyValue. We only know that all - // scanners are at or after seekKey (because fake keys of - // scanners where a lazy-seek operation has been done are not greater - // than their real next keys) but we still need to enforce our - // invariant that the top scanner has done a real seek. This way - // StoreScanner and RegionScanner do not have to worry about fake - // keys. - heap.add(scanner); - scanner = null; - current = pollRealKV(); - return current != null; - } - - boolean seekResult; - if (isLazy && heap.size() > 0) { - // If there is only one scanner left, we don't do lazy seek. - seekResult = scanner.requestSeek(seekKey, forward, useBloom); - } else { - seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, - forward); - } - - if (!seekResult) { - scanner.close(); - } else { - heap.add(scanner); - } - } - } catch (Exception e) { - if (scanner != null) { - try { - scanner.close(); - } catch (Exception ce) { - LOG.warn("close KeyValueScanner error", ce); - } - } - throw e; - } - - // Heap is returning empty, scanner is done - return false; - } - - /** - * Fetches the top sub-scanner from the priority queue, ensuring that a real - * seek has been done on it. Works by fetching the top sub-scanner, and if it - * has not done a real seek, making it do so (which will modify its top KV), - * putting it back, and repeating this until success. Relies on the fact that - * on a lazy seek we set the current key of a StoreFileScanner to a KV that - * is not greater than the real next KV to be read from that file, so the - * scanner that bubbles up to the top of the heap will have global next KV in - * this scanner heap if (1) it has done a real seek and (2) its KV is the top - * among all top KVs (some of which are fake) in the scanner heap. - */ - protected KeyValueScanner pollRealKV() throws IOException { - KeyValueScanner kvScanner = heap.poll(); - if (kvScanner == null) { - return null; - } - - while (kvScanner != null && !kvScanner.realSeekDone()) { - if (kvScanner.peek() != null) { - try { - kvScanner.enforceSeek(); - } catch (IOException ioe) { - kvScanner.close(); - throw ioe; - } - Cell curKV = kvScanner.peek(); - if (curKV != null) { - KeyValueScanner nextEarliestScanner = heap.peek(); - if (nextEarliestScanner == null) { - // The heap is empty. Return the only possible scanner. - return kvScanner; - } - - // Compare the current scanner to the next scanner. We try to avoid - // putting the current one back into the heap if possible. - Cell nextKV = nextEarliestScanner.peek(); - if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { - // We already have the scanner with the earliest KV, so return it. - return kvScanner; - } - - // Otherwise, put the scanner back into the heap and let it compete - // against all other scanners (both those that have done a "real - // seek" and a "lazy seek"). - heap.add(kvScanner); - } else { - // Close the scanner because we did a real seek and found out there - // are no more KVs. - kvScanner.close(); - } - } else { - // Close the scanner because it has already run out of KVs even before - // we had to do a real seek on it. - kvScanner.close(); - } - kvScanner = heap.poll(); - } - - return kvScanner; - } - - /** - * @return the current Heap - */ - public PriorityQueue getHeap() { - return this.heap; - } - - /** - * @see KeyValueScanner#getScannerOrder() - */ - @Override - public long getScannerOrder() { - return 0; - } - - KeyValueScanner getCurrentForTesting() { - return current; - } - - @Override - public Cell getNextIndexedKey() { - // here we return the next index key from the top scanner - return current == null ? null : current.getNextIndexedKey(); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedHeap.java new file mode 100644 index 0000000..6a471cb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedHeap.java @@ -0,0 +1,195 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + +/** + * ReversedHeap is used for supporting reversed scanning. Compared with + * StoreHeap, its scanner comparator is a little different (see + * ReversedKVScannerComparator), all seek is backward seek(see + * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row + * if it is already at the end of one row when calling next(). + */ +@InterfaceAudience.Private +public class ReversedHeap extends StoreHeap { + + /** + * @param scanners + * @param comparator + * @throws IOException + */ + public ReversedHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, new ReversedKVScannerComparator(comparator)); + } + + @Override + public boolean seek(Cell seekKey) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean reseek(Cell seekKey) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean requestSeek(Cell key, boolean forward, boolean useBloom) + throws IOException { + throw new IllegalStateException( + "requestSeek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean seekToPreviousRow(Cell seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + Cell topKey = scanner.peek(); + if (comparator.getComparator().compareRows(topKey.getRowArray(), + topKey.getRowOffset(), topKey.getRowLength(), seekKey.getRowArray(), + seekKey.getRowOffset(), seekKey.getRowLength()) < 0) { + // Row of Top KeyValue is before Seek row. + heap.add(scanner); + current = pollRealKV(); + return current != null; + } + + if (!scanner.seekToPreviousRow(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + + // Heap is returning empty, scanner is done + return false; + } + + @Override + public boolean backwardSeek(Cell seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + Cell topKey = scanner.peek(); + if ((CellUtil.matchingRow(seekKey, topKey) && comparator + .getComparator().compare(seekKey, topKey) <= 0) + || comparator.getComparator().compareRows(seekKey, topKey) > 0) { + heap.add(scanner); + current = pollRealKV(); + return current != null; + } + if (!scanner.backwardSeek(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + return false; + } + + @Override + public Cell next() throws IOException { + if (this.current == null) { + return null; + } + Cell kvReturn = this.current.next(); + Cell kvNext = this.current.peek(); + if (kvNext == null + || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) { + if (this.current.seekToPreviousRow(kvReturn)) { + this.heap.add(this.current); + } else { + this.current.close(); + } + this.current = null; + this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + if (topScanner != null + && this.comparator.compare(this.current, topScanner) > 0) { + this.heap.add(this.current); + this.current = null; + this.current = pollRealKV(); + } + } + return kvReturn; + } + + /** + * In ReversedKVScannerComparator, we compare the row of scanners' peek values + * first, sort bigger one before the smaller one. Then compare the KeyValue if + * they have the equal row, sort smaller one before the bigger one + */ + private static class ReversedKVScannerComparator extends + KVScannerComparator { + + /** + * Constructor + * @param kvComparator + */ + public ReversedKVScannerComparator(KVComparator kvComparator) { + super(kvComparator); + } + + @Override + public int compare(KeyValueScanner left, KeyValueScanner right) { + int rowComparison = compareRows(left.peek(), right.peek()); + if (rowComparison != 0) { + return -rowComparison; + } + return super.compare(left, right); + } + + /** + * Compares rows of two KeyValue + * @param left + * @param right + * @return less than 0 if left is smaller, 0 if equal etc.. + */ + public int compareRows(Cell left, Cell right) { + return super.kvComparator.compareRows(left, right); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("Not implemented"); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java deleted file mode 100644 index d501313..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue.KVComparator; - -/** - * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with - * KeyValueHeap, its scanner comparator is a little different (see - * ReversedKVScannerComparator), all seek is backward seek(see - * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row - * if it is already at the end of one row when calling next(). - */ -@InterfaceAudience.Private -public class ReversedKeyValueHeap extends KeyValueHeap { - - /** - * @param scanners - * @param comparator - * @throws IOException - */ - public ReversedKeyValueHeap(List scanners, - KVComparator comparator) throws IOException { - super(scanners, new ReversedKVScannerComparator(comparator)); - } - - @Override - public boolean seek(Cell seekKey) throws IOException { - throw new IllegalStateException( - "seek cannot be called on ReversedKeyValueHeap"); - } - - @Override - public boolean reseek(Cell seekKey) throws IOException { - throw new IllegalStateException( - "reseek cannot be called on ReversedKeyValueHeap"); - } - - @Override - public boolean requestSeek(Cell key, boolean forward, boolean useBloom) - throws IOException { - throw new IllegalStateException( - "requestSeek cannot be called on ReversedKeyValueHeap"); - } - - @Override - public boolean seekToPreviousRow(Cell seekKey) throws IOException { - if (current == null) { - return false; - } - heap.add(current); - current = null; - - KeyValueScanner scanner; - while ((scanner = heap.poll()) != null) { - Cell topKey = scanner.peek(); - if (comparator.getComparator().compareRows(topKey.getRowArray(), - topKey.getRowOffset(), topKey.getRowLength(), seekKey.getRowArray(), - seekKey.getRowOffset(), seekKey.getRowLength()) < 0) { - // Row of Top KeyValue is before Seek row. - heap.add(scanner); - current = pollRealKV(); - return current != null; - } - - if (!scanner.seekToPreviousRow(seekKey)) { - scanner.close(); - } else { - heap.add(scanner); - } - } - - // Heap is returning empty, scanner is done - return false; - } - - @Override - public boolean backwardSeek(Cell seekKey) throws IOException { - if (current == null) { - return false; - } - heap.add(current); - current = null; - - KeyValueScanner scanner; - while ((scanner = heap.poll()) != null) { - Cell topKey = scanner.peek(); - if ((CellUtil.matchingRow(seekKey, topKey) && comparator - .getComparator().compare(seekKey, topKey) <= 0) - || comparator.getComparator().compareRows(seekKey, topKey) > 0) { - heap.add(scanner); - current = pollRealKV(); - return current != null; - } - if (!scanner.backwardSeek(seekKey)) { - scanner.close(); - } else { - heap.add(scanner); - } - } - return false; - } - - @Override - public Cell next() throws IOException { - if (this.current == null) { - return null; - } - Cell kvReturn = this.current.next(); - Cell kvNext = this.current.peek(); - if (kvNext == null - || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) { - if (this.current.seekToPreviousRow(kvReturn)) { - this.heap.add(this.current); - } else { - this.current.close(); - } - this.current = null; - this.current = pollRealKV(); - } else { - KeyValueScanner topScanner = this.heap.peek(); - if (topScanner != null - && this.comparator.compare(this.current, topScanner) > 0) { - this.heap.add(this.current); - this.current = null; - this.current = pollRealKV(); - } - } - return kvReturn; - } - - /** - * In ReversedKVScannerComparator, we compare the row of scanners' peek values - * first, sort bigger one before the smaller one. Then compare the KeyValue if - * they have the equal row, sort smaller one before the bigger one - */ - private static class ReversedKVScannerComparator extends - KVScannerComparator { - - /** - * Constructor - * @param kvComparator - */ - public ReversedKVScannerComparator(KVComparator kvComparator) { - super(kvComparator); - } - - @Override - public int compare(KeyValueScanner left, KeyValueScanner right) { - int rowComparison = compareRows(left.peek(), right.peek()); - if (rowComparison != 0) { - return -rowComparison; - } - return super.compare(left, right); - } - - /** - * Compares rows of two KeyValue - * @param left - * @param right - * @return less than 0 if left is smaller, 0 if equal etc.. - */ - public int compareRows(Cell left, Cell right) { - return super.kvComparator.compareRows(left, right); - } - } - - @Override - public boolean seekToLastRow() throws IOException { - throw new NotImplementedException("Not implemented"); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index 7eec632..51f9d15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -51,9 +51,9 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { @Override protected void initializeKVHeap(List scanners, List joinedScanners, HRegion region) throws IOException { - this.storeHeap = new ReversedKeyValueHeap(scanners, region.getComparator()); + this.storeHeap = new ReversedHeap(scanners, region.getComparator()); if (!joinedScanners.isEmpty()) { - this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, + this.joinedHeap = new ReversedHeap(joinedScanners, region.getComparator()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 0f12b0a..683a7e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -65,7 +65,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { protected void resetKVHeap(List scanners, KVComparator comparator) throws IOException { // Combine all seeked scanners with a heap - heap = new ReversedKeyValueHeap(scanners, comparator); + heap = new ReversedHeap(scanners, comparator); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileHeap.java new file mode 100644 index 0000000..3434489 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileHeap.java @@ -0,0 +1,376 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; + +/** + * Implements a heap merge across any number of KeyValueScanners. + *

+ * Implements KeyValueScanner itself. + *

+ * This class is used at the Store level to merge across the memstore and StoreFiles. + */ +@InterfaceAudience.Private +public class StoreFileHeap extends NonReversedNonLazyKeyValueScanner + implements KeyValueScanner { + private static final Log LOG = LogFactory.getLog(StoreFileHeap.class); + protected PriorityQueue heap = null; + + /** + * The current sub-scanner, i.e. the one that contains the next key/value + * to return to the client. This scanner is NOT included in {@link #heap} + * (but we frequently add it back to the heap and pull the new winner out). + * We maintain an invariant that the current sub-scanner has already done + * a real seek, and that current.peek() is always a real key/value (or null) + * except for the fake last-key-on-row-column supplied by the multi-column + * Bloom filter optimization, which is OK to propagate to StoreScanner. In + * order to ensure that, always use {@link #pollRealKV()} to update current. + */ + protected KeyValueScanner current = null; + + protected KVScannerComparator comparator; + + /** + * Constructor. This KeyValueHeap will handle closing of passed in + * KeyValueScanners. + * @param scanners + * @param comparator + */ + public StoreFileHeap(List scanners, + KVComparator comparator) throws IOException { + this(scanners, new KVScannerComparator(comparator)); + } + + /** + * Constructor. + * @param scanners + * @param comparator + * @throws IOException + */ + StoreFileHeap(List scanners, + KVScannerComparator comparator) throws IOException { + this.comparator = comparator; + if (!scanners.isEmpty()) { + this.heap = new PriorityQueue(scanners.size(), + this.comparator); + for (KeyValueScanner scanner : scanners) { + if (scanner.peek() != null) { + this.heap.add(scanner); + } else { + scanner.close(); + } + } + this.current = pollRealKV(); + } + } + + @Override + public Cell peek() { + if (this.current == null) { + return null; + } + return this.current.peek(); + } + + @Override + public Cell next() throws IOException { + if(this.current == null) { + return null; + } + Cell kvReturn = this.current.next(); + Cell kvNext = this.current.peek(); + if (kvNext == null) { + this.current.close(); + this.current = null; + this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + // no need to add current back to the heap if it is the only scanner left + if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) { + this.heap.add(this.current); + this.current = null; + this.current = pollRealKV(); + } + } + return kvReturn; + } + + protected static class KVScannerComparator implements Comparator { + protected KVComparator kvComparator; + /** + * Constructor + * @param kvComparator + */ + public KVScannerComparator(KVComparator kvComparator) { + this.kvComparator = kvComparator; + } + + @Override + public int compare(KeyValueScanner left, KeyValueScanner right) { + int comparison = compare(left.peek(), right.peek()); + if (comparison != 0) { + return comparison; + } else { + // Since both the keys are exactly the same, we break the tie in favor of higher ordered + // scanner since it'll have newer data. Since higher value should come first, we reverse + // sort here. + return Long.compare(right.getScannerOrder(), left.getScannerOrder()); + } + } + /** + * Compares two KeyValue + * @param left + * @param right + * @return less than 0 if left is smaller, 0 if equal etc.. + */ + public int compare(Cell left, Cell right) { + return this.kvComparator.compare(left, right); + } + /** + * @return KVComparator + */ + public KVComparator getComparator() { + return this.kvComparator; + } + } + + @Override + public void close() { + if (this.current != null) { + this.current.close(); + } + if (this.heap != null) { + KeyValueScanner scanner; + while ((scanner = this.heap.poll()) != null) { + scanner.close(); + } + } + } + + /** + * Seeks all scanners at or below the specified seek key. If we earlied-out + * of a row, we may end up skipping values that were never reached yet. + * Rather than iterating down, we want to give the opportunity to re-seek. + *

+ * As individual scanners may run past their ends, those scanners are + * automatically closed and removed from the heap. + *

+ * This function (and {@link #reseek(Cell)}) does not do multi-column + * Bloom filter and lazy-seek optimizations. To enable those, call + * {@link #requestSeek(Cell, boolean, boolean)}. + * @param seekKey KeyValue to seek at or after + * @return true if KeyValues exist at or after specified key, false if not + * @throws IOException + */ + @Override + public boolean seek(Cell seekKey) throws IOException { + return generalizedSeek(false, // This is not a lazy seek + seekKey, + false, // forward (false: this is not a reseek) + false); // Not using Bloom filters + } + + /** + * This function is identical to the {@link #seek(Cell)} function except + * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey). + */ + @Override + public boolean reseek(Cell seekKey) throws IOException { + return generalizedSeek(false, // This is not a lazy seek + seekKey, + true, // forward (true because this is reseek) + false); // Not using Bloom filters + } + + /** + * {@inheritDoc} + */ + @Override + public boolean requestSeek(Cell key, boolean forward, + boolean useBloom) throws IOException { + return generalizedSeek(true, key, forward, useBloom); + } + + /** + * @param isLazy whether we are trying to seek to exactly the given row/col. + * Enables Bloom filter and most-recent-file-first optimizations for + * multi-column get/scan queries. + * @param seekKey key to seek to + * @param forward whether to seek forward (also known as reseek) + * @param useBloom whether to optimize seeks using Bloom filters + */ + private boolean generalizedSeek(boolean isLazy, Cell seekKey, + boolean forward, boolean useBloom) throws IOException { + if (!isLazy && useBloom) { + throw new IllegalArgumentException("Multi-column Bloom filter " + + "optimization requires a lazy seek"); + } + + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner = null; + try { + while ((scanner = heap.poll()) != null) { + Cell topKey = scanner.peek(); + if (comparator.getComparator().compare(seekKey, topKey) <= 0) { + // Top KeyValue is at-or-after Seek KeyValue. We only know that all + // scanners are at or after seekKey (because fake keys of + // scanners where a lazy-seek operation has been done are not greater + // than their real next keys) but we still need to enforce our + // invariant that the top scanner has done a real seek. This way + // StoreScanner and RegionScanner do not have to worry about fake + // keys. + heap.add(scanner); + scanner = null; + current = pollRealKV(); + return current != null; + } + + boolean seekResult; + if (isLazy && heap.size() > 0) { + // If there is only one scanner left, we don't do lazy seek. + seekResult = scanner.requestSeek(seekKey, forward, useBloom); + } else { + seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, + forward); + } + + if (!seekResult) { + scanner.close(); + } else { + heap.add(scanner); + } + } + } catch (Exception e) { + if (scanner != null) { + try { + scanner.close(); + } catch (Exception ce) { + LOG.warn("close KeyValueScanner error", ce); + } + } + throw e; + } + + // Heap is returning empty, scanner is done + return false; + } + + /** + * Fetches the top sub-scanner from the priority queue, ensuring that a real + * seek has been done on it. Works by fetching the top sub-scanner, and if it + * has not done a real seek, making it do so (which will modify its top KV), + * putting it back, and repeating this until success. Relies on the fact that + * on a lazy seek we set the current key of a StoreFileScanner to a KV that + * is not greater than the real next KV to be read from that file, so the + * scanner that bubbles up to the top of the heap will have global next KV in + * this scanner heap if (1) it has done a real seek and (2) its KV is the top + * among all top KVs (some of which are fake) in the scanner heap. + */ + protected KeyValueScanner pollRealKV() throws IOException { + KeyValueScanner kvScanner = heap.poll(); + if (kvScanner == null) { + return null; + } + + while (kvScanner != null && !kvScanner.realSeekDone()) { + if (kvScanner.peek() != null) { + try { + kvScanner.enforceSeek(); + } catch (IOException ioe) { + kvScanner.close(); + throw ioe; + } + Cell curKV = kvScanner.peek(); + if (curKV != null) { + KeyValueScanner nextEarliestScanner = heap.peek(); + if (nextEarliestScanner == null) { + // The heap is empty. Return the only possible scanner. + return kvScanner; + } + + // Compare the current scanner to the next scanner. We try to avoid + // putting the current one back into the heap if possible. + Cell nextKV = nextEarliestScanner.peek(); + if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { + // We already have the scanner with the earliest KV, so return it. + return kvScanner; + } + + // Otherwise, put the scanner back into the heap and let it compete + // against all other scanners (both those that have done a "real + // seek" and a "lazy seek"). + heap.add(kvScanner); + } else { + // Close the scanner because we did a real seek and found out there + // are no more KVs. + kvScanner.close(); + } + } else { + // Close the scanner because it has already run out of KVs even before + // we had to do a real seek on it. + kvScanner.close(); + } + kvScanner = heap.poll(); + } + + return kvScanner; + } + + /** + * @return the current Heap + */ + public PriorityQueue getHeap() { + return this.heap; + } + + /** + * @see KeyValueScanner#getScannerOrder() + */ + @Override + public long getScannerOrder() { + return 0; + } + + KeyValueScanner getCurrentForTesting() { + return current; + } + + @Override + public Cell getNextIndexedKey() { + // here we return the next index key from the top scanner + return current == null ? null : current.getNextIndexedKey(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreHeap.java new file mode 100644 index 0000000..cfea082 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreHeap.java @@ -0,0 +1,89 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; + +/** + * Implements a heap merge across any number of KeyValueScanners. + *

+ * Implements KeyValueScanner itself. + *

+ * This class is used at the Region level to merge across Stores. + *

+ * In the Region case, we also need InternalScanner.next(List), so this class + * also implements InternalScanner. + */ +@InterfaceAudience.Private +public class StoreHeap extends StoreFileHeap implements InternalScanner { + /** + * Constructor. This KeyValueHeap will handle closing of passed in + * KeyValueScanners. + * @param scanners + * @param comparator + */ + public StoreHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, comparator); + } + + /** + * Constructor. + * @param scanners + * @param comparator + * @throws IOException + */ + StoreHeap(List scanners, + KVScannerComparator comparator) throws IOException { + super(scanners, comparator); + } + + /** + * Gets the next row of keys from the top-most scanner. + *

+ * This method takes care of updating the heap. + *

+ * This can ONLY be called when you are using Scanners that implement InternalScanner as well as + * KeyValueScanner (a {@link StoreScanner}). + * @param result + * @return true if more rows exist after this one, false if scanner is done + */ + @Override + public boolean next(List result) throws IOException { + return next(result, NoLimitScannerContext.getInstance()); + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + if (this.current == null) { + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + InternalScanner currentAsInternal = (InternalScanner)this.current; + boolean moreCells = currentAsInternal.next(result, scannerContext); + Cell pee = this.current.peek(); + + /* + * By definition, any InternalScanner must return false only when it has no + * further rows to be fetched. So, we can close a scanner if it returns + * false. All existing implementations seem to be fine with this. It is much + * more efficient to close scanners which are not needed than keep them in + * the heap. This is also required for certain optimizations. + */ + + if (pee == null || !moreCells) { + this.current.close(); + } else { + this.heap.add(this.current); + } + this.current = null; + this.current = pollRealKV(); + if (this.current == null) { + moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + return moreCells; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 678308b..93a5e34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -66,7 +66,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // In unit tests, the store could be null protected final Store store; protected ScanQueryMatcher matcher; - protected KeyValueHeap heap; + protected StoreFileHeap heap; protected boolean cacheBlocks; protected long countPerRow = 0; @@ -403,7 +403,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected void resetKVHeap(List scanners, KVComparator comparator) throws IOException { // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, comparator); + heap = new StoreFileHeap(scanners, comparator); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index c62df07..9f6b7ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -74,7 +74,7 @@ public class TestKeyValueHeap extends HBaseTestCase { public List assertCells(List expected, List scanners) throws IOException { //Creating KeyValueHeap - KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); + StoreFileHeap kvh = new StoreFileHeap(scanners, KeyValue.COMPARATOR); List actual = new ArrayList<>(); while(kvh.peek() != null){ @@ -114,8 +114,8 @@ public class TestKeyValueHeap extends HBaseTestCase { List expected = Arrays.asList(kv211); //Creating KeyValueHeap - KeyValueHeap kvh = - new KeyValueHeap(scanners, KeyValue.COMPARATOR); + StoreFileHeap kvh = + new StoreFileHeap(scanners, KeyValue.COMPARATOR); Cell seekKv = new KeyValue(row2, fam1, null, null); kvh.seek(seekKv); @@ -133,7 +133,7 @@ public class TestKeyValueHeap extends HBaseTestCase { scanners.add(s4); //Creating KeyValueHeap - KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); + StoreFileHeap kvh = new StoreFileHeap(scanners, KeyValue.COMPARATOR); while(kvh.next() != null); @@ -153,7 +153,7 @@ public class TestKeyValueHeap extends HBaseTestCase { List scanners = new ArrayList(Arrays.asList(s1, s2, s3, s4)); // Creating KeyValueHeap - KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); + StoreFileHeap kvh = new StoreFileHeap(scanners, KeyValue.COMPARATOR); try { for (KeyValueScanner scanner : scanners) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java index 7682024..ac82aeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java @@ -121,7 +121,7 @@ public class TestScanWithBloomError { Scan scan = new Scan(ROW_BYTES, ROW_BYTES); addColumnSetToScan(scan, colSet); RegionScannerImpl scanner = (RegionScannerImpl) region.getScanner(scan); - KeyValueHeap storeHeap = scanner.getStoreHeapForTesting(); + StoreFileHeap storeHeap = scanner.getStoreHeapForTesting(); assertEquals(0, storeHeap.getHeap().size()); StoreScanner storeScanner = (StoreScanner) storeHeap.getCurrentForTesting(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index fe938b3..61bd2e1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -542,7 +542,7 @@ public class TestScannerHeartbeatMessages { * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family * cells. Useful for testing */ - private static final class HeartbeatKVHeap extends KeyValueHeap { + private static final class HeartbeatKVHeap extends StoreHeap { public HeartbeatKVHeap(List scanners, KVComparator comparator) throws IOException { super(scanners, comparator); @@ -566,7 +566,7 @@ public class TestScannerHeartbeatMessages { * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family * cells. */ - private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { + private static final class HeartbeatReversedKVHeap extends ReversedHeap { public HeartbeatReversedKVHeap(List scanners, KVComparator comparator) throws IOException { super(scanners, comparator); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 1b25744..7bb1478 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -124,7 +124,7 @@ public class TestStoreScanner extends TestCase { new KeyValue(FIVE, CF, ZERO, 1L, KeyValue.Type.Put, VALUE), }; - private static class KeyValueHeapWithCount extends KeyValueHeap { + private static class KeyValueHeapWithCount extends StoreFileHeap { final AtomicInteger count;