From 033a614664ef69d2d51e419369d66d054095f8fb Mon Sep 17 00:00:00 2001
From: "han.congcong"
Date: Fri, 21 Dec 2018 16:44:01 +0800
Subject: [PATCH] ClientAsyncPrefetchScanner improvement

---
 .../client/ClientAsyncPrefetchScanner.java    | 141 +++++++++++-------
 .../apache/hadoop/hbase/client/HTable.java    |   3 +-
 .../org/apache/hadoop/hbase/client/Scan.java  |  47 ++++++
 .../DifferentScanThreadException.java         |  31 ++++
 .../client/TestScannersFromClientSide.java    |  41 +++++-
 5 files changed, 205 insertions(+), 58 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index e5af8717a8..d51af41214 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -17,28 +17,28 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.exceptions.DifferentScanThreadException;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * ClientAsyncPrefetchScanner implements async scanner behaviour.
@@ -52,26 +52,48 @@ import org.apache.hadoop.hbase.util.Threads;
  */
 @InterfaceAudience.Private
 public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
+  private static final String BLOCKER = "block by async scanner";
 
   private long maxCacheSize;
   private AtomicLong cacheSizeInBytes;
   // exception queue (from prefetch to main scan execution)
   private Queue<Exception> exceptionsQueue;
-  // prefetch thread to be executed asynchronously
-  private Thread prefetcher;
+  // prefetch pool to execute prefetchRunnable asynchronously
+  private ExecutorService prefetchPool;
+  // when to prefetch result
+  private BiFunction<Long, Long, Boolean> prefetchCondition;
   // used for testing
   private Consumer<Boolean> prefetchListener;
-
-  private final Lock lock = new ReentrantLock();
-  private final Condition notEmpty = lock.newCondition();
-  private final Condition notFull = lock.newCondition();
+  // fetch is running or not
+  private AtomicBoolean fetching = new AtomicBoolean(false);
+  // a prefetchRunnable is added to pool or not
+  private volatile boolean added = false;
+  //cache the first scan thread to avoid a thread block forever
+  private AtomicReference<Thread> scanThread = new AtomicReference<>();
 
   public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
       RpcControllerFactory rpcControllerFactory, ExecutorService pool,
-      int replicaCallTimeoutMicroSecondScan) throws IOException {
+      int replicaCallTimeoutMicroSecondScan,ExecutorService prefetchPool,
+      BiFunction<Long, Long, Boolean> preFetchCondition) throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
         replicaCallTimeoutMicroSecondScan);
+
+    if(prefetchPool == null){
+      this.prefetchPool = getPool();
+    }else{
+      this.prefetchPool = prefetchPool;
+    }
+
+    if(preFetchCondition == null){
+      this.prefetchCondition = (maxCacheSize,currentCacheSize)->this.prefetchCondition();
+    }else{
+      this.prefetchCondition = preFetchCondition;
+    }
+
+    // flag first: a task that finishes instantly must not be overwritten by a late "true"
+    added = true;
+    this.prefetchPool.submit(new PrefetchRunnable());
   }
 
   @VisibleForTesting
@@ -86,8 +108,6 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
     cache = new LinkedBlockingQueue<>();
     cacheSizeInBytes = new AtomicLong(0);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
-    prefetcher = new Thread(new PrefetchRunnable());
-    Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -97,41 +117,42 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
   @Override
   public Result next() throws IOException {
-    try {
-      lock.lock();
+    if(!scanThread.compareAndSet(null,Thread.currentThread()) && scanThread.get() != Thread.currentThread()){
+      throw new DifferentScanThreadException("not same thread");
+    }
+    handleException();
+    boolean acquire = false;
+    try{
       while (cache.isEmpty()) {
-        handleException();
         if (this.closed) {
           return null;
         }
-        try {
-          notEmpty.await();
-        } catch (InterruptedException e) {
-          throw new InterruptedIOException("Interrupted when wait to load cache");
+        // do not park on a flag this thread holds itself; keep loading instead
+        if (!acquire && fetching.get()) {
+          LockSupport.park(BLOCKER);
+          // park() also returns on interrupt without clearing it; fail instead of spinning
+          if (Thread.interrupted()) {
+            throw new InterruptedIOException("Interrupted when wait to load cache");
+          }
+        } else if (acquire || fetching.compareAndSet(false,true)) {
+          acquire = true;
+          loadCache();
         }
       }
 
-      Result result = pollCache();
-      if (prefetchCondition()) {
-        notFull.signalAll();
+      Result result = this.pollCache();
+
+      if(this.prefetchCondition.apply(maxCacheSize,cacheSizeInBytes.get())){
+        if(!added){
+          added=true;
+          this.prefetchPool.submit(new PrefetchRunnable());
+        }
       }
       return result;
-    } finally {
-      lock.unlock();
-      handleException();
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      lock.lock();
-      super.close();
-      closed = true;
-      notFull.signalAll();
-      notEmpty.signalAll();
-    } finally {
-      lock.unlock();
+    }finally {
+      if(acquire){
+        fetching.compareAndSet(true,false);
+      }
     }
   }
 
@@ -168,24 +189,34 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
     @Override
     public void run() {
-      while (!closed) {
-        boolean succeed = false;
-        try {
-          lock.lock();
-          while (!prefetchCondition()) {
-            notFull.await();
-          }
-          loadCache();
-          succeed = true;
-        } catch (Exception e) {
-          exceptionsQueue.add(e);
-        } finally {
-          notEmpty.signalAll();
-          lock.unlock();
-          if (prefetchListener != null) {
-            prefetchListener.accept(succeed);
+      boolean acquire = false;
+      boolean succeed = false;
+      try {
+        if (fetching.compareAndSet(false, true)) {
+          acquire = true;
+          while (!closed && prefetchCondition.apply(maxCacheSize, cacheSizeInBytes.get())) {
+            loadCache();
+            succeed = true;
+            if(scanThread.get() != null) {
+              LockSupport.unpark(scanThread.get());
+            }
           }
         }
+      }catch (Exception e){
+        exceptionsQueue.add(e);
+      }finally {
+        if(acquire){
+          fetching.compareAndSet(true,false);
+        }
+        // this task is done: let next() schedule a new one when the cache drains again
+        added = false;
+        if(scanThread.get() != null){
+          LockSupport.unpark(scanThread.get());
+        }
+        // report every fetch attempt to the test hook, not only failed ones
+        if(acquire && prefetchListener != null){
+          prefetchListener.accept(succeed);
+        }
       }
     }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fb69a2530b..70d14d03a1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -323,7 +323,8 @@ public class HTable implements Table {
       if (async) {
         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
             this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
+            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan(),
+            scan.getPrefetchExecutor(),scan.getPrefetchCondition());
       } else {
         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
             this.rpcCallerFactory, this.rpcControllerFactory,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index d4aff047a3..e9f539a375 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -145,6 +147,15 @@ public class Scan extends Query {
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
   private Boolean asyncPrefetch = null;
+  /**
+   * prefetch executor
+   */
+  private ExecutorService prefetchExecutor;
+
+  /**
+   * function that tells prefetch thread when to start fetching
+   */
+  private BiFunction<Long, Long, Boolean> prefetchCondition;
 
   /**
    * Parameter name for client scanner sync/async prefetch toggle.
@@ -281,6 +292,8 @@ public class Scan extends Query {
     setPriority(scan.getPriority());
     readType = scan.getReadType();
     super.setReplicaId(scan.getReplicaId());
+    this.prefetchCondition = scan.getPrefetchCondition();
+    this.prefetchExecutor = scan.getPrefetchExecutor();
   }
 
   /**
@@ -1146,6 +1159,40 @@ public class Scan extends Query {
     return this;
   }
 
+  /**
+   * @return the user supplied prefetch condition, or null if none was set
+   */
+  public BiFunction<Long, Long, Boolean> getPrefetchCondition() {
+    return prefetchCondition;
+  }
+
+  /**
+   * Set the condition that tells the async prefetch scanner when to fetch more results.
+   * @param prefetchCondition applied to (max cache size, current cache size)
+   * @return this
+   */
+  public Scan setPrefetchCondition(BiFunction<Long, Long, Boolean> prefetchCondition) {
+    this.prefetchCondition = prefetchCondition;
+    return this;
+  }
+
+  /**
+   * @return the executor used for async prefetch, or null if none was set
+   */
+  public ExecutorService getPrefetchExecutor() {
+    return prefetchExecutor;
+  }
+
+  /**
+   * Set the executor the async prefetch scanner submits its prefetch task to.
+   * @param prefetchExecutor executor to run prefetch on
+   * @return this
+   */
+  public Scan setPrefetchExecutor(ExecutorService prefetchExecutor) {
+    this.prefetchExecutor = prefetchExecutor;
+    return this;
+  }
+
   /**
    * @return the limit of rows for this scan
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java
new file mode 100644
index 0000000000..6f786e13f0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thrown when a thread other than the first one that called next() uses the same
+ * {@link org.apache.hadoop.hbase.client.ClientAsyncPrefetchScanner}.
+ */
+@InterfaceAudience.Public
+public class DifferentScanThreadException extends RuntimeException{
+  public DifferentScanThreadException(String message){
+    super(message);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index b91e205d42..af605055dc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
@@ -785,11 +787,48 @@ public class TestScannersFromClientSide {
       puts.clear();
     }
 
+    Scan scanWithDefault = new Scan();
+    scanWithDefault.setAsyncPrefetch(true);
+    if (caching > 0) {
+      scanWithDefault.setCaching(caching);
+    }
+    doTestAsyncScanner(ht,scanWithDefault,listener,rowNumber,kvListExp,toLog);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(1);
+    Scan scanWithThreadPool = new Scan();
+    scanWithThreadPool.setAsyncPrefetch(true);
+    scanWithThreadPool.setPrefetchExecutor(executorService);
+    if (caching > 0) {
+      scanWithThreadPool.setCaching(caching);
+    }
+    doTestAsyncScanner(ht,scanWithThreadPool,listener,rowNumber,kvListExp,toLog);
+
+    Scan scanWithPrefetchCondition = new Scan();
+    scanWithPrefetchCondition.setAsyncPrefetch(true);
+    scanWithPrefetchCondition.setPrefetchCondition((maxSize,size) -> size < maxSize/2);
+    if (caching > 0) {
+      scanWithPrefetchCondition.setCaching(caching);
+    }
+    doTestAsyncScanner(ht,scanWithPrefetchCondition,listener,rowNumber,kvListExp,toLog);
+
     Scan scan = new Scan();
     scan.setAsyncPrefetch(true);
+    scan.setPrefetchCondition((maxSize,size) -> size < maxSize/2);
+    scan.setPrefetchExecutor(executorService);
     if (caching > 0) {
       scan.setCaching(caching);
     }
+    doTestAsyncScanner(ht,scan,listener,rowNumber,kvListExp,toLog);
+
+    executorService.shutdown();
+
+    TEST_UTIL.deleteTable(table);
+  }
+
+  /** Runs the given async scan against ht and verifies the scanned cells. */
+  private void doTestAsyncScanner(Table ht, Scan scan, Consumer<Boolean> listener,
+      int rowNumber, List<Cell> kvListExp, boolean toLog)
+      throws IOException, InterruptedException {
     try (ResultScanner scanner = ht.getScanner(scan)) {
       assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
       ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
@@ -814,8 +853,6 @@ public class TestScannersFromClientSide {
       result = Result.create(kvListScan);
       verifyResult(result, kvListExp, toLog, "Testing async scan");
     }
-
-    TEST_UTIL.deleteTable(table);
   }
 
   private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
-- 
2.18.0