From 1b5eb3dfa6a916e3116e6b639b62309f5839f5e4 Mon Sep 17 00:00:00 2001 From: eshcar Date: Thu, 14 May 2015 14:44:49 +0300 Subject: [PATCH] HBASE-13071 branch 0.98 --- .../hbase/client/ClientAsyncPrefetchScanner.java | 242 ++++++++++++++++++++ .../apache/hadoop/hbase/client/ClientScanner.java | 52 +++-- .../hadoop/hbase/client/ClientSimpleScanner.java | 55 +++++ .../hadoop/hbase/client/ClientSmallScanner.java | 11 +- .../org/apache/hadoop/hbase/client/HTable.java | 61 ++--- .../hadoop/hbase/client/ReversedClientScanner.java | 2 +- .../java/org/apache/hadoop/hbase/client/Scan.java | 25 ++ .../hadoop/hbase/client/TableConfiguration.java | 14 +- .../client/TestClientSmallReversedScanner.java | 22 +- .../hbase/client/TestClientSmallScanner.java | 25 +- .../hbase/client/TestScannersFromClientSide.java | 49 ++++ 11 files changed, 492 insertions(+), 66 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java new file mode 100644 index 0000000..8a43214 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Threads; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * ClientAsyncPrefetchScanner implements async scanner behaviour. + * Specifically, the cache used by this scanner is a concurrent queue which allows both + * the producer (hbase client) and consumer (application) to access the queue in parallel. + * The number of rows returned in a prefetch is defined by the caching factor and the result size + * factor. + * This class allocates a buffer cache, whose size is a function of both factors. + * The prefetch is invoked when the cache is half-filled, instead of waiting for it to be empty. + * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}. 
+ */ +@InterfaceAudience.Private +public class ClientAsyncPrefetchScanner extends ClientScanner { + + private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024; + private static final int DEFAULT_QUEUE_CAPACITY = 1024; + + private int cacheCapacity; + private AtomicLong cacheSizeInBytes; + // exception queue (from prefetch to main scan execution) + private Queue exceptionsQueue; + // prefetch runnable object to be executed asynchronously + private PrefetchRunnable prefetchRunnable; + // Boolean flag to ensure only a single prefetch is running (per scan) + // We use atomic boolean to allow multiple concurrent threads to + // consume records from the same cache, but still have a single prefetcher thread. + // For a single consumer thread this can be replaced with a native boolean. + private AtomicBoolean prefetchRunning; + // an attribute for synchronizing close between scanner and prefetch threads + private AtomicLong closingThreadId; + private static final int NO_THREAD = -1; + // Thread pool shared by all scanners + private static final ExecutorService pool = Executors.newCachedThreadPool(); + + public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, + HConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory); + } + + @Override + protected void initCache() { + // concurrent cache + cacheCapacity = calcCacheCapacity(); + cache = new LinkedBlockingQueue(cacheCapacity); + cacheSizeInBytes = new AtomicLong(0); + exceptionsQueue = new ConcurrentLinkedQueue(); + prefetchRunnable = new PrefetchRunnable(); + prefetchRunning = new AtomicBoolean(false); + closingThreadId = new AtomicLong(NO_THREAD); + } + + @Override + public Result next() throws IOException { + + try { + handleException(); + + // If the scanner is closed and there's nothing left in the cache, next is a no-op. 
+ if (getCacheCount() == 0 && this.closed) { + return null; + } + if (prefetchCondition()) { + // run prefetch in the background only if no prefetch is already running + if (!isPrefetchRunning()) { + if (prefetchRunning.compareAndSet(false, true)) { + getPool().execute(prefetchRunnable); + } + } + } + + while (isPrefetchRunning()) { + // prefetch running or still pending + if (getCacheCount() > 0) { + return pollCache(); + } else { + // (busy) wait for a record - sleep + Threads.sleep(1); + } + } + + if (getCacheCount() > 0) { + return pollCache(); + } + + // if we exhausted this scanner before calling close, write out the scan metrics + writeScanMetrics(); + return null; + } finally { + handleException(); + } + } + + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + closed = true; + if (!isPrefetchRunning()) { + if(closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { + super.close(); + } + } // else do nothing since the async prefetch still needs this resources + } + + @Override + public int getCacheCount() { + if(cache != null) { + int size = cache.size(); + if(size > cacheCapacity) { + cacheCapacity = size; + } + return size; + } else { + return 0; + } + } + + @Override + protected void addEstimatedSize(long estimatedSize) { + cacheSizeInBytes.addAndGet(estimatedSize); + } + + private void handleException() throws IOException { + //The prefetch task running in the background puts any exception it + //catches into this exception queue. + // Rethrow the exception so the application can handle it. 
+ while (!exceptionsQueue.isEmpty()) { + Exception first = exceptionsQueue.peek(); + first.printStackTrace(); + if (first instanceof IOException) { + throw (IOException) first; + } + throw (RuntimeException) first; + } + } + + private boolean isPrefetchRunning() { + return prefetchRunning.get(); + } + + // double buffer - double cache size + private int calcCacheCapacity() { + int capacity = Integer.MAX_VALUE; + if(caching > 0 && caching < (Integer.MAX_VALUE /2)) { + capacity = caching * 2 + 1; + } + if(capacity == Integer.MAX_VALUE){ + if(maxScannerResultSize != Integer.MAX_VALUE) { + capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE); + } + else { + capacity = DEFAULT_QUEUE_CAPACITY; + } + } + return capacity; + } + + private boolean prefetchCondition() { + return + (getCacheCount() < getCountThreshold()) && + (maxScannerResultSize == Long.MAX_VALUE || + getCacheSizeInBytes() < getSizeThreshold()) ; + } + + private int getCountThreshold() { + return cacheCapacity / 2 ; + } + + private long getSizeThreshold() { + return maxScannerResultSize / 2 ; + } + + private long getCacheSizeInBytes() { + return cacheSizeInBytes.get(); + } + + private ExecutorService getPool() { + return pool; + } + + private Result pollCache() { + Result res = cache.poll(); + long estimatedSize = calcEstimatedSize(res); + addEstimatedSize(-estimatedSize); + return res; + } + + private class PrefetchRunnable implements Runnable { + + @Override + public void run() { + try { + loadCache(); + } catch (Exception e) { + exceptionsQueue.add(e); + } finally { + prefetchRunning.set(false); + if(closed) { + if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { + // close was waiting for the prefetch to end + close(); + } + } + } + } + + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index cced035..9037cae 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,14 +17,9 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.util.LinkedList; - import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -35,6 +30,8 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -42,6 +39,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; + /** * Implements the scanner interface for the HBase client. 
* If there are multiple regions in a table, this scanner will iterate @@ -49,7 +50,7 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Public @InterfaceStability.Stable -public class ClientScanner extends AbstractClientScanner { +public abstract class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); protected Scan scan; protected boolean closed = false; @@ -57,7 +58,7 @@ public class ClientScanner extends AbstractClientScanner { // wonky: e.g. if it splits on us. protected HRegionInfo currentRegion = null; protected ScannerCallable callable = null; - protected final LinkedList cache = new LinkedList(); + protected Queue cache; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. @@ -167,6 +168,7 @@ public class ClientScanner extends AbstractClientScanner { HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + initCache(); // check if application wants to collect scan metrics initScanMetrics(scan); @@ -185,6 +187,8 @@ public class ClientScanner extends AbstractClientScanner { initializeScannerInConstruction(); } + protected abstract void initCache(); + protected void initializeScannerInConstruction() throws IOException{ // initialize the scanner nextScanner(this.caching, false); @@ -328,8 +332,11 @@ public class ClientScanner extends AbstractClientScanner { scanMetricsPublished = true; } - @Override - public Result next() throws IOException { + protected void initSyncCache() { + cache = new LinkedList(); + } + + protected Result nextWithSyncCache() throws IOException { // If the scanner is closed and there's nothing left in the cache, next is a no-op. if (cache.size() == 0 && this.closed) { return null; @@ -356,6 +363,8 @@ public class ClientScanner extends AbstractClientScanner { * Contact the servers to load more {@link Result}s in the cache. 
*/ protected void loadCache() throws IOException { + // check if scanner was closed during previous prefetch + if (closed) return; Result [] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; @@ -457,11 +466,10 @@ public class ClientScanner extends AbstractClientScanner { if (values != null && values.length > 0) { for (Result rs : values) { cache.add(rs); - for (Cell kv : rs.rawCells()) { - // TODO make method in Cell or CellUtil - remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize(); - } + long estimatedHeapSizeOfResult = calcEstimatedSize(rs); countdown--; + remainingResultSize -= estimatedHeapSizeOfResult; + addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; } } @@ -500,7 +508,25 @@ public class ClientScanner extends AbstractClientScanner { callable = null; } closed = true; + } + + protected long calcEstimatedSize(Result rs) { + long estimatedHeapSizeOfResult = 0; + // We don't make Iterator here + for (Cell cell : rs.rawCells()) { + estimatedHeapSizeOfResult += KeyValueUtil.ensureKeyValue(cell).heapSize(); } + return estimatedHeapSizeOfResult; + } + + protected void addEstimatedSize(long estimatedHeapSizeOfResult) { + return; + } + + @VisibleForTesting + public int getCacheCount() { + return cache != null ? cache.size() : 0; + } @Override public boolean renewLease() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java new file mode 100644 index 0000000..c1162f9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +import java.io.IOException; + +/** + * ClientSimpleScanner implements a sync scanner behaviour. + * The cache is a simple list. + * The prefetch is invoked only when the application finished processing the entire cache. 
+ */ +@InterfaceAudience.Private +public class ClientSimpleScanner extends ClientScanner { + + public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, + HConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory); + } + + public ClientSimpleScanner(Configuration conf, Scan scan, TableName tableName, + HConnection connection) throws IOException { + super(conf, scan, tableName, connection); + } + + @Override + protected void initCache() { + initSyncCache(); + } + + @Override + public Result next() throws IOException { + return nextWithSyncCache(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index e6c2c8e..014cd62 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -18,9 +18,8 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; - +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,8 +39,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.io.InterruptedIOException; /** * Client scanner for small scan. 
Generally, only one RPC is called to fetch the @@ -52,7 +51,7 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class ClientSmallScanner extends ClientScanner { +public class ClientSmallScanner extends ClientSimpleScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private RegionServerCallable smallScanCallable = null; private SmallScannerCallableFactory callableFactory; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 7f641fb..fabf76e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -18,30 +18,13 @@ */ package org.apache.hadoop.hbase.client; -import java.io.Closeable; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - +import com.google.protobuf.Descriptors; import com.google.protobuf.InvalidProtocolBufferException; - +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.DoNotRetryIOException; @@ -54,6 +37,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -74,10 +59,23 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** *

Used to communicate with a single HBase table. An implementation of @@ -766,6 +764,11 @@ public class HTable implements HTableInterface { scan.setMaxResultSize(scannerMaxResultSize); } + Boolean async = scan.isAsyncPrefetch(); + if (async == null) { + async = tableConfiguration.isClientScannerAsyncPrefetch(); + } + if (scan.isReversed()) { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), @@ -778,7 +781,13 @@ public class HTable implements HTableInterface { if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection); } else { - return new ClientScanner(getConfiguration(), scan, getName(), this.connection); + if (async) { + return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory); + } else { + return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 206bbed..0a7cc78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.util.ExceptionUtil; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class ReversedClientScanner extends ClientScanner { +public class ReversedClientScanner extends ClientSimpleScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); // A byte array in which all elements are the max byte, and it is used to // construct closest front row diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 1ab3e92..635b577 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -138,8 +138,23 @@ public class Scan extends Query { private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; + private Boolean asyncPrefetch = null; /** + * Parameter name for client scanner sync/async prefetch toggle. + * When using async scanner, prefetching data from the server is done at the background. + * The parameter currently won't have any effect in the case that the user has set + * Scan#setSmall or Scan#setReversed + */ + public static final String HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = + "hbase.client.scanner.async.prefetch"; + + /** + * Default value of {@link #HBASE_CLIENT_SCANNER_ASYNC_PREFETCH}. + */ + public static final boolean DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = false; + + /** * Set it true for small scan to get better performance * * Small scan should use pread and big scan can use seek + read @@ -247,6 +262,7 @@ public class Scan extends Query { this.tr = get.getTimeRange(); this.familyMap = get.getFamilyMap(); this.getScan = true; + this.asyncPrefetch = false; for (Map.Entry attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } @@ -772,4 +788,13 @@ public class Scan extends Query { public boolean isSmall() { return small; } + + public Boolean isAsyncPrefetch() { + return asyncPrefetch; + } + + public Scan setAsyncPrefetch(boolean asyncPrefetch) { + this.asyncPrefetch = asyncPrefetch; + return this; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 9789cbf..0316c16 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -50,7 +50,10 @@ public class TableConfiguration { private final int maxKeyValueSize; - /** + // toggle for async/sync prefetch + private final boolean clientScannerAsyncPrefetch; + + /** * Constructor * @param conf Configuration object */ @@ -73,6 +76,8 @@ public class TableConfiguration { this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.clientScannerAsyncPrefetch = conf.getBoolean( + Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); } @@ -89,6 +94,7 @@ public class TableConfiguration { this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; + this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = -1; } @@ -119,4 +125,8 @@ public class TableConfiguration { public int getMaxKeyValueSize() { return maxKeyValueSize; } -} \ No newline at end of file + + public boolean isClientScannerAsyncPrefetch() { + return clientScannerAsyncPrefetch; + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java index 061d6ce..486a88c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java @@ -27,12 +27,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -45,6 +41,16 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the ClientSmallReversedScanner. 
*/ @@ -180,7 +186,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); - List results = csrs.cache; + Queue results = csrs.cache; Iterator iter = results.iterator(); assertEquals(3, results.size()); for (int i = 3; i >= 1 && iter.hasNext(); i--) { @@ -257,7 +263,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); - List results = csrs.cache; + Queue results = csrs.cache; Iterator iter = results.iterator(); assertEquals(2, results.size()); for (int i = 3; i >= 2 && iter.hasNext(); i--) { @@ -273,7 +279,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); assertEquals(1, results.size()); - Result result = results.get(0); + Result result = results.peek(); assertEquals("row1", new String(result.getRow(), "UTF-8")); assertEquals(1, result.getMap().size()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java index debdc59..7912a44 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java @@ -26,12 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -44,6 +40,15 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import 
java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the ClientSmallScanner. */ @@ -175,10 +180,10 @@ public class TestClientSmallScanner { css.loadCache(); - List results = css.cache; + Queue results = css.cache; assertEquals(3, results.size()); for (int i = 1; i <= 3; i++) { - Result result = results.get(i - 1); + Result result = results.poll(); byte[] row = result.getRow(); assertEquals("row" + i, new String(row, "UTF-8")); assertEquals(1, result.getMap().size()); @@ -246,10 +251,10 @@ public class TestClientSmallScanner { css.loadCache(); - List results = css.cache; + Queue results = css.cache; assertEquals(2, results.size()); for (int i = 1; i <= 2; i++) { - Result result = results.get(i - 1); + Result result = results.poll(); byte[] row = result.getRow(); assertEquals("row" + i, new String(row, "UTF-8")); assertEquals(1, result.getMap().size()); @@ -261,7 +266,7 @@ public class TestClientSmallScanner { css.loadCache(); assertEquals(1, results.size()); - Result result = results.get(0); + Result result = results.peek(); assertEquals("row3", new String(result.getRow(), "UTF-8")); assertEquals(1, result.getMap().size()); assertTrue(css.closed); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 7003f83..dfc5b0c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -651,6 +651,55 @@ public class TestScannersFromClientSide { clientScanner.getCacheSize() <= 1); } + /** + * Test from client side for async scan + * + * @throws Exception + */ + @Test + 
public void testAsyncScanner() throws Exception { + byte [] TABLE = Bytes.toBytes("testAsyncScan"); + byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); + byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Put put; + Scan scan; + Result result; + boolean toLog = true; + List kvListExp, kvListScan; + + kvListExp = new ArrayList(); + + for (int r=0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c=0; c < FAMILIES.length; c++) { + for (int q=0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + put.add(kv); + kvListExp.add(kv); + } + } + ht.put(put); + } + + scan = new Scan(); + scan.setAsyncPrefetch(true); + ResultScanner scanner = ht.getScanner(scan); + kvListScan = new ArrayList(); + while ((result = scanner.next()) != null) { + for (Cell kv : result.listCells()) { + kvListScan.add(kv); + } + } + result = Result.create(kvListScan); + assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); + verifyResult(result, kvListExp, toLog, "Testing async scan"); + + } + static void verifyResult(Result result, List expKvList, boolean toLog, String msg) { -- 1.7.10.2 (Apple Git-33)