From 14e860c847a14ef6d0fb33e100dee0a9520a1716 Mon Sep 17 00:00:00 2001 From: "han.congcong" Date: Sat, 22 Dec 2018 12:34:19 +0800 Subject: [PATCH] async client improvement --- .../client/ClientAsyncPrefetchScanner.java | 134 ++++++++++-------- .../apache/hadoop/hbase/client/HTable.java | 3 +- .../org/apache/hadoop/hbase/client/Scan.java | 33 +++++ .../DifferentScanThreadException.java | 31 ++++ .../client/TestScannersFromClientSide.java | 59 +++++++- 5 files changed, 200 insertions(+), 60 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index e5af8717a8..3f5234c628 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -17,28 +17,27 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.exceptions.DifferentScanThreadException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import java.util.function.BiFunction; import java.util.function.Consumer; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.util.Threads; /** * ClientAsyncPrefetchScanner implements async scanner behaviour. @@ -57,21 +56,42 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private AtomicLong cacheSizeInBytes; // exception queue (from prefetch to main scan execution) private Queue exceptionsQueue; - // prefetch thread to be executed asynchronously - private Thread prefetcher; + // prefetch pool to execute prefetchRunnable asynchronously + private ExecutorService prefetchPool; + // when to prefetch result + private BiFunction prefetchCondition; // used for testing private Consumer prefetchListener; - - private final Lock lock = new ReentrantLock(); - private final Condition notEmpty = lock.newCondition(); - private final Condition notFull = lock.newCondition(); + // fetch is running or not + private AtomicBoolean fetching = new AtomicBoolean(false); + // a prefetchRunnable is added to pool or not + private volatile boolean added = false; + //cache the first scan thread to avoid a thread block forever + private AtomicReference scanThread = new AtomicReference<>(); public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, - int replicaCallTimeoutMicroSecondScan) throws IOException { + int replicaCallTimeoutMicroSecondScan,ExecutorService prefetchPool, + BiFunction prefetchCondition) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, replicaCallTimeoutMicroSecondScan); + + if(prefetchPool == null){ + this.prefetchPool = getPool(); + }else{ + this.prefetchPool = prefetchPool; + } + + if(prefetchCondition == null){ + 
this.prefetchCondition = (maxCacheSize,currentCacheSize)->this.prefetchCondition(); + }else{ + this.prefetchCondition = prefetchCondition; + } + if(this.prefetchCondition.apply(maxCacheSize,cacheSizeInBytes.get())) { + this.prefetchPool.submit(new PrefetchRunnable()); + added = true; + } } @VisibleForTesting @@ -86,8 +106,6 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { cache = new LinkedBlockingQueue<>(); cacheSizeInBytes = new AtomicLong(0); exceptionsQueue = new ConcurrentLinkedQueue<>(); - prefetcher = new Thread(new PrefetchRunnable()); - Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher"); } private long resultSize2CacheSize(long maxResultSize) { @@ -97,41 +115,38 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { @Override public Result next() throws IOException { - try { - lock.lock(); + if(!scanThread.compareAndSet(null,Thread.currentThread()) + && scanThread.get() != Thread.currentThread()){ + throw new DifferentScanThreadException("not same thread"); + } + handleException(); + boolean acquire = false; + try{ while (cache.isEmpty()) { - handleException(); if (this.closed) { return null; } - try { - notEmpty.await(); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted when wait to load cache"); + if (fetching.get()) { + LockSupport.park(this); + } else if (fetching.compareAndSet(false,true)) { + acquire = true; + loadCache(); } } - Result result = pollCache(); - if (prefetchCondition()) { - notFull.signalAll(); + Result result = this.pollCache(); + + if(this.prefetchCondition.apply(maxCacheSize,cacheSizeInBytes.get())){ + if(!added){ + added=true; + this.prefetchPool.submit(new PrefetchRunnable()); + } } return result; - } finally { - lock.unlock(); - handleException(); - } - } - - @Override - public void close() { - try { - lock.lock(); - super.close(); - closed = true; - notFull.signalAll(); - notEmpty.signalAll(); - } finally { - lock.unlock(); + 
}finally { + if(acquire){ + fetching.compareAndSet(true,false); + } } } @@ -168,24 +183,31 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { @Override public void run() { - while (!closed) { - boolean succeed = false; - try { - lock.lock(); - while (!prefetchCondition()) { - notFull.await(); - } - loadCache(); - succeed = true; - } catch (Exception e) { - exceptionsQueue.add(e); - } finally { - notEmpty.signalAll(); - lock.unlock(); - if (prefetchListener != null) { - prefetchListener.accept(succeed); + boolean acquire = false; + boolean succeed = false; + try { + if (fetching.compareAndSet(false, true)) { + acquire = true; + while (!closed && prefetchCondition.apply(maxCacheSize, cacheSizeInBytes.get())) { + loadCache(); + succeed = true; + if(scanThread.get() != null) { + LockSupport.unpark(scanThread.get()); + } } } + }catch (Exception e){ + exceptionsQueue.add(e); + if(prefetchListener != null){ + prefetchListener.accept(succeed); + } + }finally { + if(acquire){ + fetching.compareAndSet(true,false); + } + if(scanThread.get() != null){ + LockSupport.unpark(scanThread.get()); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fb69a2530b..70d14d03a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -323,7 +323,8 @@ public class HTable implements Table { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan(), + scan.getPrefetchExecutor(),scan.getPrefetchCondition()); } else { return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, 
this.rpcControllerFactory, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index d4aff047a3..e9f539a375 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.NavigableSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; @@ -145,6 +147,15 @@ public class Scan extends Query { private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean asyncPrefetch = null; + /** + * prefetch executor + */ + private ExecutorService prefetchExecutor; + + /** + * function that tells prefetch thread when to start fetching + */ + private BiFunction prefetchCondition; /** * Parameter name for client scanner sync/async prefetch toggle. 
@@ -281,6 +292,8 @@ public class Scan extends Query { setPriority(scan.getPriority()); readType = scan.getReadType(); super.setReplicaId(scan.getReplicaId()); + this.prefetchCondition = scan.getPrefetchCondition(); + this.prefetchExecutor = scan.getPrefetchExecutor(); } /** @@ -1146,6 +1159,24 @@ public class Scan extends Query { return this; } + public BiFunction getPrefetchCondition() { + return prefetchCondition; + } + + public Scan setPrefetchCondition(BiFunction prefetchCondition) { + this.prefetchCondition = prefetchCondition; + return this; + } + + public ExecutorService getPrefetchExecutor() { + return prefetchExecutor; + } + + public Scan setPrefetchExecutor(ExecutorService prefetchExecutor) { + this.prefetchExecutor = prefetchExecutor; + return this; + } + /** * @return the limit of rows for this scan */ @@ -1249,6 +1280,8 @@ public class Scan extends Query { return needCursorResult; } + + /** * Create a new Scan with a cursor. It only set the position information like start row key. * The others (like cfs, stop row, limit) should still be filled in by the user. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java new file mode 100644 index 0000000000..0ced801382 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/DifferentScanThreadException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.exceptions; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Thrown when more than one thread calls {@code next()} on the same async prefetch scanner. + */ +@InterfaceAudience.Public +public class DifferentScanThreadException extends RuntimeException{ + public DifferentScanThreadException(String message){ + super(message); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index af0248215c..4d1d110eb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -31,6 +31,8 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.IntStream; @@ -786,11 +788,64 @@ public class TestScannersFromClientSide { puts.clear(); } + Scan scanWithDefault = new Scan(); + scanWithDefault.setAsyncPrefetch(true); + if (caching > 0) { + scanWithDefault.setCaching(caching); + } + doTestAsyncScanner(ht,scanWithDefault,listener,rowNumber,kvListExp,toLog); + + + ExecutorService executorService = Executors.newFixedThreadPool(1); + Scan scanWithThreadPool = new Scan(); + 
scanWithThreadPool.setAsyncPrefetch(true); + scanWithThreadPool.setPrefetchExecutor(executorService); + if (caching > 0) { + scanWithThreadPool.setCaching(caching); + } + doTestAsyncScanner(ht,scanWithThreadPool,listener,rowNumber,kvListExp,toLog); + + Scan scanWithPrefetchCondition1 = new Scan(); + scanWithPrefetchCondition1.setAsyncPrefetch(true); + scanWithPrefetchCondition1.setPrefetchCondition((maxSize,size) -> size < maxSize/2); + if (caching > 0) { + scanWithPrefetchCondition1.setCaching(caching); + } + doTestAsyncScanner(ht,scanWithPrefetchCondition1,listener,rowNumber,kvListExp,toLog); + + Scan scanWithPrefetchCondition2 = new Scan(); + scanWithPrefetchCondition2.setAsyncPrefetch(true); + scanWithPrefetchCondition2.setPrefetchCondition((maxSize,size) -> true); + if (caching > 0) { + scanWithPrefetchCondition2.setCaching(caching); + } + doTestAsyncScanner(ht,scanWithPrefetchCondition2,listener,rowNumber,kvListExp,toLog); + + Scan scanWithPrefetchCondition3 = new Scan(); + scanWithPrefetchCondition3.setAsyncPrefetch(true); + scanWithPrefetchCondition3.setPrefetchCondition((maxSize,size) -> false); + if (caching > 0) { + scanWithPrefetchCondition3.setCaching(caching); + } + doTestAsyncScanner(ht,scanWithPrefetchCondition3,listener,rowNumber,kvListExp,toLog); + Scan scan = new Scan(); scan.setAsyncPrefetch(true); + scan.setPrefetchCondition((maxSize,size) -> size < maxSize/2); + scan.setPrefetchExecutor(executorService); if (caching > 0) { - scan.setCaching(caching); + scan.setCaching(caching); + } + doTestAsyncScanner(ht,scan,listener,rowNumber,kvListExp,toLog); + + executorService.shutdown(); + + TEST_UTIL.deleteTable(table); + } + + private void doTestAsyncScanner(Table ht,Scan scan,Consumer listener,int rowNumber, + List kvListExp,boolean toLog) + throws IOException, InterruptedException { try (ResultScanner scanner = ht.getScanner(scan)) { assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); ((ClientAsyncPrefetchScanner) 
scanner).setPrefetchListener(listener); @@ -815,8 +870,6 @@ public class TestScannersFromClientSide { result = Result.create(kvListScan); verifyResult(result, kvListExp, toLog, "Testing async scan"); } - - TEST_UTIL.deleteTable(table); } private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { -- 2.18.0