From b812daf595bf663454c1e7a68940a56d61a17f5d Mon Sep 17 00:00:00 2001 From: eshcar Date: Mon, 20 Apr 2015 23:14:18 +0300 Subject: [PATCH] HBASE-13071: async scanner (rebase) --- .../hbase/client/ClientAsyncPrefetchScanner.java | 181 ++++++++++++++++++++ .../apache/hadoop/hbase/client/ClientScanner.java | 44 +++-- .../hadoop/hbase/client/ClientSimpleScanner.java | 54 ++++++ .../hadoop/hbase/client/ClientSmallScanner.java | 2 +- .../org/apache/hadoop/hbase/client/HTable.java | 17 +- .../hadoop/hbase/client/ReversedClientScanner.java | 2 +- .../java/org/apache/hadoop/hbase/client/Scan.java | 30 +++- .../hadoop/hbase/client/TableConfiguration.java | 16 +- .../hadoop/hbase/client/TestClientScanner.java | 10 ++ .../client/TestClientSmallReversedScanner.java | 32 ++-- .../hbase/client/TestClientSmallScanner.java | 34 ++-- .../hbase/client/TestScannersFromClientSide.java | 49 ++++++ 12 files changed, 402 insertions(+), 69 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java new file mode 100644 index 0000000..077631e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Threads; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * ClientAsyncPrefetchScanner implements async scanner behaviour. + * Specifically, the cache used by this scanner is a concurrent queue which allows both + * the producer (hbase client) and consumer (application) to access the queue in parallel. + * This requires some synchronization. + * This class allocates a double buffer cache, that is, a cache of twice the caching factor. + * The prefetch is invoked when the cache is half-filled, instead of waiting for it to be empty, + * and returns n rows, where n is the caching factor (half the size of the buffer). 
+ */ +@InterfaceAudience.Private +public class ClientAsyncPrefetchScanner extends ClientScanner { + + // exception queue (from prefetch to main scan execution) + private Queue exceptionsQueue; + // prefetch runnable object to be executed asynchronously + private PrefetchRunnable prefetchRunnable; + // Boolean flag to ensure only a single prefetch is running (per scan) + // eshcar: we use atomic boolean to allow multiple concurrent threads to + // consume records from the same cache, but still have a single prefetcher thread. + // For a single consumer thread this can be replaced with a native boolean. + private AtomicBoolean prefetchRunning; + // an attribute for synchronizing close between scanner and prefetch threads + private AtomicLong closingThreadId; + private static final int NO_THREAD = -1; + + public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, + int replicaCallTimeoutMicroSecondScan) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, + replicaCallTimeoutMicroSecondScan); + } + + @Override + protected void initCache() { + // concurrent cache + cache = new LinkedBlockingQueue(getCacheCapacity()); + exceptionsQueue = new ConcurrentLinkedQueue(); + prefetchRunnable = new PrefetchRunnable(); + prefetchRunning = new AtomicBoolean(false); + closingThreadId = new AtomicLong(NO_THREAD); + } + + @Override + public Result next() throws IOException { + + try { + handleException(); + + // If the scanner is closed and there's nothing left in the cache, next is a no-op. 
+ if (getCacheSize() == 0 && this.closed) { + return null; + } + if (getCacheSize() < getThresholdSize()) { + // run prefetch in the background only if no prefetch is already running + if (!isPrefetchRunning()) { + if (prefetchRunning.compareAndSet(false, true)) { + getPool().execute(prefetchRunnable); + } + } + } + + while (isPrefetchRunning()) { + // prefetch running or still pending + if (getCacheSize() > 0) { + return cache.poll(); + } else { + // (busy) wait for a record - sleep + Threads.sleep(1); + } + } + + if (getCacheSize() > 0) { + return cache.poll(); + } + + // if we exhausted this scanner before calling close, write out the scan metrics + writeScanMetrics(); + return null; + } finally { + handleException(); + } + } + + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + closed = true; + if (!isPrefetchRunning()) { + if(closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { + super.close(); + } + } // else do nothing since the async prefetch still needs these resources + } + + private void handleException() throws IOException { + //The prefetch task running in the background puts any exception it + //catches into this exception queue. + // Rethrow the exception so the application can handle it. 
+ while (!exceptionsQueue.isEmpty()) { + Exception first = exceptionsQueue.peek(); + first.printStackTrace(); + if (first instanceof IOException) { + throw (IOException) first; + } + throw (RuntimeException) first; + } + } + + private boolean isPrefetchRunning() { + return prefetchRunning.get(); + } + + // double buffer - double cache size + private int getCacheCapacity() { + int capacity = Integer.MAX_VALUE; + if(this.caching >= 0 && this.caching < (Integer.MAX_VALUE /2)) { + capacity = this.caching * 2 + 1; + } + return capacity; + } + + private int getThresholdSize() { + return getCacheCapacity() / 2 ; + } + + private class PrefetchRunnable implements Runnable { + + @Override + public void run() { + try { + loadCache(); + } catch (Exception e) { + exceptionsQueue.add(e); + } finally { + prefetchRunning.set(false); + if(closed) { + if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { + // close was waiting for the prefetch to end + close(); + } + } + } + } + + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 6a129c7..9f47a1f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,26 +17,11 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import 
org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -45,7 +30,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.*; +import java.util.concurrent.ExecutorService; /** * Implements the scanner interface for the HBase client. @@ -53,8 +41,8 @@ import com.google.common.annotations.VisibleForTesting; * through them all. */ @InterfaceAudience.Private -public class ClientScanner extends AbstractClientScanner { - private final Log LOG = LogFactory.getLog(this.getClass()); +public abstract class ClientScanner extends AbstractClientScanner { + protected final Log LOG = LogFactory.getLog(this.getClass()); // A byte array in which all elements are the max byte, and it is used to // construct closest front row static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); @@ -64,7 +52,7 @@ public class ClientScanner extends AbstractClientScanner { // wonky: e.g. if it splits on us. protected HRegionInfo currentRegion = null; protected ScannerCallableWithReplicas callable = null; - protected final LinkedList cache = new LinkedList(); + protected Queue cache; /** * A list of partial results that have been returned from the server. 
This list should only * contain results if this scanner does not have enough partial results to form the complete @@ -151,9 +139,12 @@ public class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory = controllerFactory; this.conf = conf; + initCache(); initializeScannerInConstruction(); } + protected abstract void initCache(); + protected void initializeScannerInConstruction() throws IOException{ // initialize the scanner nextScanner(this.caching, false); @@ -334,7 +325,7 @@ public class ClientScanner extends AbstractClientScanner { * * By default, scan metrics are disabled; if the application wants to collect them, this * behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} - * + * *

This invocation clears the scan metrics. Metrics are aggregated in the Scan instance. */ protected void writeScanMetrics() { @@ -346,8 +337,11 @@ public class ClientScanner extends AbstractClientScanner { scanMetricsPublished = true; } - @Override - public Result next() throws IOException { + protected void initSyncCache() { + cache = new LinkedList(); + } + + protected Result nextWithSyncCache() throws IOException { // If the scanner is closed and there's nothing left in the cache, next is a no-op. if (cache.size() == 0 && this.closed) { return null; @@ -374,6 +368,8 @@ public class ClientScanner extends AbstractClientScanner { * Contact the servers to load more {@link Result}s in the cache. */ protected void loadCache() throws IOException { + // check if scanner was closed during previous prefetch + if (closed) return; Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java new file mode 100644 index 0000000..3998ac0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; + +/** + * ClientSimpleScanner implements a sync scanner behaviour. + * The cache is a simple list. + * The prefetch is invoked only when the application finished processing the entire cache. + */ +@InterfaceAudience.Private +public class ClientSimpleScanner extends ClientScanner { + + public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, + int replicaCallTimeoutMicroSecondScan) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, + replicaCallTimeoutMicroSecondScan); + } + + @Override + protected void initCache() { + initSyncCache(); + } + + @Override + public Result next() throws IOException { + return nextWithSyncCache(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 77321b0..7a5b4f0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -50,7 +50,7 @@ import com.google.protobuf.ServiceException; * For small scan, it will get better performance than {@link ClientScanner} */ @InterfaceAudience.Private -public class ClientSmallScanner extends ClientScanner { +public class ClientSmallScanner extends 
ClientSimpleScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; private SmallScannerCallableFactory callableFactory; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 434e32f..a8c518c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -616,6 +616,11 @@ public class HTable implements HTableInterface { scan.setCaching(scannerCaching); } + Boolean async = scan.isAsyncPrefetch(); + if (async == null) { + async = tableConfiguration.isClientScannerAsyncPrefetch(); + } + if (scan.isReversed()) { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), @@ -633,9 +638,15 @@ public class HTable implements HTableInterface { this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { - return new ClientScanner(getConfiguration(), scan, getName(), this.connection, - this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + if (async) { + return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory, + pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } else { + return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory, + pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index ef4b89d..dde82ba 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.ExceptionUtil; * A reversed client scanner which support backward scanning */ @InterfaceAudience.Private -public class ReversedClientScanner extends ClientScanner { +public class ReversedClientScanner extends ClientSimpleScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 3b6194f..0b51150 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -145,8 +145,23 @@ public class Scan extends Query { private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; + private Boolean asyncPrefetch = null; /** + * Parameter name for client scanner sync/async prefetch toggle. + * When using async scanner, prefetching data from the server is done at the background. + * The parameter currently won't have any effect in the case that the user has set + * Scan#setSmall or Scan#setReversed + */ + public static final String HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = + "hbase.client.scanner.async.prefetch"; + + /** + * Default value of {@link #HBASE_CLIENT_SCANNER_ASYNC_PREFETCH}. 
+ */ + public static final boolean DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = false; + + /** * Set it true for small scan to get better performance * * Small scan should use pread and big scan can use seek + read @@ -255,6 +270,7 @@ public class Scan extends Query { this.tr = get.getTimeRange(); this.familyMap = get.getFamilyMap(); this.getScan = true; + this.asyncPrefetch = false; this.consistency = get.getConsistency(); for (Map.Entry attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); @@ -971,4 +987,16 @@ public class Scan extends Query { if (bytes == null) return null; return ProtobufUtil.toScanMetrics(bytes); } -} \ No newline at end of file + + + public Boolean isAsyncPrefetch() { + return asyncPrefetch; + } + + public Scan setAsyncPrefetch(boolean asyncPrefetch) { + this.asyncPrefetch = asyncPrefetch; + return this; + } + + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 70ad179..7a702aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -42,7 +42,10 @@ public class TableConfiguration { private final int retries; private final int maxKeyValueSize; - /** + // toggle for async/sync prefetch + private final boolean clientScannerAsyncPrefetch; + + /** * Constructor * @param conf Configuration object */ @@ -68,6 +71,9 @@ public class TableConfiguration { this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.clientScannerAsyncPrefetch = conf.getBoolean( + Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); + this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); } @@ -85,6 +91,7 @@ public class TableConfiguration { 
this.primaryCallTimeoutMicroSecond = 10000; this.replicaCallTimeoutMicroSecondScan = 1000000; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; + this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; } @@ -119,4 +126,9 @@ public class TableConfiguration { public int getMaxKeyValueSize() { return maxKeyValueSize; } -} + + public boolean isClientScannerAsyncPrefetch() { + return clientScannerAsyncPrefetch; + } + +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index a91def3..3f406df 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -121,6 +122,15 @@ public class TestClientScanner { public void setRpcFinished(boolean rpcFinished) { this.rpcFinished = rpcFinished; } + + @Override + protected void initCache() { + initSyncCache(); + } + + @Override public Result next() throws IOException { + return nextWithSyncCache(); + } } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java index 4611d08..082090e 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java @@ -17,23 +17,9 @@ */ package org.apache.hadoop.hbase.client; -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -46,6 +32,16 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the ClientSmallReversedScanner. 
*/ @@ -178,7 +174,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); - List results = csrs.cache; + Queue results = csrs.cache; Iterator iter = results.iterator(); assertEquals(3, results.size()); for (int i = 3; i >= 1 && iter.hasNext(); i--) { @@ -248,7 +244,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); - List results = csrs.cache; + Queue results = csrs.cache; Iterator iter = results.iterator(); assertEquals(2, results.size()); for (int i = 3; i >= 2 && iter.hasNext(); i--) { @@ -264,7 +260,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); assertEquals(1, results.size()); - Result result = results.get(0); + Result result = results.peek(); assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java index 90bf4bb..318fbe7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java @@ -17,22 +17,9 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import 
org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -45,6 +32,15 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the ClientSmallScanner. */ @@ -176,10 +172,10 @@ public class TestClientSmallScanner { css.loadCache(); - List results = css.cache; + Queue results = css.cache; assertEquals(3, results.size()); for (int i = 1; i <= 3; i++) { - Result result = results.get(i - 1); + Result result = results.poll(); byte[] row = result.getRow(); assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); @@ -243,10 +239,10 @@ public class TestClientSmallScanner { css.loadCache(); - List results = css.cache; + Queue results = css.cache; assertEquals(2, results.size()); for (int i = 1; i <= 2; i++) { - Result result = results.get(i - 1); + Result result = results.poll(); byte[] row = result.getRow(); assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); @@ -258,7 +254,7 @@ public class TestClientSmallScanner { css.loadCache(); assertEquals(1, results.size()); - Result result = results.get(0); + Result result = results.peek(); assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); assertTrue(css.closed); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 1e93933..c23a638 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -592,6 +592,55 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); } + /** + * Test from client side for async scan + * + * @throws Exception + */ + @Test + public void testAsyncScanner() throws Exception { + byte [] TABLE = Bytes.toBytes("testAsyncScan"); + byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); + byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Put put; + Scan scan; + Result result; + boolean toLog = true; + List kvListExp, kvListScan; + + kvListExp = new ArrayList(); + + for (int r=0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c=0; c < FAMILIES.length; c++) { + for (int q=0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + put.add(kv); + kvListExp.add(kv); + } + } + ht.put(put); + } + + scan = new Scan(); + scan.setAsyncPrefetch(true); + ResultScanner scanner = ht.getScanner(scan); + kvListScan = new ArrayList(); + while ((result = scanner.next()) != null) { + for (Cell kv : result.listCells()) { + kvListScan.add(kv); + } + } + result = Result.create(kvListScan); + assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); + verifyResult(result, kvListExp, toLog, "Testing async scan"); + + } + static void verifyResult(Result result, List expKvList, boolean toLog, String msg) { -- 1.7.10.2 (Apple Git-33)