From 05bee3f7fddeda5613a51312d1bfa40f645add81 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 8 Feb 2017 17:30:25 +0800 Subject: [PATCH] HBASE-17608 Add suspend support for RawScanResultConsumer --- .../hadoop/hbase/client/AsyncClientScanner.java | 7 +- .../client/AsyncRpcRetryingCallerFactory.java | 21 +- .../AsyncScanSingleRegionRpcRetryingCaller.java | 211 +++++++++++++++++---- .../hbase/client/AsyncTableResultScanner.java | 106 ++++------- .../hadoop/hbase/client/RawAsyncTableImpl.java | 3 +- .../hadoop/hbase/client/RawScanResultConsumer.java | 49 ++++- .../java/org/apache/hadoop/hbase/util/Threads.java | 2 +- .../client/coprocessor/AsyncAggregationClient.java | 55 +++--- .../hbase/client/TestAsyncTableScanRenewLease.java | 150 +++++++++++++++ .../hadoop/hbase/client/TestAsyncTableScanner.java | 1 - .../TestAsyncTableScannerCloseWhileSuspending.java | 105 ++++++++++ .../hadoop/hbase/client/TestRawAsyncTableScan.java | 3 +- 12 files changed, 554 insertions(+), 159 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index b9fd34f..2215d36 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -130,11 +130,12 @@ class AsyncClientScanner { private void startScan(OpenScannerResponse resp) { conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) - .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache) + .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) + 
.setScan(scan).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp) - .whenComplete((hasMore, error) -> { + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .start(resp.controller, resp.resp).whenComplete((hasMore, error) -> { if (error != null) { consumer.onError(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 6bc2cc1..dabb8e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -154,6 +154,8 @@ class AsyncRpcRetryingCallerFactory { private HRegionLocation loc; + private long scannerLeaseTimeoutPeriodNs; + private long scanTimeoutNs; private long rpcTimeoutNs; @@ -188,6 +190,12 @@ class AsyncRpcRetryingCallerFactory { return this; } + public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, + TimeUnit unit) { + this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); + return this; + } + public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { this.scanTimeoutNs = unit.toNanos(scanTimeout); return this; @@ -219,8 +227,8 @@ class AsyncRpcRetryingCallerFactory { checkNotNull(scan, "scan is null"), scannerId, checkNotNull(resultCache, "resultCache is null"), checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, 
pauseNs, maxAttempts, + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -305,7 +313,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs = -1L; - public MasterRequestCallerBuilder action(AsyncMasterRequestRpcRetryingCaller.Callable callable) { + public MasterRequestCallerBuilder action( + AsyncMasterRequestRpcRetryingCaller.Callable callable) { this.callable = callable; return this; } @@ -336,9 +345,9 @@ class AsyncRpcRetryingCallerFactory { } public AsyncMasterRequestRpcRetryingCaller build() { - return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, checkNotNull(callable, - "action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, + checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 3ef4a6f..788123f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -17,13 +17,17 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; +import 
com.google.common.base.Preconditions; + import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; import java.io.IOException; import java.util.ArrayList; @@ -39,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RawScanResultConsumer.Resumer; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -76,6 +81,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final HRegionLocation loc; + private final long scannerLeaseTimeoutPeriodNs; + private final long pauseNs; private final int maxAttempts; @@ -104,10 +111,125 @@ class AsyncScanSingleRegionRpcRetryingCaller { private long nextCallSeq = -1L; + private enum StopperState { + INITIALIZED, SUSPENDED, TERMINATED, DESTROYED + } + + private final class StopperImpl implements RawScanResultConsumer.Stopper { + + // Make sure the methods are only called in this thread. + private final Thread callerThread = Thread.currentThread(); + + // INITIALIZED -> SUSPENDED -> DESTROYED + // INITIALIZED -> TERMINATED -> DESTROYED + // INITIALIZED -> DESTROYED + // If the state is incorrect we will throw IllegalStateException. 
+ private StopperState state = StopperState.INITIALIZED; + + private ResumerImpl resumer; + + private void preCheck() { + Preconditions.checkState(Thread.currentThread() == callerThread, + "should not call this method outside onNext or onHeartbeat"); + Preconditions.checkState(state.equals(StopperState.INITIALIZED), "Invalid Stopper state %s", + state); + } + + @Override + public Resumer suspend() { + preCheck(); + state = StopperState.SUSPENDED; + ResumerImpl resumer = new ResumerImpl(); + this.resumer = resumer; + return resumer; + } + + @Override + public void terminate() { + preCheck(); + state = StopperState.TERMINATED; + } + + public StopperState destroy() { + StopperState state = this.state; + this.state = StopperState.DESTROYED; + return state; + } + } + + private enum ResumerState { + INITIALIZED, SUSPENDED, RESUMED + } + + private final class ResumerImpl implements RawScanResultConsumer.Resumer { + + // INITIALIZED -> SUSPENDED -> RESUMED + // INITIALIZED -> RESUMED + private ResumerState state = ResumerState.INITIALIZED; + + private ScanResponse resp; + + private int numValidResults; + + private Timeout leaseRenewer; + + @Override + public void resume() { + synchronized (this) { + if (state == ResumerState.INITIALIZED) { + // user calls this method before we actually suspend the scan, so just set the state to + // RESUMED, the implementation will just go on. 
+ state = ResumerState.RESUMED; + return; + } + if (state == ResumerState.RESUMED) { + return; + } + state = ResumerState.RESUMED; + if (leaseRenewer != null) { + leaseRenewer.cancel(); + } + } + completeOrNext(resp, numValidResults); + } + + private void scheduleRenewLeaseTask() { + leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, + TimeUnit.NANOSECONDS); + } + + private synchronized void tryRenewLease() { + // the scan has already been resumed, give up + if (state == ResumerState.RESUMED) { + return; + } + renewLease(); + // schedule the next renew lease task + scheduleRenewLeaseTask(); + } + + // return false means that the scan has already been resumed. Just go on. + public synchronized boolean suspend(ScanResponse resp, int numValidResults) { + if (state == ResumerState.RESUMED) { + // user calls resume before we actually suspend the scan, just continue; + return false; + } + state = ResumerState.SUSPENDED; + this.resp = resp; + this.numValidResults = numValidResults; + if (resp.getMoreResultsInRegion()) { + // schedule renew lease task + scheduleRenewLeaseTask(); + } + return true; + } + } + public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache, - RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs, - int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, + long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; this.scannerId = scannerId; @@ -115,6 +237,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.consumer = consumer; this.stub = stub; this.loc = loc; + this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; this.maxAttempts = 
maxAttempts; this.scanTimeoutNs = scanTimeoutNs; @@ -143,9 +266,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { - LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId - + " for " + loc.getRegionInfo().getEncodedName() + " of " - + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", + LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + + " for " + loc.getRegionInfo().getEncodedName() + " of " + + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", controller.getFailed()); } }); @@ -182,16 +305,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void onError(Throwable error) { error = translateException(error); if (tries > startLogErrorsCnt) { - LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " - + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() - + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() - + " ms", + LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() + + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + + " ms", error); } - boolean scannerClosed = - error instanceof UnknownScannerException || error instanceof NotServingRegionException - || error instanceof RegionServerStoppedException; + boolean scannerClosed = error instanceof UnknownScannerException || + error instanceof NotServingRegionException || error instanceof RegionServerStoppedException; 
RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), ""); @@ -248,6 +370,27 @@ class AsyncScanSingleRegionRpcRetryingCaller { } } + private void completeOrNext(ScanResponse resp, int numValidResults) { + if (resp.hasMoreResults() && !resp.getMoreResults()) { + // RS tells us there is no more data for the whole scan + completeNoMoreResults(); + return; + } + if (scan.getLimit() > 0) { + // The RS should have set the moreResults field in ScanResponse to false when we have reached + // the limit. + int limit = scan.getLimit() - numValidResults; + assert limit > 0; + scan.setLimit(limit); + } + // as in 2.0 this value will always be set + if (!resp.getMoreResultsInRegion()) { + completeWhenNoMoreResultsInRegion.run(); + return; + } + next(); + } + private void onComplete(HBaseRpcController controller, ScanResponse resp) { if (controller.failed()) { onError(controller.getFailed()); @@ -269,20 +412,16 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } - boolean stopByUser; + StopperImpl stopper = new StopperImpl(); if (results.length == 0) { // if we have nothing to return then this must be a heartbeat message. - stopByUser = !consumer.onHeartbeat(); + consumer.onHeartbeat(stopper); } else { updateNextStartRowWhenError(results[results.length - 1]); - stopByUser = !consumer.onNext(results); - } - if (resp.hasMoreResults() && !resp.getMoreResults()) { - // RS tells us there is no more data for the whole scan - completeNoMoreResults(); - return; + consumer.onNext(results, stopper); } - if (stopByUser) { + StopperState state = stopper.destroy(); + if (state == StopperState.TERMINATED) { if (resp.getMoreResultsInRegion()) { // we have more results in region but user request to stop the scan, so we need to close the // scanner explicitly. 
@@ -291,19 +430,12 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeNoMoreResults(); return; } - if (scan.getLimit() > 0) { - // The RS should have set the moreResults field in ScanResponse to false when we have reached - // the limit. - int limit = scan.getLimit() - results.length; - assert limit > 0; - scan.setLimit(limit); - } - // as in 2.0 this value will always be set - if (!resp.getMoreResultsInRegion()) { - completeWhenNoMoreResultsInRegion.run(); - return; + if (state == StopperState.SUSPENDED) { + if (stopper.resumer.suspend(resp, results.length)) { + return; + } } - next(); + completeOrNext(resp, results.length); } private void call() { @@ -337,6 +469,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { call(); } + private void renewLease() { + nextCallSeq++; + resetController(controller, rpcTimeoutNs); + ScanRequest req = + RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); + stub.scan(controller, req, resp -> { + }); + } + /** * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also * need to process the ScanResponse for the open scanner request. 
The HBaseRpcController for the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index e2c4ec3..335d2cd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; -import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import java.io.IOException; @@ -29,9 +29,7 @@ import java.util.Queue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; /** * The {@link ResultScanner} implementation for {@link AsyncTable}. 
It will fetch data automatically @@ -45,8 +43,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private final RawAsyncTable rawTable; - private final Scan scan; - private final long maxCacheSize; private final Queue queue = new ArrayDeque<>(); @@ -57,16 +53,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private Throwable error; - private boolean prefetchStopped; - - private int numberOfOnCompleteToIgnore; - - // used to filter out cells that already returned when we restart a scan - private Cell lastCell; + private Resumer resumer; public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { this.rawTable = table; - this.scan = scan; this.maxCacheSize = maxCacheSize; table.scan(scan, this); } @@ -76,71 +66,36 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { cacheSize += calcEstimatedSize(result); } - private void stopPrefetch(Result lastResult) { - prefetchStopped = true; - if (lastResult.isPartial() || scan.getBatch() > 0) { - scan.withStartRow(lastResult.getRow()); - lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1]; - } else { - scan.withStartRow(lastResult.getRow(), false); - } + private void stopPrefetch(Stopper stopper) { if (LOG.isDebugEnabled()) { - LOG.debug( - String.format("0x%x", System.identityHashCode(this)) + " stop prefetching when scanning " - + rawTable.getName() + " as the cache size " + cacheSize - + " is greater than the maxCacheSize " + maxCacheSize + ", the next start row is " - + Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell); + LOG.debug(String.format("0x%x", System.identityHashCode(this)) + + " stop prefetching when scanning " + rawTable.getName() + " as the cache size " + + cacheSize + " is greater than the maxCacheSize " + maxCacheSize); } - // Ignore an onComplete call as the scan is stopped by us. - // Here we can not use a simple boolean flag. 
A scan operation can cross multiple regions and - // the regions may be located on different regionservers, so it is possible that the methods of - // RawScanResultConsumer are called in different rpc framework threads and overlapped with each - // other. It may happen that - // 1. we stop scan1 - // 2. we start scan2 - // 3. we stop scan2 - // 4. onComplete for scan1 is called - // 5. onComplete for scan2 is called - // So if we use a boolean flag here then we can only ignore the onComplete in step4 and think - // that the onComplete in step 5 tells us there is no data. - numberOfOnCompleteToIgnore++; + resumer = stopper.suspend(); } @Override - public synchronized boolean onNext(Result[] results) { + public synchronized void onNext(Result[] results, Stopper stopper) { assert results.length > 0; if (closed) { - return false; - } - Result firstResult = results[0]; - if (lastCell != null) { - firstResult = filterCells(firstResult, lastCell); - if (firstResult != null) { - // do not set lastCell to null if the result after filtering is null as there may still be - // other cells that can be filtered out - lastCell = null; - addToCache(firstResult); - } else if (results.length == 1) { - // the only one result is null - return true; - } - } else { - addToCache(firstResult); + stopper.terminate(); + return; } - for (int i = 1; i < results.length; i++) { - addToCache(results[i]); + for (Result result : results) { + addToCache(result); } notifyAll(); - if (cacheSize < maxCacheSize) { - return true; + if (cacheSize >= maxCacheSize) { + stopPrefetch(stopper); } - stopPrefetch(results[results.length - 1]); - return false; } @Override - public synchronized boolean onHeartbeat() { - return !closed; + public synchronized void onHeartbeat(Stopper stopper) { + if (closed) { + stopper.terminate(); + } } @Override @@ -150,12 +105,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { @Override public synchronized void onComplete() { - // Do not mark 
the scanner as closed if the scan is stopped by us due to cache size limit since - // we may resume later by starting a new scan. See resumePrefetch. - if (numberOfOnCompleteToIgnore > 0) { - numberOfOnCompleteToIgnore--; - return; - } closed = true; notifyAll(); } @@ -164,8 +113,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { if (LOG.isDebugEnabled()) { LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching"); } - prefetchStopped = false; - rawTable.scan(scan, this); + resumer.resume(); + resumer = null; } @Override @@ -186,7 +135,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { } Result result = queue.poll(); cacheSize -= calcEstimatedSize(result); - if (prefetchStopped && cacheSize <= maxCacheSize / 2) { + if (resumer != null && cacheSize <= maxCacheSize / 2) { resumePrefetch(); } return result; @@ -197,13 +146,22 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { closed = true; queue.clear(); cacheSize = 0; + if (resumer != null) { + resumePrefetch(); + } notifyAll(); } @Override public boolean renewLease() { - // we will do prefetching in the background and if there is no space we will just terminate the - // background scan operation. So there is no reason to renew lease here. + // we will do prefetching in the background and if there is no space we will just suspend the + // scanner. The renew lease operation will be handled in the background. 
return false; } + + // used in tests to test whether the scanner has been suspended + @VisibleForTesting + synchronized boolean suspended() { + return resumer != null; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 00f255e..c1b0373 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -358,9 +358,8 @@ class RawAsyncTableImpl implements RawAsyncTable { scan(scan, new RawScanResultConsumer() { @Override - public boolean onNext(Result[] results) { + public void onNext(Result[] results, Stopper stopper) { scanResults.addAll(Arrays.asList(results)); - return true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 2e5d422..7c09596 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -35,20 +35,59 @@ import org.apache.hadoop.hbase.client.Result; public interface RawScanResultConsumer { /** + * Used to resume a scan. + */ + interface Resumer { + + /** + * Resume the scan. You are free to call it multiple time but only the first call will take + * effect. + */ + void resume(); + } + + /** + * Used to suspend or stop a scan. + *

+ * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A + * IllegalStateException will be thrown if you call them at other places. + *

+ * You can only call one of the methods below, i.e., call suspend or terminate(of course you are + * free to not call them both), and the methods are not reentrant. A IllegalStateException will be + * thrown if you have already called one of the methods. + */ + interface Stopper { + + /** + * Suspend the scan. + * @return A resumer used to resume the scan later. + */ + Resumer suspend(); + + /** + * Terminate the scan. + */ + void terminate(); + } + + /** * @param results the data fetched from HBase service. - * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} + * @param stopper used to suspend or terminate the scan. Notice that the stopper instance is only + * valid within scope of onNext method. You can only call its method in onNext, do NOT + * store it and call it later outside onNext. */ - boolean onNext(Result[] results); + void onNext(Result[] results, Stopper stopper); /** * Indicate that there is an heartbeat message but we have not cumulated enough cells to call * onNext. *

* This method give you a chance to terminate a slow scan operation. - * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} + * @param stopper used to suspend or terminate the scan. Notice that the stopper instance is only + * valid within scope of onNext method. You can only call its method in onNext, do NOT + * store it and call it later outside onNext. */ - default boolean onHeartbeat() { - return true; + default void onHeartbeat(Stopper stopper) { } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index d10e0f2..21b376c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -147,7 +147,7 @@ public class Threads { try { Thread.sleep(millis); } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("sleep interrupted", e); Thread.currentThread().interrupt(); } } diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java index f8d0a19..fcaf670 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -108,22 +108,22 @@ public class AsyncAggregationClient { } } - private static R - getCellValueFromProto(ColumnInterpreter ci, AggregateResponse resp, - int firstPartIndex) throws IOException { + private static R getCellValueFromProto( + ColumnInterpreter ci, AggregateResponse resp, int firstPartIndex) + throws IOException { Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex)); return ci.getCellValueFromProto(q); } - private static S - 
getPromotedValueFromProto(ColumnInterpreter ci, AggregateResponse resp, - int firstPartIndex) throws IOException { + private static S getPromotedValueFromProto( + ColumnInterpreter ci, AggregateResponse resp, int firstPartIndex) + throws IOException { T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex)); return ci.getPromotedValueFromProto(t); } - public static CompletableFuture - max(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture max( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -158,8 +158,8 @@ public class AsyncAggregationClient { return future; } - public static CompletableFuture - min(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture min( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -194,9 +194,8 @@ public class AsyncAggregationClient { return future; } - public static - CompletableFuture - rowCount(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture rowCount( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -226,8 +225,8 @@ public class AsyncAggregationClient { return future; } - public static CompletableFuture - sum(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture sum( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -260,9 +259,8 @@ public class AsyncAggregationClient { return future; } - public static - CompletableFuture - avg(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture avg( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { 
CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -297,9 +295,8 @@ public class AsyncAggregationClient { return future; } - public static - CompletableFuture - std(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture std( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); AggregateRequest req; try { @@ -340,9 +337,8 @@ public class AsyncAggregationClient { } // the map key is the startRow of the region - private static - CompletableFuture> - sumByRegion(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + private static CompletableFuture> sumByRegion( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture> future = new CompletableFuture>(); AggregateRequest req; @@ -407,7 +403,7 @@ public class AsyncAggregationClient { private R value = null; @Override - public boolean onNext(Result[] results) { + public void onNext(Result[] results, Stopper stopper) { try { for (Result result : results) { Cell weightCell = result.getColumnLatestCell(family, weightQualifier); @@ -419,15 +415,14 @@ public class AsyncAggregationClient { } else { future.completeExceptionally(new NoSuchElementException()); } - return false; + stopper.terminate(); } Cell valueCell = result.getColumnLatestCell(family, valueQualifier); value = ci.getValue(family, valueQualifier, valueCell); } - return true; } catch (IOException e) { future.completeExceptionally(e); - return false; + stopper.terminate(); } } @@ -446,8 +441,8 @@ public class AsyncAggregationClient { }); } - public static CompletableFuture - median(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + public static CompletableFuture median( + RawAsyncTable table, ColumnInterpreter ci, Scan scan) { CompletableFuture future = new CompletableFuture<>(); sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { if (error != null) { diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java new file mode 100644 index 0000000..0240af3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScanRenewLease { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static AsyncConnection CONN; + + private static RawAsyncTable TABLE; + + private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + SCANNER_LEASE_TIMEOUT_PERIOD_MS); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + TABLE = CONN.getRawTable(TABLE_NAME); + TABLE.putAll(IntStream.range(0, 10).mapToObj( + i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) + .collect(Collectors.toList())).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + private static final class RenewLeaseConsumer implements 
RawScanResultConsumer { + + private final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + private boolean suspended = false; + + @Override + public synchronized void onNext(Result[] results, Stopper stopper) { + for (Result result : results) { + this.results.add(result); + } + if (!suspended) { + Resumer resumer = stopper.suspend(); + new Thread(() -> { + Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS); + try { + TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ, + Bytes.toBytes(10))).get(); + } catch (Exception e) { + onError(e); + } + resumer.resume(); + }).start(); + } + } + + @Override + public synchronized void onError(Throwable error) { + this.finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + this.finished = true; + notifyAll(); + } + + public synchronized List get() throws Throwable { + while (!finished) { + wait(); + } + if (error != null) { + throw error; + } + return results; + } + } + + @Test + public void test() throws Throwable { + RenewLeaseConsumer consumer = new RenewLeaseConsumer(); + TABLE.scan(new Scan(), consumer); + List results = consumer.get(); + // should not see the newly added value + assertEquals(10, results.size()); + IntStream.range(0, 10).forEach(i -> { + Result result = results.get(i); + assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); + }); + // now we can see the newly added value + List results2 = TABLE.scanAll(new Scan()).get(); + assertEquals(11, results2.size()); + IntStream.range(0, 11).forEach(i -> { + Result result = results2.get(i); + assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index 006770d..a3cad17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -82,5 +82,4 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { } return results; } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java new file mode 100644 index 0000000..5c2b1dd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScannerCloseWhileSuspending { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static AsyncConnection CONN; + + private static AsyncTable TABLE; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + TABLE.putAll(IntStream.range(0, 100).mapToObj( + i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) + .collect(Collectors.toList())).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + private int getScannersCount() { + return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()) + .sum(); + } + + @Test + public void 
testCloseScannerWhileSuspending() throws Exception { + try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) { + TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return ((AsyncTableResultScanner) scanner).suspended(); + } + + @Override + public String explainFailure() throws Exception { + return "The given scanner has not been suspended in time"; + } + }); + assertEquals(1, getScannersCount()); + } + TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return getScannersCount() == 0; + } + + @Override + public String explainFailure() throws Exception { + return "Still have " + getScannersCount() + " scanners opened"; + } + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index a8ef353..92f235f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -48,12 +48,11 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { private Throwable error; @Override - public synchronized boolean onNext(Result[] results) { + public synchronized void onNext(Result[] results, Stopper stopper) { for (Result result : results) { queue.offer(result); } notifyAll(); - return true; } @Override -- 2.7.4