From c2acce048a363f31afda3c27e84c6b8f93c9ef37 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 2 Nov 2016 14:32:20 +0800 Subject: [PATCH] HBASE-16838 Implement basic scan --- .../hbase/client/AllowPartialScanResultCache.java | 97 ++++++ .../hadoop/hbase/client/AsyncClientScanner.java | 147 +++++++++ .../hbase/client/AsyncConnectionConfiguration.java | 6 +- .../client/AsyncRpcRetryingCallerFactory.java | 86 +++++ .../client/AsyncScanRegionRpcRetryingCaller.java | 367 +++++++++++++++++++++ .../AsyncSingleRequestRpcRetryingCaller.java | 39 +-- .../org/apache/hadoop/hbase/client/AsyncTable.java | 26 ++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 32 +- .../apache/hadoop/hbase/client/ClientScanner.java | 1 + .../client/CompleteResultScanResultCache.java | 97 ++++++ .../hadoop/hbase/client/ConnectionUtils.java | 26 ++ .../apache/hadoop/hbase/client/ScanObserver.java | 63 ++++ .../hadoop/hbase/client/ScanResultCache.java | 35 ++ .../hadoop/hbase/client/TestAsyncTableScan.java | 159 +++++++++ 14 files changed, 1141 insertions(+), 40 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java new file mode 100644 index 0000000..7704a9e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A ScanResultCache that may return partial result. + *

+ * As we can only scan from the starting of a row when error, so here we also implement the logic + * that skips the cells that have already been returned. + */ +@InterfaceAudience.Private +class AllowPartialScanResultCache implements ScanResultCache { + + // used to filter out the cells that already returned to user as we always start from the + // beginning of a row when retry. + private Cell lastCell; + + private Result filterCells(Result result) { + if (lastCell == null) { + return result; + } + // not the same row + if (!Bytes.equals(lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength(), + result.getRow(), 0, result.getRow().length)) { + return result; + } + Cell[] rawCells = result.rawCells(); + int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow); + if (index < 0) { + index = -index - 1; + } else { + index++; + } + if (index == 0) { + return result; + } + if (index == rawCells.length) { + return null; + } + return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), false, + result.isStale(), true); + } + + private void updateLastCell(Result result) { + lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null; + } + + @Override + public Result[] offerAndPoll(Result[] results, boolean isHeartbeatMessage) throws IOException { + if (results.length == 0) { + return EMPTY_RESULT_ARRAY; + } + Result first = filterCells(results[0]); + if (results.length == 1) { + if (first == null) { + // do not update last cell if we filter out all cells + return EMPTY_RESULT_ARRAY; + } + updateLastCell(results[0]); + results[0] = first; + return results; + } + updateLastCell(results[results.length - 1]); + if (first == null) { + return Arrays.copyOfRange(results, 1, results.length); + } + results[0] = first; + return results; + } + + @Override + public void clear() { + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java new file mode 100644 index 0000000..953df17 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; + +/** + * The asynchronous client scanner implementation. + *

+ * Here we will call OpenScanner first and use the returned scannerId to create a + * {@link AsyncScanRegionRpcRetryingCaller} to do the real scan operation. The return value of + * {@link AsyncScanRegionRpcRetryingCaller} will tell us whether open a new scanner or finish scan. + */ +@InterfaceAudience.Private +class AsyncClientScanner { + + private final Scan originScan; + + private final ScanObserver scanObserver; + + private final TableName tableName; + + private final AsyncConnectionImpl conn; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final ScanResultCache resultCache; + + public AsyncClientScanner(Scan scan, ScanObserver scanObserver, TableName tableName, + AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { + this.originScan = scan; + if (scan.getStartRow() == null) { + scan.setStartRow(EMPTY_START_ROW); + } + if (scan.getStopRow() == null) { + scan.setStopRow(EMPTY_END_ROW); + } + this.scanObserver = scanObserver; + this.tableName = tableName; + this.conn = conn; + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0 + ? new AllowPartialScanResultCache() : new CompleteResultScanResultCache(); + } + + private static final class OpenScannerResponse { + + public final HRegionLocation loc; + + public final ClientService.Interface stub; + + public final long scannerId; + + public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) { + this.loc = loc; + this.stub = stub; + this.scannerId = scannerId; + } + } + + private CompletableFuture callOpenScanner(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub) { + CompletableFuture future = new CompletableFuture<>(); + try { + ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), + originScan, 0, false); + stub.scan(controller, request, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + return; + } + future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId())); + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + private void startScan(OpenScannerResponse resp) { + conn.callerFactory.scanRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) + .setScan(originScan).observer(scanObserver).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start() + .whenComplete((scanResp, error) -> { + if (error != null) { + scanObserver.onError(error); + return; + } + if (scanResp == null) { + scanObserver.onComplete(); + } else { + openScanner(scanResp.getScan(), scanResp.isLocateToPreviousRegion()); + } + }); + } + + private void openScanner(Scan scan, boolean locateToPreviousRegion) { + conn.callerFactory. single().table(tableName).row(scan.getStartRow()) + .locateToPreviousRegion(locateToPreviousRegion) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call() + .whenComplete((resp, error) -> { + if (error != null) { + scanObserver.onError(error); + return; + } + startScan(resp); + }); + } + + public void start() { + openScanner(originScan, originScan.isReversed() && originScan.getStartRow().length == 0); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index aaac845..3a648f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -125,15 +125,15 @@ class AsyncConnectionConfiguration { return startLogErrorsCnt; } - public long getScanTimeoutNs() { + long getScanTimeoutNs() { return scanTimeoutNs; } - public int getScannerCaching() { + int getScannerCaching() { return scannerCaching; } - public long getScannerMaxResultSize() { + long getScannerMaxResultSize() { return scannerMaxResultSize; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 0d23c39..360215c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -26,8 +26,10 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; /** * Factory to create an AsyncRpcRetryCaller. @@ -171,4 +173,88 @@ class AsyncRpcRetryingCallerFactory { public SmallScanCallerBuilder smallScan() { return new SmallScanCallerBuilder(); } + + public class ScanRegionCallerBuilder { + + private long scannerId = -1L; + + private Scan scan; + + private ScanResultCache resultCache; + + private ScanObserver scanObserver; + + private ClientService.Interface stub; + + private HRegionLocation loc; + + private long scanTimeoutNs; + + private long rpcTimeoutNs; + + public ScanRegionCallerBuilder id(long scannerId) { + this.scannerId = scannerId; + return this; + } + + public ScanRegionCallerBuilder setScan(Scan scan) { + this.scan = scan; + return this; + } + + public ScanRegionCallerBuilder resultCache(ScanResultCache resultCache) { + this.resultCache = resultCache; + return this; + } + + public ScanRegionCallerBuilder observer(ScanObserver scanObserver) { + this.scanObserver = scanObserver; + return this; + } + + public ScanRegionCallerBuilder stub(ClientService.Interface stub) { + this.stub = stub; + return this; + } + + public ScanRegionCallerBuilder location(HRegionLocation loc) { + this.loc = loc; + return this; + } + + public ScanRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(scanTimeout); + return this; + } + + public ScanRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncScanRegionRpcRetryingCaller build() { + checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId); + return new AsyncScanRegionRpcRetryingCaller(retryTimer, conn, + checkNotNull(scan, "scan is null"), scannerId, + checkNotNull(resultCache, "resultCache is null"), + checkNotNull(scanObserver, "observer is null"), checkNotNull(stub, "stub is null"), + checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(), + conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs, + conn.connConf.getStartLogErrorsCnt()); + } + + /** + * Short cut for {@code build().start()}. + */ + public CompletableFuture start() { + return build().start(); + } + } + + /** + * Create retry caller for scanning a region. + */ + public ScanRegionCallerBuilder scanRegion() { + return new ScanRegionCallerBuilder(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java new file mode 100644 index 0000000..17aaf58 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; + +import io.netty.util.HashedWheelTimer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Retry caller for scanning a region. + */ +@InterfaceAudience.Private +class AsyncScanRegionRpcRetryingCaller { + + private static final Log LOG = LogFactory.getLog(AsyncScanRegionRpcRetryingCaller.class); + + private final HashedWheelTimer retryTimer; + + private final Scan scan; + + private final long scannerId; + + private final ScanResultCache resultCache; + + private final ScanObserver scanObserver; + + private final ClientService.Interface stub; + + private final HRegionLocation loc; + + private final long pauseNs; + + private final int maxAttempts; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final int startLogErrorsCnt; + + private final Supplier createNextStartRowWhenError; + + private final Runnable completeWhenNoMoreResultsInRegion; + + public static final class Response { + + private final Scan scan; + + private final boolean locateToPreviousRegion; + + public Response(Scan scan, boolean locateToPreviousRegion) { + this.scan = scan; + this.locateToPreviousRegion = locateToPreviousRegion; + } + + public Scan getScan() { + return scan; + } + + public boolean isLocateToPreviousRegion() { + return locateToPreviousRegion; + } + } + + private final CompletableFuture future; + + private final HBaseRpcController controller; + + private byte[] nextStartRowWhenError; + + private boolean includeNextStartRowWhenError; + + private long nextCallStartNs; + + private int tries = 1; + + private final List exceptions; + + private long nextCallSeq = -1L; + + public AsyncScanRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + Scan scan, long scannerId, ScanResultCache resultCache, ScanObserver scanObserver, + Interface stub, HRegionLocation loc, long pauseNs, int maxRetries, long scanTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.scan = scan; + this.scannerId = scannerId; + this.resultCache = resultCache; + this.scanObserver = scanObserver; + this.stub = stub; + this.loc = loc; + this.pauseNs = pauseNs; + this.maxAttempts = retries2Attempts(maxRetries); + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + if (scan.isReversed()) { + createNextStartRowWhenError = this::createReversedNextStartRowWhenError; + completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; + } else { + createNextStartRowWhenError = this::createNextStartRowWhenError; + completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; + } + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + } + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); + } + + private void closeScanner() { + resetController(controller, rpcTimeoutNs); + ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); + stub.scan(controller, req, resp -> { + if (controller.failed()) { + LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + + " for " + loc.getRegionInfo().getEncodedName() + " of " + + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", + controller.getFailed()); + } + }); + } + + private void completeExceptionally(boolean closeScanner) { + resultCache.clear(); + if (closeScanner) { + closeScanner(); + } + future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); + } + + private void completeNoMoreResults() { + future.complete(null); + } + + private void completeWithNextStartRow(byte[] nextStartRow) { + future.complete(new Response(scan.setStartRow(nextStartRow), scan.isReversed())); + } + + private byte[] createNextStartRowWhenError() { + return createClosestRowAfter(nextStartRowWhenError); + } + + private byte[] createReversedNextStartRowWhenError() { + return createClosestRowBefore(nextStartRowWhenError); + } + + private Scan getNextScanWhenError() { + if (nextStartRowWhenError == null) { + // no need to reset start row + return scan; + } + if (includeNextStartRowWhenError) { + return scan.setStartRow(nextStartRowWhenError); + } + return scan.setStartRow(createNextStartRowWhenError.get()); + } + + private void completeWhenError(boolean closeScanner) { + resultCache.clear(); + if (closeScanner) { + closeScanner(); + } + Scan nextScan = getNextScanWhenError(); + future.complete(new Response(nextScan, nextScan.isReversed() + && Bytes.equals(nextScan.getStartRow(), loc.getRegionInfo().getEndKey()))); + } + + private void onError(Throwable error) { + error = translateException(error); + if (tries > startLogErrorsCnt) { + LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() + + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + + " ms", + error); + } + boolean scannerClosed = error instanceof UnknownScannerException + || error instanceof NotServingRegionException + || error instanceof RegionServerStoppedException; + RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext( + error, EnvironmentEdgeManager.currentTime(), ""); + exceptions.add(qt); + if (tries >= maxAttempts) { + completeExceptionally(!scannerClosed); + return; + } + long delayNs; + if (scanTimeoutNs > 0) { + long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); + if (maxDelayNs <= 0) { + completeExceptionally(!scannerClosed); + return; + } + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + } else { + delayNs = getPauseTime(pauseNs, tries - 1); + } + if (scannerClosed) { + completeWhenError(false); + return; + } + if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) { + completeWhenError(true); + return; + } + if (error instanceof DoNotRetryIOException) { + completeExceptionally(true); + return; + } + retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); + } + + private void updateNextStartRowWhenError(Result result) { + nextStartRowWhenError = result.getRow(); + includeNextStartRowWhenError = result.isPartial(); + } + + private void completeWhenNoMoreResultsInRegion() { + if (isEmptyStopRow(scan.getStopRow())) { + if (isEmptyStopRow(loc.getRegionInfo().getEndKey())) { + completeNoMoreResults(); + } + } else { + if (Bytes.compareTo(loc.getRegionInfo().getEndKey(), scan.getStopRow()) >= 0) { + completeNoMoreResults(); + } + } + completeWithNextStartRow(loc.getRegionInfo().getEndKey()); + } + + private void completeReversedWhenNoMoreResultsInRegion() { + if (isEmptyStopRow(scan.getStopRow())) { + if (isEmptyStartRow(loc.getRegionInfo().getStartKey())) { + completeNoMoreResults(); + } + } else { + if (Bytes.compareTo(loc.getRegionInfo().getStartKey(), scan.getStopRow()) <= 0) { + completeNoMoreResults(); + } + } + completeWithNextStartRow(loc.getRegionInfo().getStartKey()); + } + + private void onComplete(ScanResponse resp) { + if (controller.failed()) { + onError(controller.getFailed()); + return; + } + boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); + Result[] results; + try { + results = resultCache.offerAndPoll( + ResponseConverter.getResults(controller.cellScanner(), resp), isHeartbeatMessage); + } catch (IOException e) { + // We can not retry here. The server has responded normally and the call sequence has been + // increased so a new scan with the same call sequence will cause an + // OutOfOrderScannerNextException. Let the upper layer open a scanner. + LOG.warn("decode scan response failed", e); + completeWhenError(true); + return; + } + + boolean stopByUser; + if (results.length == 0) { + // if we have nothing to return then this must be a heartbeat message. + stopByUser = !scanObserver.onHeartbeat(); + } else { + updateNextStartRowWhenError(results[results.length - 1]); + stopByUser = !scanObserver.onNext(results); + } + if (resp.hasMoreResults() && !resp.getMoreResults()) { + // RS tells us there is no more data for the whole scan + completeNoMoreResults(); + return; + } + if (stopByUser) { + if (resp.getMoreResultsInRegion()) { + // we have more results in region but user request to stop the scan, so we need to close the + // scanner explicitly. + closeScanner(); + } + completeNoMoreResults(); + return; + } + // as in 2.0 this value will always be set + if (!resp.getMoreResultsInRegion()) { + completeWhenNoMoreResultsInRegion.run(); + return; + } + next(); + } + + private void call() { + resetController(controller, rpcTimeoutNs); + ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, + nextCallSeq, false, false); + stub.scan(controller, req, this::onComplete); + } + + private void next() { + nextCallSeq++; + tries = 0; + exceptions.clear(); + nextCallStartNs = System.nanoTime(); + call(); + } + + /** + * @return return the scan object and locate direction for next open scanner call, or null if we + * should stop. + */ + public CompletableFuture start() { + next(); + return future; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index f10c9a5..36687c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -40,11 +39,9 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.ipc.RemoteException; /** * Retry caller for a single request, such as get, put, delete, etc. @@ -121,19 +118,6 @@ class AsyncSingleRequestRpcRetryingCaller { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); } - private static Throwable translateException(Throwable t) { - if (t instanceof UndeclaredThrowableException && t.getCause() != null) { - t = t.getCause(); - } - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - if (t instanceof ServiceException && t.getCause() != null) { - t = translateException(t.getCause()); - } - return t; - } - private void completeExceptionally() { future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); } @@ -165,22 +149,7 @@ class AsyncSingleRequestRpcRetryingCaller { } updateCachedLocation.accept(error); tries++; - retryTimer.newTimeout(new TimerTask() { - - @Override - public void run(Timeout timeout) throws Exception { - // always restart from beginning. - locateThenCall(); - } - }, delayNs, TimeUnit.NANOSECONDS); - } - - private void resetController() { - controller.reset(); - if (rpcTimeoutNs >= 0) { - controller.setCallTimeout( - (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs))); - } + retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS); } private void call(HRegionLocation loc) { @@ -197,7 +166,7 @@ class AsyncSingleRequestRpcRetryingCaller { err -> conn.getLocator().updateCachedLocation(loc, err)); return; } - resetController(); + resetController(controller, rpcTimeoutNs); callable.call(controller, loc, stub).whenComplete((result, error) -> { if (error != null) { onError(error, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 94747b9..b350ccd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -99,6 +99,10 @@ public interface AsyncTable { */ long getOperationTimeout(TimeUnit unit); + void setScanTimeout(long timeout, TimeUnit unit); + + long getScanTimeout(TimeUnit unit); + /** * Test for the existence of columns in the table, as specified by the Get. *

@@ -335,4 +339,26 @@ public interface AsyncTable { * {@link CompletableFuture}. */ CompletableFuture> smallScan(Scan scan, int limit); + + /** + * The basic scan API using the observer pattern. All results that match the given scan object + * will be passed to the given {@code scanObserver} by calling + * {@link ScanObserver#onNext(Result[])}. {@link ScanObserver#onComplete()} means the scan is + * finished, and {@link ScanObserver#onError(Throwable)} means we hit an unrecoverable error and + * the scan is terminated. {@link ScanObserver#onHeartbeat()} means the RS is still working but we + * can not get a valid result to call {@link ScanObserver#onNext(Result[])}. This is usually + * because the matched results are too sparse, for example, a filter which almost filters out + * everything is specified. + *

+ * Notice that, the methods of the given {@code scanObserver} will be called directly in the rpc + * framework's callback thread, so typically you should not do any time consuming work inside + * these methods, otherwise you will be likely to block at least one connection to RS(even more if + * the rpc framework uses NIO). + *

+ * This method is only for experts, do NOT use this method if you have other + * choice. + * @param scan A configured {@link Scan} object. + * @param scanObserver the observer used to receive results. + */ + void scan(Scan scan, ScanObserver scanObserver); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index ce53775..6d74399 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceAudience.Private class AsyncTableImpl implements AsyncTable { + private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class); + private final AsyncConnectionImpl conn; private final TableName tableName; @@ -267,8 +271,8 @@ class AsyncTableImpl implements AsyncTable { future.completeExceptionally(controller.getFailed()); } else { try { - org.apache.hadoop.hbase.client.MultiResponse multiResp = - ResponseConverter.getResults(req, resp, controller.cellScanner()); + org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter + .getResults(req, resp, controller.cellScanner()); Throwable ex = multiResp.getException(regionName); if (ex != null) { future @@ -348,6 +352,20 @@ class AsyncTableImpl implements AsyncTable { .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); } + public void scan(Scan scan, ScanObserver scanObserver) { + if (scan.isSmall()) { + if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { + scanObserver.onError( + new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); + } else { + LOG.warn("This is small scan " + scan + ", consider using smallScan directly?"); + } + } + scan = setDefaultScanConfig(scan); + new AsyncClientScanner(scan, scanObserver, tableName, conn, scanTimeoutNs, readRpcTimeoutNs) + .start(); + } + @Override public void setReadRpcTimeout(long timeout, TimeUnit unit) { this.readRpcTimeoutNs = unit.toNanos(timeout); @@ -377,4 +395,14 @@ class AsyncTableImpl implements AsyncTable { public long getOperationTimeout(TimeUnit unit) { return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); } + + @Override + public void setScanTimeout(long timeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getScanTimeout(TimeUnit unit) { + return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit); + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 00ff350..b7bdb83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Private public abstract class ClientScanner extends AbstractClientScanner { + private static final Log LOG = LogFactory.getLog(ClientScanner.class); protected Scan scan; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java new file mode 100644 index 0000000..a9ebe53 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A scan result cache that only returns complete result. + */ +@InterfaceAudience.Private +class CompleteResultScanResultCache implements ScanResultCache { + + private final List partialResults = new ArrayList<>(); + + private Result combine() throws IOException { + Result result = Result.createCompleteResult(partialResults); + partialResults.clear(); + return result; + } + + private Result[] prependCombined(Result[] results, int length) throws IOException { + Result[] prependResults = new Result[length + 1]; + prependResults[0] = combine(); + System.arraycopy(results, 0, prependResults, 1, length); + return prependResults; + } + + @Override + public Result[] offerAndPoll(Result[] results, boolean isHeartbeatMessage) throws IOException { + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.length == 0) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !isHeartbeatMessage) { + return new Result[] { combine() }; + } + return EMPTY_RESULT_ARRAY; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. + Result last = results[results.length - 1]; + if (last.isPartial()) { + if (partialResults.isEmpty()) { + partialResults.add(last); + return Arrays.copyOf(results, results.length - 1); + } + // We have only one result and it is partial + if (results.length == 1) { + // check if there is a row change + if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) { + partialResults.add(last); + return EMPTY_RESULT_ARRAY; + } + Result completeResult = combine(); + partialResults.add(last); + return new Result[] { completeResult }; + } + // We have some complete results + Result[] resultsToReturn = prependCombined(results, results.length - 1); + partialResults.add(last); + return resultsToReturn; + } + if (!partialResults.isEmpty()) { + return prependCombined(results, results.length - 1); + } + return results; + } + + @Override + public void clear() { + partialResults.clear(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 729f874..d464c3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,11 +24,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,11 +39,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.ipc.RemoteException; /** * Utility used by client connections. @@ -264,4 +269,25 @@ public final class ConnectionUtils { static boolean isEmptyStopRow(byte[] row) { return Bytes.equals(row, EMPTY_END_ROW); } + + static void resetController(HBaseRpcController controller, long timeoutNs) { + controller.reset(); + if (timeoutNs >= 0) { + controller.setCallTimeout( + (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); + } + } + + static Throwable translateException(Throwable t) { + if (t instanceof UndeclaredThrowableException && t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof ServiceException && t.getCause() != null) { + t = translateException(t.getCause()); + } + return t; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java new file mode 100644 index 0000000..a9f7287 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Result; + +/** + * Receives {@link Result} from an asynchronous scanner. + *

+ * Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send + * request to HBase service. So if you want the asynchronous scanner fetch data from HBase in + * background while you process the returned data, you need to move the processing work to another + * thread to make the {@code onNext} call return immediately. And please do NOT do any time + * consuming tasks in all methods below unless you know what you are doing. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ScanObserver { + + /** + * @param results the data fetched from HBase service. + * @return {@code false} if you want to stop the scanner process. Otherwise {@code true} + */ + boolean onNext(Result[] results); + + /** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + *

+ * This method give you a chance to terminate a slow scan operation. + * @return {@code false} if you want to stop the scanner process. Otherwise {@code true} + */ + boolean onHeartbeat(); + + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + *

+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + void onError(Throwable error); + + /** + * Indicate that the scan operation is completed normally. + */ + void onComplete(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java new file mode 100644 index 0000000..643733e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * + */ +@InterfaceAudience.Private +interface ScanResultCache { + + static final Result[] EMPTY_RESULT_ARRAY = new Result[0]; + + Result[] offerAndPoll(Result[] results, boolean isHeartbeatMessage) throws IOException; + + void clear(); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java new file mode 100644 index 0000000..782b5ef --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScan { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static AsyncConnection ASYNC_CONN; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 1; i < 9; i++) { + splitKeys[i - 1] = + Bytes.toBytes(new StringBuilder().append(i).append(i).append(i).toString()); + } + TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws Exception { + ASYNC_CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + private static final class SimpleScanObserver implements ScanObserver { + + private final Queue queue = new ArrayDeque<>(); + + private boolean finished; + + private Throwable error; + + @Override + public synchronized boolean onNext(Result[] results) { + for (Result result : results) { + queue.offer(result); + } + notifyAll(); + return true; + } + + @Override + public boolean onHeartbeat() { + return true; + } + + @Override + public synchronized void onError(Throwable error) { + finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized Result take() throws IOException, InterruptedException { + for (;;) { + if (!queue.isEmpty()) { + return queue.poll(); + } + if (finished) { + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } else { + return null; + } + } + wait(); + } + } + } + + @Test + public void test() throws InterruptedException, ExecutionException, IOException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int count = 1000; + List> futures = new ArrayList<>(); + IntStream.range(0, count) + .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))))); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + SimpleScanObserver scanObserver = new SimpleScanObserver(); + table.scan(new Scan().setCaching(10), scanObserver); + for (int i = 0; i < count; i++) { + Result result = scanObserver.take(); + assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + } + assertNull(scanObserver.take()); + + scanObserver = new SimpleScanObserver(); + table.scan(new Scan().setCaching(10).setReversed(true), scanObserver); + for (int i = count - 1; i >= 0; i--) { + Result result = scanObserver.take(); + assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + } + assertNull(scanObserver.take()); + } +} -- 2.7.4