From 14b6bdbfe2c3baefa9e8a0209138ba97b5a5ea26 Mon Sep 17 00:00:00 2001
From: Yi Deng
Date: Wed, 15 Oct 2014 19:04:54 -0700
Subject: [PATCH] Add ScanPrefetcher for client-side scan prefetching.

Summary: Since server-side prefetching did not prove to be a good way to prefetch, we need to do it on the client side. This is a wrapper class that takes any instance of `ResultScanner` as the underlying scanning component. The class schedules the scanning in a background thread. Prefetched results are stored in a buffering queue whose length is configurable. The prefetcher releases the thread when the queue is full and waits for results to be consumed.

Test Plan: `TestScanPrefetcher`

Differential Revision: https://reviews.facebook.net/D25617
---
 .../hadoop/hbase/client/AbstractClientScanner.java |  45 +--
 .../hadoop/hbase/client/ResultScannerIterator.java |  79 +++++
 .../apache/hadoop/hbase/client/ScanPrefetcher.java | 353 +++++++++++++++++++++
 .../hadoop/hbase/CopyPerformanceEvaluation.java    | 147 +++++++++
 .../hadoop/hbase/ScanPerformanceEvaluation.java    |  13 +-
 .../hadoop/hbase/client/TestScanPrefetcher.java    | 132 ++++++++
 6 files changed, 722 insertions(+), 47 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/CopyPerformanceEvaluation.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 54c97d7..07ea5b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -76,49 +76,6 @@ public abstract class AbstractClientScanner implements ResultScanner {
   @Override
   public Iterator<Result> iterator() {
-    return new Iterator<Result>() {
-      // The next RowResult, possibly pre-read
-      Result next = null;
-
-      // return true if there is another item pending, false if there isn't.
-      // this method is where the actual advancing takes place, but you need
-      // to call next() to consume it. hasNext() will only advance if there
-      // isn't a pending next().
-      @Override
-      public boolean hasNext() {
-        if (next == null) {
-          try {
-            next = AbstractClientScanner.this.next();
-            return next != null;
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-        return true;
-      }
-
-      // get the pending next item and advance the iterator. returns null if
-      // there is no next item.
-      @Override
-      public Result next() {
-        // since hasNext() does the real advancing, we call this to determine
-        // if there is a next before proceeding.
-        if (!hasNext()) {
-          return null;
-        }
-
-        // if we get to here, then hasNext() has given us an item to return.
-        // we want to return the item and then null out the next pointer, so
-        // we use a temporary variable.
-        Result temp = next;
-        next = null;
-        return temp;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
+    return new ResultScannerIterator(this);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java
new file mode 100644
index 0000000..06f8071
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An Iterator of Results for a ResultScanner that has implemented {@link ResultScanner#next()}.
+ */
+@InterfaceAudience.Private
+public class ResultScannerIterator implements Iterator<Result> {
+  private ResultScanner scanner;
+  // The next Result, possibly pre-read
+  private Result next = null;
+
+  public ResultScannerIterator(ResultScanner scanner) {
+    this.scanner = scanner;
+  }
+
+  // @return true if there is another item pending, false if there isn't.
+  // This method is where the actual advancing takes place, but you need
+  // to call next() to consume it. hasNext() will only advance if there
+  // isn't a pending next().
+  @Override
+  public boolean hasNext() {
+    if (next != null) {
+      return true;
+    }
+
+    try {
+      next = scanner.next();
+      return next != null;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Gets the pending next item and advances the iterator. Returns null if
+  // there is no next item.
+  @Override
+  public Result next() {
+    // Since hasNext() does the real advancing, we call it to determine
+    // whether there is a next before proceeding.
+    if (!hasNext()) {
+      return null;
+    }
+
+    // If we get here, then hasNext() has given us an item to return.
+    // We want to return the item and then null out the next pointer, so
+    // we use a temporary variable.
+    Result temp = next;
+    next = null;
+    return temp;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java
new file mode 100644
index 0000000..b365748
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java
@@ -0,0 +1,353 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * ScanPrefetcher performs background prefetching on top of an open scanner.
+ */
+@InterfaceAudience.Public
+public class ScanPrefetcher implements ResultScanner {
+  private static final Log LOG = LogFactory.getLog(ScanPrefetcher.class);
+
+  /**
+   * Configuration key for the queue length. Each element in the queue is an array of Results.
+   */
+  public static final String HBASE_CLIENT_SCANNER_QUEUE_LENGTH =
+      "hbase.client.scanner.queue.length";
+  /**
+   * Default value of {@link ScanPrefetcher#HBASE_CLIENT_SCANNER_QUEUE_LENGTH}
+   */
+  public static final int DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH = 1;
+
+  /**
+   * Configuration key for caching in ScanPrefetcher. It is the maximum number of Results in each
+   * element of the queue. If not specified, falls back to
+   * {@link HConstants#HBASE_CLIENT_SCANNER_CACHING}
+   */
+  public static final String HBASE_SCAN_PREFETCHER_CACHING = "hbase.scan.prefetcher.caching";
+
+  // End-of-scanning mark
+  private static final Result[] EOS = new Result[0];
+
+  // The thread pool
+  private final ExecutorService pool;
+
+  // Temporary results list in the main thread, may be null
+  private Result[] currentResults;
+  // The position of the next unfetched result in currentResults if it is
+  // non-null.
+  private int currentPos;
+  // Whether this client has closed.
+  private boolean closed;
+  // The queue transferring fetched Result[] to the main thread.
+  // When queue.take() returns an EOS, scanning ends.
+  private final ArrayBlockingQueue<Result[]> queue;
+  // A place to store a Result[] in case the queue is full.
+  // If the Fetcher has to put results here, it then goes idle.
+  // If the main thread takes non-null results out of justFetched, it restarts the Fetcher.
+  private final AtomicReference<Result[]> justFetched = new AtomicReference<Result[]>();
+  // Contains an exception thrown in the fetcher thread.
+  private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+  // The variable informing the fetching thread to stop
+  private final AtomicBoolean closing = new AtomicBoolean(false);
+  // Background fetcher
+  private final Runnable fetcher;
+
+  /**
+   * Constructor.
+   * @param conf the configuration
+   * @param pool the thread pool
+   * @param scanner the original scanner
+   * @param caching the number of Results per element in the blocking queue. If a value <= 0 is
+   *          specified, the value from the configuration will be used.
+   */
+  public ScanPrefetcher(Configuration conf, ExecutorService pool, ResultScanner scanner,
+      int caching) {
+    this.pool = pool;
+    int queueLen =
+        conf.getInt(HBASE_CLIENT_SCANNER_QUEUE_LENGTH, DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH);
+    this.queue = new ArrayBlockingQueue<Result[]>(queueLen);
+
+    if (caching <= 0) {
+      caching = conf.getInt(HBASE_SCAN_PREFETCHER_CACHING, conf.getInt(
+          HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING));
+    }
+
+    this.fetcher = new Fetcher(scanner, closing, caching, queue, justFetched, exception);
+    pool.execute(fetcher);
+  }
+
+  // Rethrows a Throwable as an IOException or a RuntimeException
+  private static void throwIOException(Throwable e) throws IOException {
+    if (e != null) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      } else if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Fetches results from the queue into currentResults if currentResults is null.
+   * @return true if more results are available, false if scanning has ended
+   */
+  private boolean fetchFromQueue() throws IOException {
+    if (currentResults != null) {
+      return true;
+    }
+
+    if (closed) {
+      return false;
+    }
+
+    try {
+      // Check justFetched first.
+      Result[] jf = justFetched.getAndSet(null);
+
+      boolean needStart = false;
+      if (jf != null) {
+        // If we got a non-null value, the Fetcher is idle.
+        needStart = jf.length > 0;
+
+        // Then try to put justFetched into the queue.
+        if (queue.offer(jf)) {
+          jf = null;
+        }
+      }
+
+      // This blocks until the background thread has fetched something.
+      currentResults = queue.take();
+
+      if (currentResults.length == 0) {
+        // We've got an EOS (end of scanning)
+        closed = true;
+        currentResults = null;
+
+        Throwable e = this.exception.get();
+        if (e != null) {
+          // Scanning failed
+          throwIOException(e);
+        }
+
+        return false;
+      }
+
+      if (jf != null) {
+        // Something was put into justFetched because the queue was full when those
+        // results were fetched. The fetching task should not be running now.
+        // We can safely do the add here because the Fetcher is idle and we have just taken an
+        // element out of the queue.
+        queue.add(jf);
+      }
+
+      if (needStart) {
+        // Not EOS, so we should start the fetcher again
+        pool.execute(fetcher);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    }
+
+    // Results fetched
+    currentPos = 0;
+    return true;
+  }
+
+  @Override
+  public Iterator<Result> iterator() {
+    return new ResultScannerIterator(this);
+  }
+
+  @Override
+  public Result next() throws IOException {
+    if (!fetchFromQueue()) {
+      return null;
+    }
+    Result res = currentResults[currentPos];
+    currentPos++;
+
+    if (currentPos >= currentResults.length) {
+      // currentResults is drained
+      currentResults = null;
+    }
+    return res;
+  }
+
+  @Override
+  public Result[] next(int nbRows) throws IOException {
+    if (!fetchFromQueue()) {
+      return null;
+    }
+
+    // If currentResults is exactly the result set we want, return it directly
+    // to avoid extra allocation and copying.
+    if (currentPos == 0 && nbRows == currentResults.length) {
+      Result[] res = currentResults;
+      currentResults = null;
+      return res;
+    }
+
+    Result[] res = new Result[nbRows];
+    int len = 0;
+
+    while (len < nbRows) {
+      // Copy from currentResults
+      int n = Math.min(nbRows - len, currentResults.length - currentPos);
+      System.arraycopy(currentResults, currentPos, res, len, n);
+
+      len += n;
+      currentPos += n;
+
+      if (currentPos == currentResults.length) {
+        currentResults = null;
+
+        if (!fetchFromQueue()) {
+          break;
+        }
+      }
+    }
+
+    if (len < nbRows) {
+      // Partial results
+      return Arrays.copyOf(res, len);
+    }
+
+    return res;
+  }
+
+  @Override
+  public void close() {
+    if (this.closed) {
+      return;
+    }
+    this.closing.set(true);
+    try {
+      while (fetchFromQueue()) {
+        // Skip all results, waiting for EOS
+        currentResults = null;
+      }
+    } catch (Throwable e) {
+      LOG.debug("Exception on closing", e);
+      this.closed = true;
+    }
+  }
+
+  /**
+   * A runnable that does background fetching.
+   */
+  private static class Fetcher implements Runnable {
+    private final AtomicBoolean closing;
+    private final ResultScanner scanner;
+    private final ArrayBlockingQueue<Result[]> queue;
+    private final AtomicReference<Result[]> justFetched;
+    private final AtomicReference<Throwable> exception;
+    private final int caching;
+
+    public Fetcher(ResultScanner scanner, AtomicBoolean closing, int caching,
+        ArrayBlockingQueue<Result[]> queue, AtomicReference<Result[]> justFetched,
+        AtomicReference<Throwable> exception) {
+      this.scanner = scanner;
+      this.closing = closing;
+      this.caching = caching;
+      this.queue = queue;
+      this.justFetched = justFetched;
+      this.exception = exception;
+    }
+
+    /**
+     * Puts results in the queue or in justFetched.
+     * @return whether we should continue fetching in this run.
+     */
+    private boolean putResults(Result[] results) {
+      if (!queue.offer(results)) {
+        // The queue is full, put results in justFetched
+        justFetched.set(results);
+
+        if (queue.isEmpty()) {
+          // It's possible the queue became empty before justFetched was set
+          // and the main thread is blocking on queue.take().
+          // We try to move the results in justFetched to the queue here.
+          Result[] js = justFetched.getAndSet(null);
+          if (js != null) {
+            // The results in justFetched were not taken by the main thread and the queue is
+            // empty. Put them into the queue and keep working.
+            queue.add(js);
+            return true;
+          }
+          // If js == null, it means the main thread moved justFetched to the
+          // queue and arranged a new run.
+        }
+        // Then quit this run and go idle. A new run is submitted when some results
+        // are taken out of the queue.
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (!closing.get()) {
+          Result[] results = scanner.next(caching);
+          if (results != null && results.length > 0) {
+            if (!putResults(results)) {
+              // The queue is full, let the Fetcher have a rest.
+              return;
+            }
+          } else {
+            // Scanning is over.
+            break;
+          }
+        }
+      } catch (Throwable e) {
+        exception.set(e);
+      }
+      // We only get here when scanning is over or aborted.
+      putResults(EOS);
+
+      // We close the scanner after the EOS is set so the main thread will not wait for the
+      // scanner to be closed.
+      // This also means that even if ScanPrefetcher.close() has been called, the underlying
+      // scanner may not have been closed yet.
+      try {
+        scanner.close();
+      } catch (RuntimeException e) {
+        // We intentionally ignore exceptions from closing the scanner; just log them.
+ LOG.error("Exception caught when closing the scanner", e); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/CopyPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/CopyPerformanceEvaluation.java new file mode 100644 index 0000000..3f63cc0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/CopyPerformanceEvaluation.java @@ -0,0 +1,147 @@ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScanPrefetcher; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Stopwatch; + +public class CopyPerformanceEvaluation extends AbstractHBaseTool { + private String type = null; + private String srcTable = null; + private String dstTable = null; + private int caching = 0; + + @Override + protected void addOptions() { + this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: " + + "copy|prefetchcopy"); + this.addOptWithArg("st", "srctable", "the name of the table to read from"); + this.addOptWithArg("dt", "dsttable", "the name of the table to write to"); + this.addOptWithArg("ch", "caching", "scanner caching value"); + } + + @Override + protected void processOptions(CommandLine cmd) { + type = cmd.getOptionValue("type", ""); + srcTable = cmd.getOptionValue("srctable"); + dstTable = cmd.getOptionValue("dsttable"); + caching = Integer.parseInt(cmd.getOptionValue("caching", "0")); + } + + private Scan getScan() { + Scan scan = new Scan(); // default scan settings + scan.setCacheBlocks(false); + scan.setMaxVersions(1); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); + if (caching > 0) { + scan.setCaching(caching); + } + + return scan; + } + + private static Put putFromResult(Result res) throws IOException { + Put put = new Put(res.getRow()); + for (Cell cell : res.rawCells()) { + put.add(cell); + } + + return put; + } + + public void testCopy(boolean prefetch) throws IOException { + Stopwatch tableOpenTimer = new Stopwatch(); + Stopwatch scanOpenTimer = new Stopwatch(); + Stopwatch copyTimer = new Stopwatch(); + + tableOpenTimer.start(); + HTable srcHTable = new HTable(getConf(), TableName.valueOf(srcTable)); + HTable dstHTable = new HTable(getConf(), TableName.valueOf(dstTable)); + tableOpenTimer.stop(); + + dstHTable.setAutoFlush(false, true); + + Scan scan = getScan(); + scanOpenTimer.start(); + ResultScanner scanner = srcHTable.getScanner(scan); + if (prefetch) { + System.err.println("Using ScanPrefetcher"); + scanner = new ScanPrefetcher(getConf(), HTable.getDefaultExecutor(getConf()), scanner, 0); + } + scanOpenTimer.stop(); + + long numRows = 0; + long numCells = 0; + copyTimer.start(); + while (true) { + Result result = scanner.next(); + if (result == null) { + break; + } + numRows++; + numCells += result.rawCells().length; + + dstHTable.put(putFromResult(result)); + } + dstHTable.flushCommits(); + copyTimer.stop(); + scanner.close(); + 
+    dstHTable.close();
+    srcHTable.close();
+
+    ScanMetrics metrics =
+        ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
+    long totalBytes = metrics.countOfBytesInResults.get();
+    double throughput = (double) totalBytes * 1000. / copyTimer.elapsedMillis();
+    double throughputRows = (double) numRows * 1000. / copyTimer.elapsedMillis();
+    double throughputCells = (double) numCells * 1000. / copyTimer.elapsedMillis();
+
+    System.out.println("HBase scan: ");
+    System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+    System.out.println("total time to scan: " + copyTimer.elapsedMillis() + " ms");
+
+    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+    System.out.println("total bytes: " + totalBytes + " bytes ("
+        + StringUtils.humanReadableInt(totalBytes) + ")");
+    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "/s");
+    System.out.println("total rows : " + numRows);
+    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputRows)
+        + " rows/s");
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputCells)
+        + " cells/s");
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    if (type.equals("copy")) {
+      testCopy(false);
+    } else if (type.equals("prefetchcopy")) {
+      testCopy(true);
+    } else {
+      System.err.println("Unknown type: " + type);
+      return 1;
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyPerformanceEvaluation(), args);
+    System.exit(ret);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
index dfafa83..0a25ad1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScanPrefetcher;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableSnapshotScanner;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -78,7 +79,8 @@ public class ScanPerformanceEvaluation extends AbstractHBaseTool {
   @Override
   protected void addOptions() {
-    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
+    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: "
+        + "streaming|scan|prefetchscan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
     this.addOptWithArg("f", "file", "the filename to read from");
     this.addOptWithArg("tn", "table", "the tablename to read from");
     this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
@@ -141,7 +143,7 @@ public class ScanPerformanceEvaluation extends AbstractHBaseTool {
     return scan;
   }
 
-  public void testScan() throws IOException {
+  public void testScan(boolean prefetch) throws IOException {
     Stopwatch tableOpenTimer = new Stopwatch();
     Stopwatch scanOpenTimer = new Stopwatch();
     Stopwatch scanTimer = new Stopwatch();
@@ -153,6 +155,9 @@ public class ScanPerformanceEvaluation extends AbstractHBaseTool {
     Scan scan = getScan();
     scanOpenTimer.start();
     ResultScanner scanner = table.getScanner(scan);
+    if (prefetch) {
+      scanner = new ScanPrefetcher(getConf(), HTable.getDefaultExecutor(getConf()), scanner, 0);
+    }
     scanOpenTimer.stop();
 
     long numRows = 0;
@@ -376,7 +381,9 @@ public class ScanPerformanceEvaluation extends AbstractHBaseTool {
     if (type.equals("streaming")) {
       testHdfsStreaming(new Path(file));
     } else if (type.equals("scan")){
-      testScan();
+      testScan(false);
+    } else if (type.equals("prefetchscan")) {
+      testScan(true);
     } else if (type.equals("snapshotscan")) {
       testSnapshotScan();
     } else if (type.equals("scanmapreduce")) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java
new file mode 100644
index 0000000..ff7ccdf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java
@@ -0,0 +1,132 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test cases for {@link ScanPrefetcher}
+ */
+@Category(MediumTests.class)
+public class TestScanPrefetcher {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Basic test case. Generates 1000 rows distributed across regions and scans them out.
+   */
+  @Test
+  public void testBasic() throws Exception {
+    final TableName TABLE = TableName.valueOf("testBasic");
+    final byte[] FAMILY = Bytes.toBytes("f");
+    final int N = 1000;
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    HTable htable = TEST_UTIL.createTable(TABLE, FAMILY);
+    for (int i = 0; i < N; i++) {
+      byte[] key = Bytes.toBytes(i);
+      // Spread key[0] across the whole value space so that all regions contain data.
+      key[0] = (byte) (i * 256 / N);
+
+      Put put = new Put(key).add(FAMILY, FAMILY, Bytes.toBytes(i));
+      htable.put(put);
+    }
+
+    ResultScanner scanner =
+        new ScanPrefetcher(conf, htable.getPool(), htable.getScanner(new Scan()), 0);
+    try {
+      int count = 0;
+      for (Result res : scanner) {
+        byte[] key = Bytes.copy(res.getRow());
+        // Assuming the highest byte of i is zero, i.e. i is less than 0x01000000
+        key[0] = 0;
+        int i = Bytes.toInt(key);
+
+        byte[] value = res.getValue(FAMILY, FAMILY);
+
+        Assert.assertEquals("value for " + Bytes.toStringBinary(res.getRow()),
+            Bytes.toStringBinary(Bytes.toBytes(i)), Bytes.toStringBinary(value));
+
+        count++;
+      }
+      Assert.assertEquals("count", N, count);
+    } finally {
+      scanner.close();
+    }
+
+    htable.close();
+  }
+
+  /**
+   * Similar to testBasic but uses next(int) to fetch all results at once.
+   */
+  @Test
+  public void testNextRows() throws Exception {
+    final TableName TABLE = TableName.valueOf("testNextRows");
+    final byte[] FAMILY = Bytes.toBytes("f");
+    final int N = 1000;
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    HTable htable = TEST_UTIL.createTable(TABLE, FAMILY);
+    for (int i = 0; i < N; i++) {
+      byte[] key = Bytes.toBytes(i);
+      // Spread key[0] across the whole value space so that all regions contain data.
+      key[0] = (byte) (i * 256 / N);
+
+      Put put = new Put(key).add(FAMILY, FAMILY, Bytes.toBytes(i));
+      htable.put(put);
+    }
+
+    ResultScanner scanner =
+        new ScanPrefetcher(conf, htable.getPool(), htable.getScanner(new Scan()), 0);
+    try {
+      Result[] results = scanner.next(N);
+      Assert.assertEquals("count", N, results.length);
+      for (int i = 0; i < results.length; i++) {
+        Result res = results[i];
+        byte[] value = res.getValue(FAMILY, FAMILY);
+
+        Assert.assertEquals("value for " + Bytes.toStringBinary(res.getRow()),
+            Bytes.toStringBinary(Bytes.toBytes(i)), Bytes.toStringBinary(value));
+      }
+    } finally {
+      scanner.close();
+    }
+
+    htable.close();
+  }
+}
-- 
1.9.3 (Apple Git-50)
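For reference, a minimal usage sketch of the new API (not part of the patch). It only uses the classes added above; the table name "t" is hypothetical. ScanPrefetcher wraps any plain ResultScanner, and passing caching <= 0 falls back to hbase.scan.prefetcher.caching or the regular scanner-caching setting:

import java.util.concurrent.ExecutorService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanPrefetcher;

public class ScanPrefetcherUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Buffer up to 4 prefetched Result[] batches instead of the default 1.
    conf.setInt(ScanPrefetcher.HBASE_CLIENT_SCANNER_QUEUE_LENGTH, 4);

    HTable table = new HTable(conf, TableName.valueOf("t")); // hypothetical table
    ExecutorService pool = HTable.getDefaultExecutor(conf);
    try {
      // Wrap the plain scanner; the background Fetcher starts immediately.
      ResultScanner scanner =
          new ScanPrefetcher(conf, pool, table.getScanner(new Scan()), 0);
      try {
        for (Result r : scanner) {
          // Consume r here while the Fetcher prefetches the next batch.
        }
      } finally {
        // Drains remaining batches; the Fetcher closes the wrapped scanner.
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}

Note that close() on the wrapper does not synchronously close the wrapped scanner; as the comments in Fetcher.run() explain, the Fetcher closes it after publishing the EOS mark.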