Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1542179) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; @@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import com.google.common.collect.Lists; + /** * Implements a heap merge across any number of KeyValueScanners. *

@@ -42,21 +45,22 @@ */ public class KeyValueHeap extends NonLazyKeyValueScanner implements KeyValueScanner, InternalScanner { - private PriorityQueue heap = null; /** - * The current sub-scanner, i.e. the one that contains the next key/value - * to return to the client. This scanner is NOT included in {@link #heap} - * (but we frequently add it back to the heap and pull the new winner out). - * We maintain an invariant that the current sub-scanner has already done - * a real seek, and that current.peek() is always a real key/value (or null) + * We maintain an invariant that the "top" sub-scanner has already done + * a real seek, and that {@code scanner.peek()} is always a real key/value (or null) * except for the fake last-key-on-row-column supplied by the multi-column * Bloom filter optimization, which is OK to propagate to StoreScanner. In - * order to ensure that, always use {@link #pollRealKV()} to update current. + * order to ensure that, always call {@link #enforceSeek()} after adding + * new elements to the loser tree or modifying the top element. */ - private KeyValueScanner current = null; - private KVScannerComparator comparator; + private final LoserTree loserTree; + /** + * All sub-scanners. + */ + private final List scanners; + private final KVScannerComparator comparator; /** * Constructor. This KeyValueHeap will handle closing of passed in @@ -66,44 +70,36 @@ */ public KeyValueHeap(List scanners, KVComparator comparator) throws IOException { + this.scanners = scanners; this.comparator = new KVScannerComparator(comparator); - if (!scanners.isEmpty()) { - this.heap = new PriorityQueue(scanners.size(), - this.comparator); - for (KeyValueScanner scanner : scanners) { - if (scanner.peek() != null) { - this.heap.add(scanner); - } else { - scanner.close(); - } + + List firstValues = Lists.newArrayListWithExpectedSize(scanners.size()); + for (KeyValueScanner scanner : scanners) { + KeyValue kv = scanner.peek(); // peek() maybe return null if the scanner reaches its end + if (kv == null) { + scanner.close(); } - this.current = pollRealKV(); + firstValues.add(kv); } + this.loserTree = new LoserTree(firstValues, comparator); + doRealSeekOnTopScanner(); } public KeyValue peek() { - if (this.current == null) { - return null; - } - return this.current.peek(); + return loserTree.topValue(); } public KeyValue next() throws IOException { - if(this.current == null) { + if (loserTree.isEmpty()) { return null; } - KeyValue kvReturn = this.current.next(); - KeyValue kvNext = this.current.peek(); - if (kvNext == null) { - this.current.close(); - this.current = pollRealKV(); + KeyValue kvReturn = loserTree.topValue(); + KeyValueScanner s = topScanner(); + if (s.next() == null) { + closeTopScanner(); } else { - KeyValueScanner topScanner = this.heap.peek(); - if (topScanner == null || - this.comparator.compare(kvNext, topScanner.peek()) >= 0) { - this.heap.add(this.current); - this.current = pollRealKV(); - } + refreshTopScanner(); + doRealSeekOnTopScanner(); } return kvReturn; } @@ -136,26 +132,24 @@ * @return true if there are more keys, false if all scanners are done */ public boolean next(List result, int limit, String metric) throws IOException { - if (this.current == null) { + if (loserTree.isEmpty()) { return false; } - InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric); - KeyValue pee = this.current.peek(); + InternalScanner topScannerAsInternal = (InternalScanner) topScanner(); + boolean 
mayContainMoreRows = topScannerAsInternal.next(result, limit, metric); /* * By definition, any InternalScanner must return false only when it has no * further rows to be fetched. So, we can close a scanner if it returns * false. All existing implementations seem to be fine with this. It is much * more efficient to close scanners which are not needed than keep them in - * the heap. This is also required for certain optimizations. + * the loser tree. This is also required for certain optimizations. */ - if (pee == null || !mayContainMoreRows) { - this.current.close(); + if (mayContainMoreRows) { + refreshTopScanner(); } else { - this.heap.add(this.current); + closeTopScanner(); } - this.current = pollRealKV(); - return (this.current != null); + return !loserTree.isEmpty(); } /** @@ -222,15 +216,9 @@ } public void close() { - if (this.current != null) { - this.current.close(); + while (!loserTree.isEmpty()) { + closeTopScanner(); } - if (this.heap != null) { - KeyValueScanner scanner; - while ((scanner = this.heap.poll()) != null) { - scanner.close(); - } - } } /** @@ -292,49 +280,35 @@ "optimization requires a lazy seek"); } - if (current == null) { - return false; - } - heap.add(current); - current = null; - - KeyValueScanner scanner; - while ((scanner = heap.poll()) != null) { - KeyValue topKey = scanner.peek(); - if (comparator.getComparator().compare(seekKey, topKey) <= 0) { - // Top KeyValue is at-or-after Seek KeyValue. We only know that all - // scanners are at or after seekKey (because fake keys of - // scanners where a lazy-seek operation has been done are not greater - // than their real next keys) but we still need to enforce our - // invariant that the top scanner has done a real seek. This way - // StoreScanner and RegionScanner do not have to worry about fake keys. - heap.add(scanner); - current = pollRealKV(); - return current != null; - } - + while (!loserTree.isEmpty() && + comparator.getComparator().compare(loserTree.topValue(), seekKey) <= 0) { + KeyValueScanner topScanner = scanners.get(loserTree.topIndex()); boolean seekResult; - if (isLazy && heap.size() > 0) { - // If there is only one scanner left, we don't do lazy seek. - seekResult = scanner.requestSeek(seekKey, forward, useBloom); + if (isLazy && loserTree.getNumOfOpenStreams() > 1) { + // Do lazy seek only if there is more than one scanner left. + seekResult = topScanner.requestSeek(seekKey, forward, useBloom); } else { seekResult = NonLazyKeyValueScanner.doRealSeek( - scanner, seekKey, forward); + topScanner, seekKey, forward); } if (!seekResult) { - scanner.close(); + closeTopScanner(); } else { - heap.add(scanner); + refreshTopScanner(); } } + doRealSeekOnTopScanner(); - // Heap is returning empty, scanner is done - return false; + return !loserTree.isEmpty(); } + private void closeTopScanner() { + topScanner().close(); + loserTree.updateTopValue(null); + } + /** - * Fetches the top sub-scanner from the priority queue, ensuring that a real + * Fetches the next element from the top sub-scanner, ensuring that a real * seek has been done on it. Works by fetching the top sub-scanner, and if it * has not done a real seek, making it do so (which will modify its top KV), * putting it back, and repeating this until success. Relies on the fact that * on a lazy seek we set the current key of a StoreFileScanner to a KV that * is not greater than the real next KV to be read from that file, so the * scanner that bubbles up to the top of the heap will have global next KV in * this scanner heap if (1) it has done a real seek and (2) its KV is the top * among all top KVs (some of which are fake) in the scanner heap.
*/ - private KeyValueScanner pollRealKV() throws IOException { - KeyValueScanner kvScanner = heap.poll(); - if (kvScanner == null) { - return null; + private void refreshTopScanner() throws IOException { + KeyValueScanner s = topScanner(); + KeyValue kv = s.peek(); + if (kv == null) { + closeTopScanner(); + return; } + loserTree.updateTopValue(kv); + } - while (kvScanner != null && !kvScanner.realSeekDone()) { - if (kvScanner.peek() != null) { - kvScanner.enforceSeek(); - KeyValue curKV = kvScanner.peek(); - if (curKV != null) { - KeyValueScanner nextEarliestScanner = heap.peek(); - if (nextEarliestScanner == null) { - // The heap is empty. Return the only possible scanner. - return kvScanner; - } - - // Compare the current scanner to the next scanner. We try to avoid - // putting the current one back into the heap if possible. - KeyValue nextKV = nextEarliestScanner.peek(); - if (nextKV == null || comparator.compare(curKV, nextKV) < 0) { - // We already have the scanner with the earliest KV, so return it. - return kvScanner; - } - - // Otherwise, put the scanner back into the heap and let it compete - // against all other scanners (both those that have done a "real - // seek" and a "lazy seek"). - heap.add(kvScanner); - } else { - // Close the scanner because we did a real seek and found out there - // are no more KVs. - kvScanner.close(); - } - } else { - // Close the scanner because it has already run out of KVs even before - // we had to do a real seek on it. - kvScanner.close(); - } - kvScanner = heap.poll(); + private void doRealSeekOnTopScanner() throws IOException { + while (!loserTree.isEmpty() && !topScanner().realSeekDone()) { + KeyValueScanner s = topScanner(); + s.enforceSeek(); + refreshTopScanner(); } - - return kvScanner; } /** * @return the current Heap */ - public PriorityQueue getHeap() { - return this.heap; + public Collection getHeap() { + List results = Lists.newArrayList(); + for (int index : loserTree.getOpenStreamsForTesting()) { + if (index != loserTree.topIndex()) { + results.add(scanners.get(index)); + } + } + return results; } @Override @@ -402,6 +355,13 @@ } KeyValueScanner getCurrentForTesting() { - return current; + return topScanner(); } + + private KeyValueScanner topScanner() { + if (loserTree.isEmpty()) { + return null; + } + return scanners.get(loserTree.topIndex()); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/LoserTree.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/LoserTree.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/LoserTree.java (working copy) @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.Comparator; +import java.util.List; + +/** + * LoserTree is a data structure for multi-way merge. + * + * A typical multi-way merge looks like this: + * + * 1) Suppose you have N streams open; construct a LoserTree with + * the first element from each stream. + * 2) Call {@link #topValue()} and {@link #topIndex()} to retrieve the + * minimal value and the index of the stream it comes from. + * 3) Output the minimal value. + * 4) Read the next value from that stream and call {@link #updateTopValue(Object)}. + * If the stream has reached EOF, call {@code updateTopValue(null)}. + * 5) Jump to 2) until all streams reach EOF. + * + * This class is NOT thread-safe. + * + * http://sandbox.mc.edu/~bennet/cs402/lec/losedex.html + */ +@InterfaceAudience.Private +public class LoserTree<T> { + + private final int n; + private final Comparator<? super T> comparator; + private final List<T> values; + + /** The number of streams that have not reached EOF. */ + private int numOpenStreams; + + /** + * {@code tree[0]} is the index of the minimal element in {@code values}. + * {@code tree[i]} where i > 0 stores the index of the loser (i.e. the greater value) of the + * match played at node i, between the two winners coming up from its children. + */ + private final int[] tree; + + /** + * Constructs a {@code LoserTree} to merge several streams, given the first value of each stream, + * where {@code values[i]} is the first value of the i-th stream. If a stream is empty, just pass + * {@code null} at that position. + * + * After construction, clients can use {@link #topIndex()} and {@link #topValue()} to retrieve the + * minimal value among {@code values}. + */ + public LoserTree(List<T> values, Comparator<? super T> comparator) { + this.n = values.size(); + this.comparator = comparator; + this.values = Lists.newArrayList(values); + this.numOpenStreams = 0; + for (T value : values) { + if (value != null) { + numOpenStreams++; + } + } + if (n == 0) { + tree = null; + } else { + // Construct tree[] bottom-up with an auxiliary array aux[], where aux[i] + // is the index of the winner between {@code values[aux[2*i]]} and {@code values[aux[2*i+1]]}. + // + // Suppose there are 4 streams and their first values are 3, 2, 1 and 4 respectively. + // We will have the following tree: + // t[0]=2 <-- winner is v[2] + // | + // t[1]=1 + // a[1]=2 + // / \ + // t[2]=0 t[3]=3 + // a[2]=1 a[3]=2 + // / \ / \ + // a[4]=0 a[5]=1 a[6]=2 a[7]=3 + // v[0]=3 v[1]=2 v[2]=1 v[3]=4 + // + // Notations: + // t[i] denotes tree[i], + // v[i] denotes values[i] and + // a[i] denotes aux[i]. + tree = new int[n]; + int[] aux = new int[2 * n]; + for (int i = 0; i < n; i++) { + aux[i + n] = i; + } + for (int i = n - 1; i > 0; i--) { + int winner; + int loser; + if (compare(aux[i * 2], aux[i * 2 + 1]) < 0) { + winner = aux[i * 2]; + loser = aux[i * 2 + 1]; + } else { + winner = aux[i * 2 + 1]; + loser = aux[i * 2]; + } + tree[i] = loser; + aux[i] = winner; + } + tree[0] = aux[1]; + } + } + + /** + * @return the index of the minimal element. + */ + public int topIndex() { + return numOpenStreams > 0 ? tree[0] : -1; + } + + /** + * @return the value of the minimal element. + */ + public T topValue() { + if (numOpenStreams == 0) { + return null; + } + return values.get(topIndex()); + } + + /** + * Changes the value at the top index. After this call, the user can use {@link #topValue()} to retrieve + * the new minimal element.
+ * + * Use this to push the next value from the stream that the previous minimal element came + * from. Pass {@code null} as the value if that stream has reached EOF. + */ + public void updateTopValue(T value) { + int index = topIndex(); + if (value == null && values.get(index) != null) { + numOpenStreams--; + if (numOpenStreams < 0) { + throw new IllegalStateException("numOpenStreams is negative: " + numOpenStreams + + ". This should not happen. Is it being called from multiple threads?"); + } + } + values.set(index, value); + adjust(index); + } + + /** + * Tests whether all streams have reached EOF. + */ + public boolean isEmpty() { + return numOpenStreams == 0; + } + + public int getNumOfOpenStreams() { + return numOpenStreams; + } + + public List<Integer> getOpenStreamsForTesting() { + List<Integer> streams = Lists.newArrayList(); + for (int i = 0; i < n; i++) { + if (values.get(i) != null) { + streams.add(i); + } + } + return streams; + } + + /** + * Adjusts the loser tree after {@code values[i]} has been updated. The new value competes along + * the path from its leaf up to the root; whenever it loses a comparison it swaps places with the + * stored loser, and the winner continues upward. + * + * @param i the index of the recently updated value + */ + private void adjust(int i) { + for (int k = (i + n) / 2; k > 0; k /= 2) { + if (compare(i, tree[k]) >= 0) { + int tmp = tree[k]; + tree[k] = i; + i = tmp; + } + } + tree[0] = i; + } + + private int compare(int leftIndex, int rightIndex) { + if (leftIndex == rightIndex) { + return 0; + } + T leftValue = values.get(leftIndex); + T rightValue = values.get(rightIndex); + if (leftValue == null) { + return 1; + } + if (rightValue == null) { + return -1; + } + return comparator.compare(leftValue, rightValue); + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueHeapBenchmark.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueHeapBenchmark.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueHeapBenchmark.java (working copy) @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.time.StopWatch; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +/** Performance benchmark for {@link KeyValueHeap}.
*/ +public class KeyValueHeapBenchmark extends Configured implements Tool { + + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final int NUM_KEYS_PER_SCANNER = 10000; + private static final int NUM_SCANNERS = 20; + private static final int RANDOM_SEED = 0; + private static final int NUM_ROUNDS = 100; + private final Random random = new Random(RANDOM_SEED); + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new KeyValueHeapBenchmark(), args); + System.exit(exitCode); + } + + private void benchmark(int numScanners) throws IOException { + long nextTime = 0; + int nextCount = 0; + long reseekTime = 0; + int reseekCount = 0; + for (int round = 0; round < NUM_ROUNDS; round++) { + // prepare data + List<List<KeyValue>> kvs = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + kvs.add(randomKeyValues()); + } + + // "next" + { + StopWatch watch = new StopWatch(); + KeyValueHeap h = createKeyValueHeap(kvs); + watch.start(); + while (h.next() != null) { + nextCount++; + } + watch.stop(); + h.close(); + nextTime += watch.getTime(); + } + + // "reseek" + { + StopWatch watch = new StopWatch(); + KeyValueHeap h = createKeyValueHeap(kvs); + boolean first = true; + watch.start(); + for (List<KeyValue> e : kvs) { + for (KeyValue kv : e) { + if (first) { + h.seek(kv); + first = false; + } else { + h.reseek(kv); + } + reseekCount++; + } + } + watch.stop(); + h.close(); + reseekTime += watch.getTime(); + } + } + if (nextCount != NUM_ROUNDS * NUM_KEYS_PER_SCANNER * numScanners) { + throw new AssertionError("Wrong nextCount: " + nextCount); + } + if (reseekCount != NUM_ROUNDS * NUM_KEYS_PER_SCANNER * numScanners) { + throw new AssertionError("Wrong reseekCount: " + reseekCount); + } + System.out.printf("%d scanners:\tnext %.1f calls/s\treseek %.1f calls/s\n", + numScanners, + 1000.0 * nextCount / nextTime, + 1000.0 * reseekCount / reseekTime); + } + + private KeyValueHeap createKeyValueHeap(List<List<KeyValue>> kvs) throws IOException { + List<KeyValueScanner> subscanners = Lists.newArrayList(); + for (List<KeyValue> e : kvs) { + subscanners.add(new CollectionBackedScanner(e)); + } + return new KeyValueHeap(subscanners, KeyValue.COMPARATOR); + } + + private List<KeyValue> randomKeyValues() { + List<KeyValue> kvs = Lists.newArrayListWithExpectedSize(NUM_KEYS_PER_SCANNER); + for (int i = 0; i < NUM_KEYS_PER_SCANNER; i++) { + byte[] row = Bytes.toBytes(random.nextLong()); + KeyValue kv = new KeyValue(row, FAMILY, null); + kvs.add(kv); + } + return kvs; + } + + @Override + public int run(String[] args) throws Exception { + for (int i = 1; i < NUM_SCANNERS; i++) { + benchmark(i); + } + return 0; + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestLoserTree.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestLoserTree.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestLoserTree.java (working copy) @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Queue; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@Category(SmallTests.class) +public class TestLoserTree { + + private static final Comparator INTEGER_COMPARATOR = new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return Ints.compare(o1.intValue(), o2.intValue()); + } + }; + + @Test + public void testSingleWayMerge() { + LoserTree loserTree = new LoserTree(Lists.newArrayList(0), INTEGER_COMPARATOR); + assertEquals(0, loserTree.topIndex()); + assertEquals(0, loserTree.topValue().intValue()); + loserTree.updateTopValue(1); + assertEquals(0, loserTree.topIndex()); + assertEquals(1, loserTree.topValue().intValue()); + loserTree.updateTopValue(2); + assertEquals(0, loserTree.topIndex()); + assertEquals(2, loserTree.topValue().intValue()); + assertEquals(1, loserTree.getNumOfOpenStreams()); + assertFalse(loserTree.isEmpty()); + loserTree.updateTopValue(null); + assertTrue(loserTree.isEmpty()); + assertEquals(0, loserTree.getNumOfOpenStreams()); + } + + @Test + public void testMultiWayMerge() { + int n = 40; + int m = 10000; + Random random = new Random(0L); + List> in = Lists.newArrayList(); + for (int i = 0; i < n; i++) { + in.add(Lists.newLinkedList()); + } + for (int i = 0; i < m; i++) { + in.get(random.nextInt(n)).add(i); + } + List firstValues = Lists.newArrayList(); + for (Queue q : in) { + firstValues.add(q.peek()); + } + LoserTree loserTree = new LoserTree(firstValues, INTEGER_COMPARATOR); + int prev = Integer.MIN_VALUE; + int count = 0; + while (!loserTree.isEmpty()) { + int k = loserTree.topIndex(); + Queue q = in.get(k); + assertTrue(loserTree.topValue() > prev); + prev = loserTree.topValue(); + assertEquals(q.peek(), loserTree.topValue()); + q.poll(); + loserTree.updateTopValue(q.peek()); + count++; + } + assertEquals(m, count); + } + + @Test + public void testZeroWayMerge() { + LoserTree loserTree = new LoserTree(Collections.emptyList(), INTEGER_COMPARATOR); + assertTrue(loserTree.isEmpty()); + assertNull(loserTree.topValue()); + assertEquals(-1, loserTree.topIndex()); + assertEquals(0, loserTree.getNumOfOpenStreams()); + } + +}
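The following standalone sketch is not part of the patch; it only illustrates the five-step merge protocol described in the LoserTree class javadoc. The class and method names (LoserTreeMergeExample, mergeSorted) and the use of plain Lists and Iterators in place of KeyValueScanners are invented for illustration; the LoserTree calls themselves follow the API added above.

package org.apache.hadoop.hbase.regionserver;

import com.google.common.collect.Lists;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class LoserTreeMergeExample {
  /** Merges several individually sorted lists into one sorted list. */
  static <T> List<T> mergeSorted(List<List<T>> streams, Comparator<? super T> cmp) {
    // Step 1: seed the tree with the first element of each stream (null for an empty stream).
    List<Iterator<T>> iters = Lists.newArrayList();
    List<T> firstValues = Lists.newArrayList();
    for (List<T> stream : streams) {
      Iterator<T> it = stream.iterator();
      iters.add(it);
      firstValues.add(it.hasNext() ? it.next() : null);
    }
    LoserTree<T> tree = new LoserTree<T>(firstValues, cmp);

    List<T> out = Lists.newArrayList();
    while (!tree.isEmpty()) {
      // Steps 2-3: the top value is the global minimum; emit it.
      out.add(tree.topValue());
      // Step 4: replace it with the next value from the same stream, or null on EOF.
      Iterator<T> it = iters.get(tree.topIndex());
      tree.updateTopValue(it.hasNext() ? it.next() : null);
    }
    // Step 5: the loop repeats until every stream has reached EOF.
    return out;
  }
}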
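Below is a test-style walkthrough of the worked example in the constructor comment (first values 3, 2, 1, 4), which could sit alongside the cases in TestLoserTree. The follow-up values fed to updateTopValue (5, then EOF) are invented for illustration, and INTEGER_COMPARATOR refers to the comparator defined in TestLoserTree above.

  @Test
  public void testWorkedExampleFromJavadoc() {
    LoserTree<Integer> t =
        new LoserTree<Integer>(Lists.newArrayList(3, 2, 1, 4), INTEGER_COMPARATOR);
    assertEquals(2, t.topIndex());                 // v[2]=1 is the overall winner
    assertEquals(1, t.topValue().intValue());
    t.updateTopValue(5);                           // stream 2 advances to 5
    assertEquals(1, t.topIndex());                 // adjust() replays one leaf-to-root path
    assertEquals(2, t.topValue().intValue());
    t.updateTopValue(null);                        // stream 1 reaches EOF
    assertEquals(0, t.topIndex());
    assertEquals(3, t.topValue().intValue());
  }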