Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1542179)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy)
@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
@@ -28,6 +29,8 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import com.google.common.collect.Lists;
+
/**
* Implements a heap merge across any number of KeyValueScanners.
*
@@ -42,21 +45,22 @@
*/
public class KeyValueHeap extends NonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner {
- private PriorityQueue<KeyValueScanner> heap = null;
/**
- * The current sub-scanner, i.e. the one that contains the next key/value
- * to return to the client. This scanner is NOT included in {@link #heap}
- * (but we frequently add it back to the heap and pull the new winner out).
- * We maintain an invariant that the current sub-scanner has already done
- * a real seek, and that current.peek() is always a real key/value (or null)
+ * We maintain an invariant that the "top" sub-scanner has already done
+ * a real seek, and that {@code scanner.peek()} is always a real key/value (or null)
* except for the fake last-key-on-row-column supplied by the multi-column
* Bloom filter optimization, which is OK to propagate to StoreScanner. In
- * order to ensure that, always use {@link #pollRealKV()} to update current.
+ * order to ensure that, always call {@link #doRealSeekOnTopScanner()} after
+ * modifying the top element of the loser tree.
*/
- private KeyValueScanner current = null;
- private KVScannerComparator comparator;
+ private final LoserTree<KeyValue> loserTree;
+ /**
+ * All sub-scanners.
+ */
+ private final List<? extends KeyValueScanner> scanners;
+ private final KVScannerComparator comparator;
/**
* Constructor. This KeyValueHeap will handle closing of passed in
@@ -66,44 +70,36 @@
*/
public KeyValueHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) throws IOException {
+ this.scanners = scanners;
this.comparator = new KVScannerComparator(comparator);
- if (!scanners.isEmpty()) {
- this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
- this.comparator);
- for (KeyValueScanner scanner : scanners) {
- if (scanner.peek() != null) {
- this.heap.add(scanner);
- } else {
- scanner.close();
- }
+
+ List<KeyValue> firstValues = Lists.newArrayListWithExpectedSize(scanners.size());
+ for (KeyValueScanner scanner : scanners) {
+ KeyValue kv = scanner.peek(); // peek() may return null if the scanner has reached its end
+ if (kv == null) {
+ scanner.close();
}
- this.current = pollRealKV();
+ firstValues.add(kv);
}
+ this.loserTree = new LoserTree<KeyValue>(firstValues, comparator);
+ doRealSeekOnTopScanner();
}
public KeyValue peek() {
- if (this.current == null) {
- return null;
- }
- return this.current.peek();
+ return loserTree.topValue();
}
public KeyValue next() throws IOException {
- if(this.current == null) {
+ if (loserTree.isEmpty()) {
return null;
}
- KeyValue kvReturn = this.current.next();
- KeyValue kvNext = this.current.peek();
- if (kvNext == null) {
- this.current.close();
- this.current = pollRealKV();
+ KeyValue kvReturn = loserTree.topValue();
+ KeyValueScanner s = topScanner();
+ if (s.next() == null) {
+ closeTopScanner();
} else {
- KeyValueScanner topScanner = this.heap.peek();
- if (topScanner == null ||
- this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
- this.heap.add(this.current);
- this.current = pollRealKV();
- }
+ refreshTopScanner();
+ doRealSeekOnTopScanner();
}
return kvReturn;
}
@@ -136,26 +132,24 @@
* @return true if there are more keys, false if all scanners are done
*/
public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
- if (this.current == null) {
+ if (loserTree.isEmpty()) {
return false;
}
- InternalScanner currentAsInternal = (InternalScanner)this.current;
- boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
- KeyValue pee = this.current.peek();
+ InternalScanner topScannerAsInternal = (InternalScanner) topScanner();
+ boolean mayContainMoreRows = topScannerAsInternal.next(result, limit, metric);
/*
* By definition, any InternalScanner must return false only when it has no
* further rows to be fetched. So, we can close a scanner if it returns
* false. All existing implementations seem to be fine with this. It is much
* more efficient to close scanners which are not needed than keep them in
- * the heap. This is also required for certain optimizations.
+ * the loser tree. This is also required for certain optimizations.
*/
- if (pee == null || !mayContainMoreRows) {
- this.current.close();
+ if (mayContainMoreRows) {
+ refreshTopScanner();
} else {
- this.heap.add(this.current);
+ closeTopScanner();
}
- this.current = pollRealKV();
- return (this.current != null);
+ doRealSeekOnTopScanner();
+ return !loserTree.isEmpty();
}
/**
@@ -222,15 +216,9 @@
}
public void close() {
- if (this.current != null) {
- this.current.close();
+ while (!loserTree.isEmpty()) {
+ closeTopScanner();
}
- if (this.heap != null) {
- KeyValueScanner scanner;
- while ((scanner = this.heap.poll()) != null) {
- scanner.close();
- }
- }
}
/**
@@ -292,49 +280,35 @@
"optimization requires a lazy seek");
}
- if (current == null) {
- return false;
- }
- heap.add(current);
- current = null;
-
- KeyValueScanner scanner;
- while ((scanner = heap.poll()) != null) {
- KeyValue topKey = scanner.peek();
- if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
- // Top KeyValue is at-or-after Seek KeyValue. We only know that all
- // scanners are at or after seekKey (because fake keys of
- // scanners where a lazy-seek operation has been done are not greater
- // than their real next keys) but we still need to enforce our
- // invariant that the top scanner has done a real seek. This way
- // StoreScanner and RegionScanner do not have to worry about fake keys.
- heap.add(scanner);
- current = pollRealKV();
- return current != null;
- }
-
+ while (!loserTree.isEmpty() &&
+ comparator.getComparator().compare(loserTree.topValue(), seekKey) < 0) {
+ KeyValueScanner topScanner = scanners.get(loserTree.topIndex());
boolean seekResult;
- if (isLazy && heap.size() > 0) {
- // If there is only one scanner left, we don't do lazy seek.
- seekResult = scanner.requestSeek(seekKey, forward, useBloom);
+ if (isLazy && loserTree.getNumOfOpenStreams() > 1) {
+ // Do lazy seek only if there is more than one scanner left.
+ seekResult = topScanner.requestSeek(seekKey, forward, useBloom);
} else {
seekResult = NonLazyKeyValueScanner.doRealSeek(
- scanner, seekKey, forward);
+ topScanner, seekKey, forward);
}
-
if (!seekResult) {
- scanner.close();
+ closeTopScanner();
} else {
- heap.add(scanner);
+ refreshTopScanner();
}
}
+ doRealSeekOnTopScanner();
- // Heap is returning empty, scanner is done
- return false;
+ return !loserTree.isEmpty();
}
+ private void closeTopScanner() {
+ topScanner().close();
+ loserTree.updateTopValue(null);
+ }
+
/**
- * Fetches the top sub-scanner from the priority queue, ensuring that a real
+ * Fetches the next element from the top sub-scanner, ensuring that a real
* seek has been done on it. Works by fetching the top sub-scanner, and if it
* has not done a real seek, making it do so (which will modify its top KV),
* putting it back, and repeating this until success. Relies on the fact that
@@ -344,56 +318,35 @@
* this scanner heap if (1) it has done a real seek and (2) its KV is the top
* among all top KVs (some of which are fake) in the scanner heap.
*/
- private KeyValueScanner pollRealKV() throws IOException {
- KeyValueScanner kvScanner = heap.poll();
- if (kvScanner == null) {
- return null;
+ private void refreshTopScanner() throws IOException {
+ KeyValueScanner s = topScanner();
+ KeyValue kv = s.peek();
+ if (kv == null) {
+ closeTopScanner();
+ return;
}
+ loserTree.updateTopValue(kv);
+ }
- while (kvScanner != null && !kvScanner.realSeekDone()) {
- if (kvScanner.peek() != null) {
- kvScanner.enforceSeek();
- KeyValue curKV = kvScanner.peek();
- if (curKV != null) {
- KeyValueScanner nextEarliestScanner = heap.peek();
- if (nextEarliestScanner == null) {
- // The heap is empty. Return the only possible scanner.
- return kvScanner;
- }
-
- // Compare the current scanner to the next scanner. We try to avoid
- // putting the current one back into the heap if possible.
- KeyValue nextKV = nextEarliestScanner.peek();
- if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
- // We already have the scanner with the earliest KV, so return it.
- return kvScanner;
- }
-
- // Otherwise, put the scanner back into the heap and let it compete
- // against all other scanners (both those that have done a "real
- // seek" and a "lazy seek").
- heap.add(kvScanner);
- } else {
- // Close the scanner because we did a real seek and found out there
- // are no more KVs.
- kvScanner.close();
- }
- } else {
- // Close the scanner because it has already run out of KVs even before
- // we had to do a real seek on it.
- kvScanner.close();
- }
- kvScanner = heap.poll();
+ private void doRealSeekOnTopScanner() throws IOException {
+ while (!loserTree.isEmpty() && !topScanner().realSeekDone()) {
+ KeyValueScanner s = topScanner();
+ s.enforceSeek();
+ refreshTopScanner();
}
-
- return kvScanner;
}
/**
* @return the current Heap
*/
- public PriorityQueue<KeyValueScanner> getHeap() {
- return this.heap;
+ public Collection<KeyValueScanner> getHeap() {
+ List<KeyValueScanner> results = Lists.newArrayList();
+ for (int index : loserTree.getOpenStreamsForTesting()) {
+ if (index != loserTree.topIndex()) {
+ results.add(scanners.get(index));
+ }
+ }
+ return results;
}
@Override
@@ -402,6 +355,13 @@
}
KeyValueScanner getCurrentForTesting() {
- return current;
+ return topScanner();
}
+
+ private KeyValueScanner topScanner() {
+ if (loserTree.isEmpty()) {
+ return null;
+ }
+ return scanners.get(loserTree.topIndex());
+ }
}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/LoserTree.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/LoserTree.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/LoserTree.java (working copy)
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * LoserTree is a data structure for multi-way merge.
+ *
+ * The typical multi-way merge loop looks like this (see the sketch below):
+ *
+ * 1) Open your N streams and construct a LoserTree from the first element of each stream.
+ * 2) Call {@link #topValue()} and {@link #topIndex()} to retrieve the minimal value and the
+ * index of the stream it comes from.
+ * 3) Output the minimal value.
+ * 4) Read the next value from that stream and call {@link #updateTopValue(Object)};
+ * if the stream has reached EOF, call {@code updateTopValue(null)}.
+ * 5) Repeat from 2) until all streams have reached EOF.
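+ *
+ * <p>A minimal usage sketch (illustrative only, not taken from HBase code; it merges three
+ * in-memory integer lists and uses Guava's {@code Ordering.natural()} as the comparator):
+ * <pre>{@code
+ * List<Iterator<Integer>> streams = Lists.newArrayList();
+ * streams.add(Lists.newArrayList(1, 4, 7).iterator());
+ * streams.add(Lists.newArrayList(2, 5, 8).iterator());
+ * streams.add(Lists.newArrayList(3, 6, 9).iterator());
+ * List<Integer> firstValues = Lists.newArrayList();
+ * for (Iterator<Integer> it : streams) {
+ *   firstValues.add(it.hasNext() ? it.next() : null);
+ * }
+ * LoserTree<Integer> tree = new LoserTree<Integer>(firstValues, Ordering.<Integer>natural());
+ * while (!tree.isEmpty()) {
+ *   System.out.println(tree.topValue());            // prints 1 through 9 in order
+ *   Iterator<Integer> source = streams.get(tree.topIndex());
+ *   tree.updateTopValue(source.hasNext() ? source.next() : null);
+ * }
+ * }</pre>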
+ *
+ * This class is NOT thread-safe.
+ *
+ * http://sandbox.mc.edu/~bennet/cs402/lec/losedex.html
+ */
+@InterfaceAudience.Private
+public class LoserTree<T> {
+
+ private final int n;
+ private final Comparator<? super T> comparator;
+ private final List<T> values;
+
+ /** The number of streams not reaching EOF. */
+ private int numOpenStreams;
+
+ /**
+ * {@code tree[0]} is the index of the minimal element in {@code values}.
+ * For {@code i > 0}, {@code tree[i]} stores the index of the loser (the greater value) of the
+ * match played at internal node {@code i}, i.e. between the winners of its two child subtrees.
+ */
+ private final int[] tree;
+
+ /**
+ * Constructs a {@code LoserTree} to merge several streams, given the first value of each stream,
+ * where {@code values[i]} is the first value of the i-th stream. If a stream is empty, pass
+ * {@code null} at that position.
+ *
+ * After construction, clients can use {@link #topIndex()} and {@link #topValue()} to retrieve the
+ * minimal value among {@code values}.
+ */
+ public LoserTree(List<T> values, Comparator<? super T> comparator) {
+ this.n = values.size();
+ this.comparator = comparator;
+ this.values = Lists.newArrayList(values);
+ this.numOpenStreams = 0;
+ for (T value : values) {
+ if (value != null) {
+ numOpenStreams++;
+ }
+ }
+ if (n == 0) {
+ tree = null;
+ } else {
+ // Construct tree[] bottom-up using an auxiliary array aux[], where aux[i] is the index of
+ // the winner (the smaller value) of the match at internal node i, and tree[i] is the loser.
+ //
+ // Suppose there are 4 streams and their first values are 3, 2, 1 and 4 respectively.
+ // We will have the following tree:
+ //
+ //             t[0]=2   <-- the overall winner (minimum) is v[2]=1
+ //               |
+ //             t[1]=1
+ //             a[1]=2
+ //            /      \
+ //       t[2]=0      t[3]=3
+ //       a[2]=1      a[3]=2
+ //       /    \      /    \
+ //   a[4]=0 a[5]=1 a[6]=2 a[7]=3
+ //   v[0]=3 v[1]=2 v[2]=1 v[3]=4
+ //
+ // Notations:
+ // t[i] denotes tree[i],
+ // v[i] denotes values[i] and
+ // a[i] denotes aux[i].
+ tree = new int[n];
+ int[] aux = new int[2 * n];
+ for (int i = 0; i < n; i++) {
+ aux[i + n] = i;
+ }
+ for (int i = n - 1; i > 0; i--) {
+ int winner;
+ int loser;
+ if (compare(aux[i * 2], aux[i * 2 + 1]) < 0) {
+ winner = aux[i * 2];
+ loser = aux[i * 2 + 1];
+ } else {
+ winner = aux[i * 2 + 1];
+ loser = aux[i * 2];
+ }
+ tree[i] = loser;
+ aux[i] = winner;
+ }
+ tree[0] = aux[1];
+ }
+ }
+
+ /**
+ * @return the index of the stream holding the minimal element, or -1 if all streams have reached EOF.
+ */
+ public int topIndex() {
+ return numOpenStreams > 0 ? tree[0] : -1;
+ }
+
+ /**
+ * @return the minimal element, or {@code null} if all streams have reached EOF.
+ */
+ public T topValue() {
+ if (numOpenStreams == 0) {
+ return null;
+ }
+ return values.get(topIndex());
+ }
+
+ /**
+ * Replaces the value at the top index. After this call, {@link #topValue()} returns the new
+ * minimal element.
+ *
+ * Call this after consuming the current minimal element, passing the next value read from the
+ * stream it came from, or {@code null} if that stream has reached EOF.
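+ *
+ * <p>For example, continuing the hypothetical 4-stream case from the constructor's inline
+ * comment (first values 3, 2, 1 and 4, so the top is stream 2 with value 1), pushing a next
+ * value of 5 read from stream 2 looks like this:
+ * <pre>{@code
+ * tree.topIndex();        // 2, and tree.topValue() == 1
+ * tree.updateTopValue(5); // stream 2 now contributes 5
+ * tree.topIndex();        // 1, and tree.topValue() == 2
+ * }</pre>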
+ */
+ public void updateTopValue(T value) {
+ int index = topIndex();
+ if (value == null && values.get(index) != null) {
+ numOpenStreams--;
+ if (numOpenStreams < 0) {
+ throw new IllegalStateException("numOpenStreams is negative: " + numOpenStreams +
+ ". This should not happen. Is this instance being used by multiple threads?");
+ }
+ }
+ values.set(index, value);
+ adjust(index);
+ }
+
+ /**
+ * Returns true if all streams have reached EOF, i.e. there is nothing left to merge.
+ */
+ public boolean isEmpty() {
+ return numOpenStreams == 0;
+ }
+
+ public int getNumOfOpenStreams() {
+ return numOpenStreams;
+ }
+
+ public List<Integer> getOpenStreamsForTesting() {
+ List<Integer> streams = Lists.newArrayList();
+ for (int i = 0; i < n; i++) {
+ if (values.get(i) != null) {
+ streams.add(i);
+ }
+ }
+ return streams;
+ }
+
+ /**
+ * Adjusts the loser tree after {@code values[i]} is updated. The new value competes along the
+ * path from its leaf up to the root: wherever it loses a match it stays there as the new loser,
+ * and the winner of that match continues toward the root.
+ *
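+ * <p>For example, with 4 streams a call to {@code adjust(2)} re-plays the matches at internal
+ * nodes {@code k = (2 + 4) / 2 = 3} and then {@code k = 1}, and finally stores the index of the
+ * overall winner in {@code tree[0]}.
+ *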
+ * @param i the index of the stream whose value was just updated
+ */
+ private void adjust(int i) {
+ for (int k = (i + n) / 2; k > 0; k /= 2) {
+ if (compare(i, tree[k]) >= 0) {
+ int tmp = tree[k];
+ tree[k] = i;
+ i = tmp;
+ }
+ }
+ tree[0] = i;
+ }
+
+ private int compare(int leftIndex, int rightIndex) {
+ if (leftIndex == rightIndex) {
+ return 0;
+ }
+ T leftValue = values.get(leftIndex);
+ T rightValue = values.get(rightIndex);
+ if (leftValue == null) {
+ return 1;
+ }
+ if (rightValue == null) {
+ return -1;
+ }
+ return comparator.compare(leftValue, rightValue);
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueHeapBenchmark.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueHeapBenchmark.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueHeapBenchmark.java (working copy)
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+/** Performance benchmark for {@link KeyValueHeap}. */
+public class KeyValueHeapBenchmark extends Configured implements Tool {
+
+ private static final byte[] FAMILY = Bytes.toBytes("f");
+ private static final int NUM_KEYS_PER_SCANNER = 10000;
+ private static final int NUM_SCANNERS = 20;
+ private static final int RANDOM_SEED = 0;
+ private static final int NUM_ROUNDS = 100;
+ private final Random random = new Random(RANDOM_SEED);
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new KeyValueHeapBenchmark(), args);
+ System.exit(exitCode);
+ }
+
+ private void benchmark(int numScanners) throws IOException {
+ long nextTime = 0;
+ int nextCount = 0;
+ long reseekTime = 0;
+ int reseekCount = 0;
+ for (int round = 0; round < NUM_ROUNDS; round++) {
+ // prepare data
+ List<List<KeyValue>> kvs = Lists.newArrayList();
+ for (int i = 0; i < numScanners; i++) {
+ kvs.add(randomKeyValues());
+ }
+
+ // "next"
+ {
+ StopWatch watch = new StopWatch();
+ KeyValueHeap h = createKeyValueHeap(kvs);
+ watch.start();
+ while (h.next() != null) {
+ nextCount++;
+ }
+ watch.stop();
+ h.close();
+ nextTime += watch.getTime();
+ }
+
+ // "reseek"
+ {
+ StopWatch watch = new StopWatch();
+ KeyValueHeap h = createKeyValueHeap(kvs);
+ boolean first = true;
+ watch.start();
+ for (List<KeyValue> e : kvs) {
+ for (KeyValue kv : e) {
+ if (first) {
+ h.seek(kv);
+ first = false;
+ } else {
+ h.reseek(kv);
+ }
+ reseekCount++;
+ }
+ }
+ watch.stop();
+ h.close();
+ reseekTime += watch.getTime();
+ }
+ }
+ if (nextCount != NUM_ROUNDS * NUM_KEYS_PER_SCANNER * numScanners) {
+ throw new AssertionError("Wrong nextCount: " + nextCount);
+ }
+ if (reseekCount != NUM_ROUNDS * NUM_KEYS_PER_SCANNER * numScanners) {
+ throw new AssertionError("Wrong reseekCount: " + reseekCount);
+ }
+ System.out.printf("%d scanners:\tnext %.1f calls/s\treseek %.1f calls/s\n",
+ numScanners,
+ 1000.0 * nextCount / nextTime,
+ 1000.0 * reseekCount / reseekTime);
+ }
+
+ private KeyValueHeap createKeyValueHeap(List<List<KeyValue>> kvs) throws IOException {
+ List<KeyValueScanner> subscanners = Lists.newArrayList();
+ for (List<KeyValue> e : kvs) {
+ subscanners.add(new CollectionBackedScanner(e));
+ }
+ return new KeyValueHeap(subscanners, KeyValue.COMPARATOR);
+ }
+
+ private List<KeyValue> randomKeyValues() {
+ List<KeyValue> kvs = Lists.newArrayListWithExpectedSize(NUM_KEYS_PER_SCANNER);
+ for (int i = 0; i < NUM_KEYS_PER_SCANNER; i++) {
+ byte[] row = Bytes.toBytes(random.nextLong());
+ KeyValue kv = new KeyValue(row, FAMILY, null);
+ kvs.add(kv);
+ }
+ return kvs;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ for (int i = 1; i < NUM_SCANNERS; i++) {
+ benchmark(i);
+ }
+ return 0;
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestLoserTree.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestLoserTree.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestLoserTree.java (working copy)
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(SmallTests.class)
+public class TestLoserTree {
+
+ private static final Comparator<Integer> INTEGER_COMPARATOR = new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return Ints.compare(o1.intValue(), o2.intValue());
+ }
+ };
+
+ @Test
+ public void testSingleWayMerge() {
+ LoserTree<Integer> loserTree = new LoserTree<Integer>(Lists.newArrayList(0), INTEGER_COMPARATOR);
+ assertEquals(0, loserTree.topIndex());
+ assertEquals(0, loserTree.topValue().intValue());
+ loserTree.updateTopValue(1);
+ assertEquals(0, loserTree.topIndex());
+ assertEquals(1, loserTree.topValue().intValue());
+ loserTree.updateTopValue(2);
+ assertEquals(0, loserTree.topIndex());
+ assertEquals(2, loserTree.topValue().intValue());
+ assertEquals(1, loserTree.getNumOfOpenStreams());
+ assertFalse(loserTree.isEmpty());
+ loserTree.updateTopValue(null);
+ assertTrue(loserTree.isEmpty());
+ assertEquals(0, loserTree.getNumOfOpenStreams());
+ }
+
+ @Test
+ public void testMultiWayMerge() {
+ int n = 40;
+ int m = 10000;
+ Random random = new Random(0L);
+ List<Queue<Integer>> in = Lists.newArrayList();
+ for (int i = 0; i < n; i++) {
+ in.add(Lists.<Integer>newLinkedList());
+ }
+ for (int i = 0; i < m; i++) {
+ in.get(random.nextInt(n)).add(i);
+ }
+ List<Integer> firstValues = Lists.newArrayList();
+ for (Queue<Integer> q : in) {
+ firstValues.add(q.peek());
+ }
+ LoserTree<Integer> loserTree = new LoserTree<Integer>(firstValues, INTEGER_COMPARATOR);
+ int prev = Integer.MIN_VALUE;
+ int count = 0;
+ while (!loserTree.isEmpty()) {
+ int k = loserTree.topIndex();
+ Queue<Integer> q = in.get(k);
+ assertTrue(loserTree.topValue() > prev);
+ prev = loserTree.topValue();
+ assertEquals(q.peek(), loserTree.topValue());
+ q.poll();
+ loserTree.updateTopValue(q.peek());
+ count++;
+ }
+ assertEquals(m, count);
+ }
+
+ @Test
+ public void testZeroWayMerge() {
+ LoserTree<Integer> loserTree = new LoserTree<Integer>(Collections.<Integer>emptyList(), INTEGER_COMPARATOR);
+ assertTrue(loserTree.isEmpty());
+ assertNull(loserTree.topValue());
+ assertEquals(-1, loserTree.topIndex());
+ assertEquals(0, loserTree.getNumOfOpenStreams());
+ }
+
+}