From 129ca8604e51350d148bf1504a0f1a3b32556894 Mon Sep 17 00:00:00 2001 From: Yi Deng Date: Wed, 15 Oct 2014 19:04:54 -0700 Subject: [PATCH] ScanPrefetcher --- .../hadoop/hbase/client/AbstractClientScanner.java | 45 +-- .../hadoop/hbase/client/ResultScannerIterator.java | 77 +++++ .../apache/hadoop/hbase/client/ScanPrefetcher.java | 330 +++++++++++++++++++++ .../hadoop/hbase/client/TestScanPrefetcher.java | 129 ++++++++ 4 files changed, 537 insertions(+), 44 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java index 54c97d7..07ea5b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java @@ -76,49 +76,6 @@ public abstract class AbstractClientScanner implements ResultScanner { @Override public Iterator iterator() { - return new Iterator() { - // The next RowResult, possibly pre-read - Result next = null; - - // return true if there is another item pending, false if there isn't. - // this method is where the actual advancing takes place, but you need - // to call next() to consume it. hasNext() will only advance if there - // isn't a pending next(). - @Override - public boolean hasNext() { - if (next == null) { - try { - next = AbstractClientScanner.this.next(); - return next != null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return true; - } - - // get the pending next item and advance the iterator. returns null if - // there is no next item. - @Override - public Result next() { - // since hasNext() does the real advancing, we call this to determine - // if there is a next before proceeding. - if (!hasNext()) { - return null; - } - - // if we get to here, then hasNext() has given us an item to return. - // we want to return the item and then null out the next pointer, so - // we use a temporary variable. - Result temp = next; - next = null; - return temp; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + return new ResultScannerIterator(this); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java new file mode 100644 index 0000000..3a2f660 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScannerIterator.java @@ -0,0 +1,77 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * An Iterator of Result for ResultScanner. + */ +@InterfaceAudience.Private +public class ResultScannerIterator implements Iterator { + private ResultScanner scanner; + // The next RowResult, possibly pre-read + private Result next = null; + + public ResultScannerIterator(ResultScanner scanner) { + this.scanner = scanner; + } + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + @Override + public boolean hasNext() { + if (next == null) { + try { + next = scanner.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + @Override + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. + // we want to return the item and then null out the next pointer, so + // we use a temporary variable. + Result temp = next; + next = null; + return temp; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java new file mode 100644 index 0000000..a9b37f8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanPrefetcher.java @@ -0,0 +1,330 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ScanPrefetcher makes background fetching given an open scanner. + */ +@InterfaceAudience.Public +public class ScanPrefetcher implements ResultScanner { + private static final Log LOG = LogFactory.getLog(ScanPrefetcher.class); + + /** + * Key to conf of queue length. Each element in the queue is an array of the Result's. + */ + public static final String HBASE_CLIENT_SCANNER_QUEUE_LENGTH = + "hbase.client.scanner.queue.length"; + /** + * Default values of {@link ScanPrefetcher#HBASE_CLIENT_SCANNER_QUEUE_LENGTH} + */ + public static final int DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH = 1; + + /** + * Key to conf of caching in ScanPrefetcher. It is the maximum number of Results in each element + * of the queue. If not specified, will use {@link HConstants#HBASE_CLIENT_SCANNER_CACHING} + */ + public static final String HBASE_SCAN_PREFETCHER_CACHING = "hbase.scan.prefetcher.caching"; + + // End of Scanning mark + private static final Result[] EOS = new Result[0]; + + // The thread pool + private final ExecutorService pool; + + // Temporary results list in main thread, may be null + private Result[] currentResults; + // The position of next unfetched results in currentResults if it is + // non-null. + private int currentPos; + // Whether this client has closed. + private boolean closed; + // The queue transferring fetched Result[] to main thread. + // When queue.take() returns an EOS, scanning ends. + private final ArrayBlockingQueue queue; + // A place storing Result[] in case the queue is full. + // If the Fetcher has to put results here, it'll be on idle then. + // If the main thread exchange some non-null results from justFetched, it'll restart the Fetcher. + private final AtomicReference justFetched = new AtomicReference<>(); + // Contains exception thrown in fetcher thread. + private final AtomicReference exception = new AtomicReference<>(); + // The variable informing fetching thread to stop + private final AtomicBoolean closing = new AtomicBoolean(false); + // Background fetcher + private final Runnable fetcher; + + /** + * Constructor. + * @param conf the configuration + * @param pool the thread pool + * @param scanner the original scanner. + */ + public ScanPrefetcher(Configuration conf, ExecutorService pool, ResultScanner scanner) { + this.pool = pool; + this.queue = new ArrayBlockingQueue<>(conf.getInt(HBASE_CLIENT_SCANNER_QUEUE_LENGTH, + DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH)); + + int caching = conf.getInt(HBASE_SCAN_PREFETCHER_CACHING, conf.getInt( + HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING)); + + this.fetcher = new Fetcher(scanner, closing, caching, queue, justFetched, exception); + pool.execute(fetcher); + } + + // Throws a Throwable exception as IOException or RuntimeException + private static void throwIOException(Throwable e) throws IOException { + if (e != null) { + if (e instanceof IOException) { + throw (IOException) e; + } else if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException(e); + } + } + + /** + * Fetches results from queue to currentResults if it is null. + * @return true if more results available, false if end of scanning + */ + private boolean fetchFromQueue() throws IOException { + if (currentResults != null) { + return true; + } + + if (closed) { + return false; + } + + try { + // Check justFetched firstly. If we got non-null value, the Fetcher is on idle. + Result[] jf = justFetched.getAndSet(null); + + // this will block for background to scan + currentResults = queue.take(); + + if (currentResults.length == 0) { + // We've got an EOS (end of scanning) + closed = true; + currentResults = null; + + Throwable e = this.exception.get(); + if (e != null) { + // Failure of scanning + throwIOException(e); + } + + return false; + } + + if (jf != null) { + // Something is put justFetched because the queue was full when those + // results were fetched. The fetching task should not be running now. + // We can safely do the add here because the Fetcher is on idle and we take an element out + // of the queue. + queue.add(jf); + if (jf.length > 0) { + // not EOS, we should start the fetcher again + pool.execute(fetcher); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + // Results fetched + currentPos = 0; + return true; + } + + @Override + public Iterator iterator() { + return new ResultScannerIterator(this); + } + + @Override + public Result next() throws IOException { + if (!fetchFromQueue()) { + return null; + } + Result res = currentResults[currentPos]; + currentPos++; + + if (currentPos >= currentResults.length) { + // currentResults is drained off + currentResults = null; + } + return res; + } + + @Override + public Result[] next(int nbRows) throws IOException { + if (!fetchFromQueue()) { + return null; + } + + // In case, currentResults is just the results we want, return it directly + // to avoid extra resource allocation and copying. + if (currentPos == 0 && nbRows == currentResults.length) { + Result[] res = currentResults; + currentResults = null; + return res; + } + + Result[] res = new Result[nbRows]; + int len = 0; + + while (len < nbRows) { + // Move from currentResults + int n = Math.min(nbRows - len, currentResults.length - currentPos); + System.arraycopy(currentResults, currentPos, res, len, n); + + len += n; + currentPos += n; + + if (currentPos == currentResults.length) { + currentResults = null; + + if (!fetchFromQueue()) { + // Unexpected partial results, we have to make a copy. + return Arrays.copyOf(res, len); + } + } + } + + return res; + } + + @Override + public void close() { + if (this.closed) { + return; + } + this.closing.set(true); + try { + while (fetchFromQueue()) { + // skip all results, wait for EOS + currentResults = null; + } + } catch (Throwable e) { + LOG.debug("Exception on closing", e); + this.closed = true; + } + } + + /** + * A runnable that does background fetching. + */ + private static class Fetcher implements Runnable { + private final AtomicBoolean closing; + private final ResultScanner scanner; + private final ArrayBlockingQueue queue; + private final AtomicReference justFetched; + private final AtomicReference exception; + private final int caching; + + public Fetcher(ResultScanner scanner, AtomicBoolean closing, int caching, + ArrayBlockingQueue queue, AtomicReference justFetched, + AtomicReference exception) { + this.scanner = scanner; + this.closing = closing; + this.caching = caching; + this.queue = queue; + this.justFetched = justFetched; + this.exception = exception; + } + + /** + * Puts results in queue or justFetched. + * @return whether we should continue fetching in this run. + */ + private boolean putResults(Result[] results) { + if (!queue.offer(results)) { + // queue is full, put results in justFetched + justFetched.set(results); + + if (queue.isEmpty()) { + // It's possible the queue is empty before justFetched is set + // and the main thread is blocking on queue.Take(). + // We try move results in justFetched to queue here. + Result[] js = justFetched.getAndSet(null); + if (js != null) { + // Results in justFetched is not taken by main thread and queue is empty. Put it into + // queue and keep working. + queue.add(js); + return true; + } + // If js == null, it means the main thread moved justFetched to + // queue and arranged a new run. + } + // Then quit from this run and go idle. New run is submitted when some results + // are taken out of the queue + return false; + } + return true; + } + + @Override + public void run() { + try { + while (!closing.get()) { + Result[] results = scanner.next(caching); + if (results != null && results.length > 0) { + if (!putResults(results)) { + // queue is full, let the Fetcher have a rest. + return; + } + } else { + // scanning over. + break; + } + } + } catch (Throwable e) { + exception.set(e); + } + // We only get here when scanning is over or aborted + putResults(EOS); + + // We close the scanner after the EOS is set so the main thread will not wait for scanner to + // be closed. + // This also means even if the ScanPretcher.close is called, the underneath scanner may not + // have been closed. + try { + scanner.close(); + } catch (RuntimeException e) { + // We actually ignore the exception for closing the scanner. + LOG.warn("Exception caught when closing the scanner", e); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java new file mode 100644 index 0000000..7ee9798 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanPrefetcher.java @@ -0,0 +1,129 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Testcases for {@link ScanPrefetcher} + */ +public class TestScanPrefetcher { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Basic testcase. Generating 10000 rows distributed around regions and scan them out. + */ + @Test + public void testBasic() throws Exception { + final TableName TABLE = TableName.valueOf("testBasic"); + final byte[] FAMILY = Bytes.toBytes("f"); + final int N = 1000; + Configuration conf = TEST_UTIL.getConfiguration(); + + HTable htable = TEST_UTIL.createTable(TABLE, FAMILY); + for (int i = 0; i < N; i++) { + byte[] key = Bytes.toBytes(i); + // makes key[0] distributing all value spaces so that all regions contain data. + key[0] = (byte) (i * 256 / N); + + Put put = new Put(key).add(FAMILY, FAMILY, Bytes.toBytes(i)); + htable.put(put); + } + + ResultScanner clientScanner = htable.getScanner(new Scan()); + ScanPrefetcher scanner = new ScanPrefetcher(conf, htable.getPool(), clientScanner); + try { + int count = 0; + for (Result res : scanner) { + byte[] key = Bytes.copy(res.getRow()); + // assuming highest byte if i is zero, i.e. i is less than 0x01000000 + key[0] = 0; + int i = Bytes.toInt(key); + + byte[] value = res.getValue(FAMILY, FAMILY); + + Assert.assertEquals("value for " + Bytes.toStringBinary(res.getRow()), + Bytes.toStringBinary(Bytes.toBytes(i)), Bytes.toStringBinary(value)); + + count++; + } + Assert.assertEquals("count", N, count); + } finally { + scanner.close(); + } + + htable.close(); + } + + /** + * Similar to testBasic but will use nextRows to fetch all results as a whole. + */ + @Test + public void testNextRows() throws Exception { + final TableName TABLE = TableName.valueOf("testNextRows"); + final byte[] FAMILY = Bytes.toBytes("f"); + final int N = 1000; + Configuration conf = TEST_UTIL.getConfiguration(); + + HTable htable = TEST_UTIL.createTable(TABLE, FAMILY); + for (int i = 0; i < N; i++) { + byte[] key = Bytes.toBytes(i); + // makes key[0] distributing all value spaces so that all regions contain data. + key[0] = (byte) (i * 256 / N); + + Put put = new Put(key).add(FAMILY, FAMILY, Bytes.toBytes(i)); + htable.put(put); + } + + ResultScanner clientScanner = htable.getScanner(new Scan()); + ScanPrefetcher scanner = new ScanPrefetcher(conf, htable.getPool(), clientScanner); + try { + Result[] results = scanner.next(N); + Assert.assertEquals("count", N, results.length); + for (int i = 0; i < results.length; i++) { + Result res = results[i]; + byte[] value = res.getValue(FAMILY, FAMILY); + + Assert.assertEquals("value for " + Bytes.toStringBinary(res.getRow()), + Bytes.toStringBinary(Bytes.toBytes(i)), Bytes.toStringBinary(value)); + } + } finally { + scanner.close(); + } + + htable.close(); + } +} -- 1.9.3 (Apple Git-50)