From 3773b3555cc69add09170a0cf98cf2f477931809 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 15 Feb 2017 16:56:11 +0800 Subject: [PATCH] HBASE-17608 Add suspend support for RawScanResultConsumer --- .../hadoop/hbase/client/AsyncClientScanner.java | 7 +- .../client/AsyncRpcRetryingCallerFactory.java | 21 +- .../AsyncScanSingleRegionRpcRetryingCaller.java | 262 ++++++++++++++++++--- .../hbase/client/AsyncTableResultScanner.java | 106 +++------ .../apache/hadoop/hbase/client/RawAsyncTable.java | 14 +- .../hadoop/hbase/client/RawAsyncTableImpl.java | 3 +- .../hadoop/hbase/client/RawScanResultConsumer.java | 68 +++++- .../java/org/apache/hadoop/hbase/util/Threads.java | 2 +- .../client/coprocessor/AsyncAggregationClient.java | 8 +- .../hbase/client/TestAsyncTableScanRenewLease.java | 150 ++++++++++++ .../hadoop/hbase/client/TestAsyncTableScanner.java | 1 - .../TestAsyncTableScannerCloseWhileSuspending.java | 105 +++++++++ .../hadoop/hbase/client/TestRawAsyncTableScan.java | 3 +- 13 files changed, 605 insertions(+), 145 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index b9fd34f..2215d36 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -130,11 +130,12 @@ class AsyncClientScanner { private void startScan(OpenScannerResponse resp) { conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) - .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache) + .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), 
TimeUnit.MILLISECONDS).stub(resp.stub) + .setScan(scan).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp) - .whenComplete((hasMore, error) -> { + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .start(resp.controller, resp.resp).whenComplete((hasMore, error) -> { if (error != null) { consumer.onError(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 82c5d63..9bc651d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -156,6 +156,8 @@ class AsyncRpcRetryingCallerFactory { private HRegionLocation loc; + private long scannerLeaseTimeoutPeriodNs; + private long scanTimeoutNs; private long rpcTimeoutNs; @@ -190,6 +192,12 @@ class AsyncRpcRetryingCallerFactory { return this; } + public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod, + TimeUnit unit) { + this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod); + return this; + } + public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { this.scanTimeoutNs = unit.toNanos(scanTimeout); return this; @@ -221,8 +229,8 @@ class AsyncRpcRetryingCallerFactory { checkNotNull(scan, "scan is null"), scannerId, checkNotNull(resultCache, "resultCache is null"), checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + checkNotNull(loc, "location is 
null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -307,7 +315,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs = -1L; - public MasterRequestCallerBuilder action(AsyncMasterRequestRpcRetryingCaller.Callable callable) { + public MasterRequestCallerBuilder action( + AsyncMasterRequestRpcRetryingCaller.Callable callable) { this.callable = callable; return this; } @@ -338,9 +347,9 @@ class AsyncRpcRetryingCallerFactory { } public AsyncMasterRequestRpcRetryingCaller build() { - return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, checkNotNull(callable, - "action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, + checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 3ef4a6f..9e4ff53 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -17,13 +17,17 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static 
org.apache.hadoop.hbase.client.ConnectionUtils.translateException; +import com.google.common.base.Preconditions; + import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; import java.io.IOException; import java.util.ArrayList; @@ -39,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -76,6 +81,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final HRegionLocation loc; + private final long scannerLeaseTimeoutPeriodNs; + private final long pauseNs; private final int maxAttempts; @@ -104,10 +111,174 @@ class AsyncScanSingleRegionRpcRetryingCaller { private long nextCallSeq = -1L; + private enum ControllerState { + INITIALIZED, SUSPENDED, TERMINATED, DESTROYED + } + + // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments + // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid + // usage. We use two things to prevent invalid usage: + // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an + // IllegalStateException if the caller thread is not this thread. + // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will + // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to + // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will + // call destroy to get the current state and set the state to DESTROYED. 
And when user calls + // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw + // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call + // suspend or terminate so the state will still be INITIALIZED when back from onNext or + // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller + // to be used in the future. + // Notice that, the public methods of this class is supposed to be called by upper layer only, and + // package private methods can only be called within the implementation of + // AsyncScanSingleRegionRpcRetryingCaller. + private final class ScanControllerImpl implements RawScanResultConsumer.ScanController { + + // Make sure the methods are only called in this thread. + private final Thread callerThread = Thread.currentThread(); + + // INITIALIZED -> SUSPENDED -> DESTROYED + // INITIALIZED -> TERMINATED -> DESTROYED + // INITIALIZED -> DESTROYED + // If the state is incorrect we will throw IllegalStateException. + private ControllerState state = ControllerState.INITIALIZED; + + private ScanResumerImpl resumer; + + private void preCheck() { + Preconditions.checkState(Thread.currentThread() == callerThread, + "should not call this method outside onNext or onHeartbeat"); + Preconditions.checkState(state.equals(ControllerState.INITIALIZED), + "Invalid Stopper state %s", state); + } + + @Override + public ScanResumer suspend() { + preCheck(); + state = ControllerState.SUSPENDED; + ScanResumerImpl resumer = new ScanResumerImpl(); + this.resumer = resumer; + return resumer; + } + + @Override + public void terminate() { + preCheck(); + state = ControllerState.TERMINATED; + } + + // return the current state, and set the state to DESTROYED. 
+ ControllerState destroy() { + ControllerState state = this.state; + this.state = ControllerState.DESTROYED; + return state; + } + } + + private enum ResumerState { + INITIALIZED, SUSPENDED, RESUMED + } + + // The resume method is allowed to be called in another thread so here we also use the + // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back + // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, + // and when user calls resume method, we will change the state to RESUMED. But the resume method + // could be called in other thread, and in fact, user could just do this: + // controller.suspend().resume() + // This is strange but valid. This means the scan could be resumed before we call the prepare + // method to do the actual suspend work. So in the resume method, we will check if the state is + // INITIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare + // method, if the state is RESUMED already, we will just return and let the scan go on. + // Notice that, the public methods of this class is supposed to be called by upper layer only, and + // package private methods can only be called within the implementation of + // AsyncScanSingleRegionRpcRetryingCaller. + private final class ScanResumerImpl implements RawScanResultConsumer.ScanResumer { + + // INITIALIZED -> SUSPENDED -> RESUMED + // INITIALIZED -> RESUMED + private ResumerState state = ResumerState.INITIALIZED; + + private ScanResponse resp; + + private int numValidResults; + + // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed + // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task + // every time when the previous task is finished. There could also be race as the renewal is + // executed in the timer thread, so we also need to check the state before lease renewal.
If the + // state is RESUMED already, we will give up lease renewal and also not schedule the next lease + // renewal task. + private Timeout leaseRenewer; + + @Override + public void resume() { + // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we + // just return at the first if condition without loading the resp and numValidResuls field. If + // resume is called after suspend, then it is also safe to just reference resp and + // numValidResults after the synchronized block as no one will change it anymore. + ScanResponse localResp; + int localNumValidResults; + synchronized (this) { + if (state == ResumerState.INITIALIZED) { + // user calls this method before we call prepare, so just set the state to + // RESUMED, the implementation will just go on. + state = ResumerState.RESUMED; + return; + } + if (state == ResumerState.RESUMED) { + // already resumed, give up. + return; + } + state = ResumerState.RESUMED; + if (leaseRenewer != null) { + leaseRenewer.cancel(); + } + localResp = this.resp; + localNumValidResults = this.numValidResults; + } + completeOrNext(localResp, localNumValidResults); + } + + private void scheduleRenewLeaseTask() { + leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, + TimeUnit.NANOSECONDS); + } + + private synchronized void tryRenewLease() { + // the scan has already been resumed, give up + if (state == ResumerState.RESUMED) { + return; + } + renewLease(); + // schedule the next renew lease task again as this is a one-time task. + scheduleRenewLeaseTask(); + } + + // return false if the scan has already been resumed. See the comment above for ScanResumerImpl + // for more details. 
+ synchronized boolean prepare(ScanResponse resp, int numValidResults) { + if (state == ResumerState.RESUMED) { + // user calls resume before we actually suspend the scan, just continue; + return false; + } + state = ResumerState.SUSPENDED; + this.resp = resp; + this.numValidResults = numValidResults; + // if there are no more results in region then the scanner at RS side will be closed + // automatically so we do not need to renew lease. + if (resp.getMoreResultsInRegion()) { + // schedule renew lease task + scheduleRenewLeaseTask(); + } + return true; + } + } + public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache, - RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs, - int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, + long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; this.scannerId = scannerId; @@ -115,6 +286,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.consumer = consumer; this.stub = stub; this.loc = loc; + this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; @@ -143,9 +315,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { - LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId - + " for " + loc.getRegionInfo().getEncodedName() + " of " - + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", + LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + + " 
for " + loc.getRegionInfo().getEncodedName() + " of " + + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", controller.getFailed()); } }); @@ -182,16 +354,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void onError(Throwable error) { error = translateException(error); if (tries > startLogErrorsCnt) { - LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " - + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() - + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() - + " ms", + LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() + + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + + " ms", error); } - boolean scannerClosed = - error instanceof UnknownScannerException || error instanceof NotServingRegionException - || error instanceof RegionServerStoppedException; + boolean scannerClosed = error instanceof UnknownScannerException || + error instanceof NotServingRegionException || error instanceof RegionServerStoppedException; RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), ""); @@ -229,7 +400,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void updateNextStartRowWhenError(Result result) { nextStartRowWhenError = result.getRow(); - includeNextStartRowWhenError = scan.getBatch() > 0 || result.isPartial(); + includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); } private void completeWhenNoMoreResultsInRegion() { @@ -248,6 +419,27 @@ class AsyncScanSingleRegionRpcRetryingCaller { } } + 
private void completeOrNext(ScanResponse resp, int numValidResults) { + if (resp.hasMoreResults() && !resp.getMoreResults()) { + // RS tells us there is no more data for the whole scan + completeNoMoreResults(); + return; + } + if (scan.getLimit() > 0) { + // The RS should have set the moreResults field in ScanResponse to false when we have reached + // the limit. + int limit = scan.getLimit() - numValidResults; + assert limit > 0; + scan.setLimit(limit); + } + // as in 2.0 this value will always be set + if (!resp.getMoreResultsInRegion()) { + completeWhenNoMoreResultsInRegion.run(); + return; + } + next(); + } + private void onComplete(HBaseRpcController controller, ScanResponse resp) { if (controller.failed()) { onError(controller.getFailed()); @@ -269,20 +461,16 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } - boolean stopByUser; + ScanControllerImpl scanController = new ScanControllerImpl(); if (results.length == 0) { // if we have nothing to return then this must be a heartbeat message. - stopByUser = !consumer.onHeartbeat(); + consumer.onHeartbeat(scanController); } else { updateNextStartRowWhenError(results[results.length - 1]); - stopByUser = !consumer.onNext(results); + consumer.onNext(results, scanController); } - if (resp.hasMoreResults() && !resp.getMoreResults()) { - // RS tells us there is no more data for the whole scan - completeNoMoreResults(); - return; - } - if (stopByUser) { + ControllerState state = scanController.destroy(); + if (state == ControllerState.TERMINATED) { if (resp.getMoreResultsInRegion()) { // we have more results in region but user request to stop the scan, so we need to close the // scanner explicitly. @@ -291,19 +479,12 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeNoMoreResults(); return; } - if (scan.getLimit() > 0) { - // The RS should have set the moreResults field in ScanResponse to false when we have reached - // the limit. 
- int limit = scan.getLimit() - results.length; - assert limit > 0; - scan.setLimit(limit); - } - // as in 2.0 this value will always be set - if (!resp.getMoreResultsInRegion()) { - completeWhenNoMoreResultsInRegion.run(); - return; + if (state == ControllerState.SUSPENDED) { + if (scanController.resumer.prepare(resp, results.length)) { + return; + } } - next(); + completeOrNext(resp, results.length); } private void call() { @@ -337,6 +518,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { call(); } + private void renewLease() { + nextCallSeq++; + resetController(controller, rpcTimeoutNs); + ScanRequest req = + RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); + stub.scan(controller, req, resp -> { + }); + } + /** * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index e2c4ec3..38d4b2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; -import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import java.io.IOException; @@ -29,9 +29,7 @@ import java.util.Queue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; /** * The {@link 
ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically @@ -45,8 +43,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private final RawAsyncTable rawTable; - private final Scan scan; - private final long maxCacheSize; private final Queue queue = new ArrayDeque<>(); @@ -57,16 +53,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private Throwable error; - private boolean prefetchStopped; - - private int numberOfOnCompleteToIgnore; - - // used to filter out cells that already returned when we restart a scan - private Cell lastCell; + private ScanResumer resumer; public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { this.rawTable = table; - this.scan = scan; this.maxCacheSize = maxCacheSize; table.scan(scan, this); } @@ -76,71 +66,36 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { cacheSize += calcEstimatedSize(result); } - private void stopPrefetch(Result lastResult) { - prefetchStopped = true; - if (lastResult.isPartial() || scan.getBatch() > 0) { - scan.withStartRow(lastResult.getRow()); - lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1]; - } else { - scan.withStartRow(lastResult.getRow(), false); - } + private void stopPrefetch(ScanController controller) { if (LOG.isDebugEnabled()) { - LOG.debug( - String.format("0x%x", System.identityHashCode(this)) + " stop prefetching when scanning " - + rawTable.getName() + " as the cache size " + cacheSize - + " is greater than the maxCacheSize " + maxCacheSize + ", the next start row is " - + Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell); + LOG.debug(String.format("0x%x", System.identityHashCode(this)) + + " stop prefetching when scanning " + rawTable.getName() + " as the cache size " + + cacheSize + " is greater than the maxCacheSize " + maxCacheSize); } - // Ignore an onComplete call as the scan is stopped by us. 
- // Here we can not use a simple boolean flag. A scan operation can cross multiple regions and - // the regions may be located on different regionservers, so it is possible that the methods of - // RawScanResultConsumer are called in different rpc framework threads and overlapped with each - // other. It may happen that - // 1. we stop scan1 - // 2. we start scan2 - // 3. we stop scan2 - // 4. onComplete for scan1 is called - // 5. onComplete for scan2 is called - // So if we use a boolean flag here then we can only ignore the onComplete in step4 and think - // that the onComplete in step 5 tells us there is no data. - numberOfOnCompleteToIgnore++; + resumer = controller.suspend(); } @Override - public synchronized boolean onNext(Result[] results) { + public synchronized void onNext(Result[] results, ScanController controller) { assert results.length > 0; if (closed) { - return false; - } - Result firstResult = results[0]; - if (lastCell != null) { - firstResult = filterCells(firstResult, lastCell); - if (firstResult != null) { - // do not set lastCell to null if the result after filtering is null as there may still be - // other cells that can be filtered out - lastCell = null; - addToCache(firstResult); - } else if (results.length == 1) { - // the only one result is null - return true; - } - } else { - addToCache(firstResult); + controller.terminate(); + return; } - for (int i = 1; i < results.length; i++) { - addToCache(results[i]); + for (Result result : results) { + addToCache(result); } notifyAll(); - if (cacheSize < maxCacheSize) { - return true; + if (cacheSize >= maxCacheSize) { + stopPrefetch(controller); } - stopPrefetch(results[results.length - 1]); - return false; } @Override - public synchronized boolean onHeartbeat() { - return !closed; + public synchronized void onHeartbeat(ScanController controller) { + if (closed) { + controller.terminate(); + } } @Override @@ -150,12 +105,6 @@ class AsyncTableResultScanner implements ResultScanner, 
RawScanResultConsumer { @Override public synchronized void onComplete() { - // Do not mark the scanner as closed if the scan is stopped by us due to cache size limit since - // we may resume later by starting a new scan. See resumePrefetch. - if (numberOfOnCompleteToIgnore > 0) { - numberOfOnCompleteToIgnore--; - return; - } closed = true; notifyAll(); } @@ -164,8 +113,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { if (LOG.isDebugEnabled()) { LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching"); } - prefetchStopped = false; - rawTable.scan(scan, this); + resumer.resume(); + resumer = null; } @Override @@ -186,7 +135,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { } Result result = queue.poll(); cacheSize -= calcEstimatedSize(result); - if (prefetchStopped && cacheSize <= maxCacheSize / 2) { + if (resumer != null && cacheSize <= maxCacheSize / 2) { resumePrefetch(); } return result; @@ -197,13 +146,22 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { closed = true; queue.clear(); cacheSize = 0; + if (resumer != null) { + resumePrefetch(); + } notifyAll(); } @Override public boolean renewLease() { - // we will do prefetching in the background and if there is no space we will just terminate the - // background scan operation. So there is no reason to renew lease here. + // we will do prefetching in the background and if there is no space we will just suspend the + // scanner. The renew lease operation will be handled in the background. 
return false; } + + // used in tests to test whether the scanner has been suspended + @VisibleForTesting + synchronized boolean isSuspended() { + return resumer != null; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java index 59924cf..e493123 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -52,13 +52,13 @@ public interface RawAsyncTable extends AsyncTableBase { /** * The basic scan API uses the observer pattern. All results that match the given scan object will - * be passed to the given {@code consumer} by calling - * {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()} - * means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit - * an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()} - * means the RS is still working but we can not get a valid result to call - * {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are - * too sparse, for example, a filter which almost filters out everything is specified. + * be passed to the given {@code consumer} by calling {@code RawScanResultConsumer.onNext}. + * {@code RawScanResultConsumer.onComplete} means the scan is finished, and + * {@code RawScanResultConsumer.onError} means we hit an unrecoverable error and the scan is + * terminated. {@code RawScanResultConsumer.onHeartbeat} means the RS is still working but we can + * not get a valid result to call {@code RawScanResultConsumer.onNext}. This is usually because + * the matched results are too sparse, for example, a filter which almost filters out everything + * is specified. *

* Notice that, the methods of the given {@code consumer} will be called directly in the rpc * framework's callback thread, so typically you should not do any time consuming work inside diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 00f255e..fa3d792 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -358,9 +358,8 @@ class RawAsyncTableImpl implements RawAsyncTable { scan(scan, new RawScanResultConsumer() { @Override - public boolean onNext(Result[] results) { + public void onNext(Result[] results, ScanController controller) { scanResults.addAll(Arrays.asList(results)); - return true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 2e5d422..17e0afa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -24,10 +24,10 @@ import org.apache.hadoop.hbase.client.Result; /** * Receives {@link Result} for an asynchronous scan. *

- * Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send - * request to HBase service. So if you want the asynchronous scanner fetch data from HBase in - * background while you process the returned data, you need to move the processing work to another - * thread to make the {@code onNext} call return immediately. And please do NOT do any time + * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread + * which we send request to HBase service. So if you want the asynchronous scanner fetch data from + * HBase in background while you process the returned data, you need to move the processing work to + * another thread to make the {@code onNext} call return immediately. And please do NOT do any time * consuming tasks in all methods below unless you know what you are doing. */ @InterfaceAudience.Public @@ -35,20 +35,70 @@ import org.apache.hadoop.hbase.client.Result; public interface RawScanResultConsumer { /** + * Used to resume a scan. + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + interface ScanResumer { + + /** + * Resume the scan. You are free to call it multiple times but only the first call will take + * effect. + */ + void resume(); + } + + /** + * Used to suspend or stop a scan. + *

+ * Notice that, you should only call the methods below inside onNext or onHeartbeat method. An + * IllegalStateException will be thrown if you call them at other places. + *

+ * You can only call one of the methods below, i.e., call suspend or terminate (of course you are + * free to not call them both), and the methods are not reentrant. An IllegalStateException will + * be thrown if you have already called one of the methods. + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + interface ScanController { + + /** + * Suspend the scan. + *

+ * This means we will stop fetching data in background, i.e., will not call onNext any more + * before you resume the scan. + * @return A resumer used to resume the scan later. + */ + ScanResumer suspend(); + + /** + * Terminate the scan. + *

+ * This is useful when you have got enough results and want to stop the scan in onNext method, + * or you want to stop the scan in onHeartbeat method because it has spent too much time. + */ + void terminate(); + } + + /** + * Indicate that we have received some data. * @param results the data fetched from HBase service. - * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onNext method. You can only call its method in + * onNext, do NOT store it and call it later outside onNext. */ - boolean onNext(Result[] results); + void onNext(Result[] results, ScanController controller); /** * Indicate that there is an heartbeat message but we have not cumulated enough cells to call * onNext. *

* This method give you a chance to terminate a slow scan operation. - * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. */ - default boolean onHeartbeat() { - return true; + default void onHeartbeat(ScanController controller) { } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index d10e0f2..21b376c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -147,7 +147,7 @@ public class Threads { try { Thread.sleep(millis); } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("sleep interrupted", e); Thread.currentThread().interrupt(); } } diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java index f8d0a19..30f3d30 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -407,7 +407,7 @@ public class AsyncAggregationClient { private R value = null; @Override - public boolean onNext(Result[] results) { + public void onNext(Result[] results, ScanController controller) { try { for (Result result : results) { Cell weightCell = result.getColumnLatestCell(family, weightQualifier); @@ -419,15 +419,15 @@ public class AsyncAggregationClient { } else { future.completeExceptionally(new NoSuchElementException()); } - return false; + 
controller.terminate(); + return; } Cell valueCell = result.getColumnLatestCell(family, valueQualifier); value = ci.getValue(family, valueQualifier, valueCell); } - return true; } catch (IOException e) { future.completeExceptionally(e); - return false; + controller.terminate(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java new file mode 100644 index 0000000..a70b8d2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScanRenewLease { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static AsyncConnection CONN; + + private static RawAsyncTable TABLE; + + private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + SCANNER_LEASE_TIMEOUT_PERIOD_MS); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + TABLE = CONN.getRawTable(TABLE_NAME); + TABLE.putAll(IntStream.range(0, 10).mapToObj( + i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) + .collect(Collectors.toList())).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + private static final class RenewLeaseConsumer implements 
RawScanResultConsumer { + + private final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + private boolean suspended = false; + + @Override + public synchronized void onNext(Result[] results, ScanController controller) { + for (Result result : results) { + this.results.add(result); + } + if (!suspended) { + ScanResumer resumer = controller.suspend(); + new Thread(() -> { + Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS); + try { + TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ, + Bytes.toBytes(10))).get(); + } catch (Exception e) { + onError(e); + } + resumer.resume(); + }).start(); + } + } + + @Override + public synchronized void onError(Throwable error) { + this.finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + this.finished = true; + notifyAll(); + } + + public synchronized List get() throws Throwable { + while (!finished) { + wait(); + } + if (error != null) { + throw error; + } + return results; + } + } + + @Test + public void test() throws Throwable { + RenewLeaseConsumer consumer = new RenewLeaseConsumer(); + TABLE.scan(new Scan(), consumer); + List results = consumer.get(); + // should not see the newly added value + assertEquals(10, results.size()); + IntStream.range(0, 10).forEach(i -> { + Result result = results.get(i); + assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); + }); + // now we can see the newly added value + List results2 = TABLE.scanAll(new Scan()).get(); + assertEquals(11, results2.size()); + IntStream.range(0, 11).forEach(i -> { + Result result = results2.get(i); + assertEquals(String.format("%02d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ))); + }); + } +} diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index 006770d..a3cad17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -82,5 +82,4 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { } return results; } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java new file mode 100644 index 0000000..0f132d1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScannerCloseWhileSuspending { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static AsyncConnection CONN; + + private static AsyncTable TABLE; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + TABLE.putAll(IntStream.range(0, 100).mapToObj( + i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) + .collect(Collectors.toList())).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + private int getScannersCount() { + return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()) + .sum(); + } + + @Test + public void 
testCloseScannerWhileSuspending() throws Exception { + try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) { + TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return ((AsyncTableResultScanner) scanner).isSuspended(); + } + + @Override + public String explainFailure() throws Exception { + return "The given scanner has not been suspended in time"; + } + }); + assertEquals(1, getScannersCount()); + } + TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return getScannersCount() == 0; + } + + @Override + public String explainFailure() throws Exception { + return "Still have " + getScannersCount() + " scanners opened"; + } + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index a8ef353..0be236d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -48,12 +48,11 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { private Throwable error; @Override - public synchronized boolean onNext(Result[] results) { + public synchronized void onNext(Result[] results, ScanController controller) { for (Result result : results) { queue.offer(result); } notifyAll(); - return true; } @Override -- 2.7.4