From 74d9fd54255bde09bd66fd83ccc275a075ec4315 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 21 Oct 2016 22:54:07 +0800 Subject: [PATCH] HBASE-16838 Implement scan --- .../hbase/client/AllowPartialScanResultCache.java | 94 ++++++ .../hadoop/hbase/client/AsyncClientScanner.java | 143 +++++++++ .../hbase/client/AsyncConnectionConfiguration.java | 41 ++- .../hadoop/hbase/client/AsyncRegionLocator.java | 23 ++ .../client/AsyncRpcRetryingCallerFactory.java | 93 +++++- .../hbase/client/AsyncScanRpcRetryingCaller.java | 343 +++++++++++++++++++++ .../AsyncSingleRequestRpcRetryingCaller.java | 55 +--- .../org/apache/hadoop/hbase/client/AsyncTable.java | 6 + .../apache/hadoop/hbase/client/AsyncTableImpl.java | 41 ++- .../apache/hadoop/hbase/client/ClientScanner.java | 93 ++---- .../hbase/client/ClientSmallReversedScanner.java | 9 +- .../client/CompleteResultScanResultCache.java | 97 ++++++ .../hadoop/hbase/client/ConnectionUtils.java | 54 ++++ .../hadoop/hbase/client/ReversedClientScanner.java | 4 +- .../apache/hadoop/hbase/client/ScanObserver.java | 55 ++++ .../hadoop/hbase/client/ScanResultCache.java | 35 +++ .../hbase/client/ScannerCallableWithReplicas.java | 16 +- .../hbase/client/TestAsyncTableNoncedRetry.java | 3 +- .../hadoop/hbase/client/TestAsyncTableScan.java | 159 ++++++++++ .../hadoop/hbase/client/TestFromClientSide.java | 2 +- 20 files changed, 1244 insertions(+), 122 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRpcRetryingCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java new file mode 100644 index 0000000..8166a81 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * + */ +@InterfaceAudience.Private +class AllowPartialScanResultCache implements ScanResultCache { + + // used to filter out the cells that already returned to user as we always start from the + // beginning of a row when retry. + private Cell lastCell; + + private Result filterCells(Result result) { + if (lastCell == null) { + return result; + } + // not the same row + if (!Bytes.equals(lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength(), + result.getRow(), 0, result.getRow().length)) { + return result; + } + Cell[] rawCells = result.rawCells(); + int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow); + if (index < 0) { + index = -index - 1; + } else { + index++; + } + if (index == 0) { + return result; + } + if (index == rawCells.length) { + return null; + } + return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), false, + result.isStale(), true); + } + + private void updateLastCell(Result result) { + lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null; + } + + @Override + public Result[] offerAndPoll(Result[] results, boolean isHeartbeatMessage) throws IOException { + if (results.length == 0) { + return EMPTY_RESULT_ARRAY; + } + Result first = filterCells(results[0]); + if (results.length == 1) { + if (first == null) { + // do not update last cell if we filter out all cells + return EMPTY_RESULT_ARRAY; + } + updateLastCell(results[0]); + results[0] = first; + return results; + } + updateLastCell(results[results.length - 1]); + if (first == null) { + return Arrays.copyOfRange(results, 1, results.length); + } + results[0] = first; + return results; + } + + @Override + public void clear() { + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java new file mode 100644 index 0000000..8be5ed2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; + +/** + * + */ +@InterfaceAudience.Private +class AsyncClientScanner { + + private final Scan originScan; + + private final ScanObserver scanObserver; + + private final TableName tableName; + + private final AsyncConnectionImpl conn; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final ScanResultCache resultCache; + + public AsyncClientScanner(Scan scan, ScanObserver scanObserver, TableName tableName, + AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { + this.originScan = scan; + if (scan.getStartRow() == null) { + scan.setStartRow(EMPTY_START_ROW); + } + if (scan.getStopRow() == null) { + scan.setStopRow(EMPTY_END_ROW); + } + this.scanObserver = scanObserver; + this.tableName = tableName; + this.conn = conn; + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0 + ? new AllowPartialScanResultCache() : new CompleteResultScanResultCache(); + } + + private static final class OpenScannerResponse { + + public final HRegionLocation loc; + + public final ClientService.Interface stub; + + public final long scannerId; + + public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) { + this.loc = loc; + this.stub = stub; + this.scannerId = scannerId; + } + } + + private CompletableFuture callOpenScanner(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub) { + CompletableFuture future = new CompletableFuture<>(); + try { + ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), + originScan, 0, false); + stub.scan(controller, request, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + return; + } + future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId())); + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + private void startScan(OpenScannerResponse resp) { + conn.callerFactory.scan().id(resp.scannerId).location(resp.loc).stub(resp.stub) + .setScan(originScan).observer(scanObserver).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start() + .whenComplete((scanResp, error) -> { + if (error != null) { + scanObserver.onError(error); + return; + } + if (scanResp == null) { + scanObserver.onComplete(); + } else { + openScanner(scanResp.getScan(), scanResp.isLocateToPreviousRegion()); + } + }); + } + + private void openScanner(Scan scan, boolean locateToPreviousRegion) { + conn.callerFactory. 
single().table(tableName).row(scan.getStartRow()) + .locateToPreviousRegion(locateToPreviousRegion) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call() + .whenComplete((resp, error) -> { + if (error != null) { + scanObserver.onError(error); + return; + } + startScan(resp); + }); + } + + public void start() { + openScanner(originScan, originScan.isReversed() && originScan.getStartRow().length == 0); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index ba2e660..3a648f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -20,11 +20,18 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; @@ -34,6 +41,7 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -59,6 +67,13 @@ class AsyncConnectionConfiguration { /** How many retries are allowed before we start to log */ private final int startLogErrorsCnt; + private final long scanTimeoutNs; + + private final int scannerCaching; + + private final long scannerMaxResultSize; + + @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -68,11 +83,18 @@ class AsyncConnectionConfiguration { conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getLong(HBASE_RPC_TIMEOUT_KEY, 
DEFAULT_HBASE_RPC_TIMEOUT))); - this.pauseNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + this.pauseNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, - DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.startLogErrorsCnt = + conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.scanTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); + this.scannerCaching = + conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); } long getMetaOperationTimeoutNs() { @@ -103,4 +125,15 @@ class AsyncConnectionConfiguration { return startLogErrorsCnt; } + long getScanTimeoutNs() { + return scanTimeoutNs; + } + + int getScannerCaching() { + return scannerCaching; + } + + long getScannerMaxResultSize() { + return scannerMaxResultSize; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index dc75ba6..9bed3d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow; + import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; /** * TODO: reimplement using aync connection when the scan logic is ready. 
The current implementation
@@ -52,6 +55,26 @@ class AsyncRegionLocator implements Closeable {
     return future;
   }
 
+  CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+      byte[] startRowOfCurrentRegion, boolean reload) {
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    byte[] toLocateRow = createClosestNextRow(startRowOfCurrentRegion, true);
+    try {
+      // walk forward from the probe row until we reach the region whose end key
+      // equals the start row of the current region, i.e. the previous region
+      for (;;) {
+        HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
+        byte[] endKey = loc.getRegionInfo().getEndKey();
+        if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
+          future.complete(loc);
+          break;
+        }
+        toLocateRow = endKey;
+      }
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
   void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
       ServerName source) {
     conn.updateCachedLocations(tableName, regionName, row, exception, source);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index c5ac9a5..7d82784 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -24,8 +24,10 @@ import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 
 /**
  * Factory to create an AsyncRpcRetryingCaller.
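For reference, the byte-level trick behind this previous-region lookup is the "closest next row" computation (defined later in this patch in ConnectionUtils.createClosestNextRow). A minimal standalone sketch, with the same logic inlined and hypothetical row values, shows what the probe rows look like in both scan directions:

  import java.util.Arrays;

  // Demonstrates the probe-row computation used by getPreviousRegionLocation:
  // for a reversed scan we need a row that sorts immediately BEFORE the given
  // row, for a forward scan one that sorts immediately AFTER it.
  public class ClosestNextRowDemo {

    // a run of 0xff bytes, the same idea as ConnectionUtils.MAX_BYTE_ARRAY
    static final byte[] MAX_BYTE_ARRAY = new byte[9];
    static {
      Arrays.fill(MAX_BYTE_ARRAY, (byte) 0xff);
    }

    static byte[] createClosestNextRow(byte[] row, boolean isReversed) {
      if (!isReversed) {
        // forward: append a zero byte, the smallest row strictly greater than 'row'
        return Arrays.copyOf(row, row.length + 1);
      }
      if (row.length == 0) {
        // empty start row on a reversed scan means "start from the largest row"
        return MAX_BYTE_ARRAY;
      }
      if (row[row.length - 1] == 0) {
        // drop the trailing zero byte: the largest row strictly smaller than 'row'
        return Arrays.copyOf(row, row.length - 1);
      }
      // decrement the last byte and pad with 0xff bytes
      byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
      System.arraycopy(row, 0, nextRow, 0, row.length - 1);
      nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
      System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
      return nextRow;
    }

    public static void main(String[] args) {
      byte[] row = { 'b' };
      System.out.println(Arrays.toString(createClosestNextRow(row, false))); // [98, 0]
      System.out.println(Arrays.toString(createClosestNextRow(row, true)));  // [97, -1, -1, ..., -1]
    }
  }

getPreviousRegionLocation probes with the reversed flavor of this row and then walks forward along region end keys until one matches the current region's start key, which identifies the previous region.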
@@ -48,6 +50,8 @@ class AsyncRpcRetryingCallerFactory { private byte[] row; + private boolean locateToPreviousRegion; + private AsyncSingleRequestRpcRetryingCaller.Callable callable; private long operationTimeoutNs = -1L; @@ -80,10 +84,15 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SingleRequestCallerBuilder locateToPreviousRegion(boolean locateToPreviousRegion) { + this.locateToPreviousRegion = locateToPreviousRegion; + return this; + } + public AsyncSingleRequestRpcRetryingCaller build() { return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, Preconditions.checkNotNull(tableName, "tableName is null"), - Preconditions.checkNotNull(row, "row is null"), + Preconditions.checkNotNull(row, "row is null"), locateToPreviousRegion, Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); @@ -103,4 +112,86 @@ class AsyncRpcRetryingCallerFactory { public SingleRequestCallerBuilder single() { return new SingleRequestCallerBuilder<>(); } + + public class ScanRequestCallerBuilder { + + private long scannerId = -1L; + + private Scan scan; + + private ScanResultCache resultCache; + + private ScanObserver scanObserver; + + private ClientService.Interface stub; + + private HRegionLocation loc; + + private long scanTimeoutNs; + + private long rpcTimeoutNs; + + public ScanRequestCallerBuilder id(long scannerId) { + this.scannerId = scannerId; + return this; + } + + public ScanRequestCallerBuilder setScan(Scan scan) { + this.scan = scan; + return this; + } + + public ScanRequestCallerBuilder resultCache(ScanResultCache resultCache) { + this.resultCache = resultCache; + return this; + } + + public ScanRequestCallerBuilder observer(ScanObserver scanObserver) { + this.scanObserver = scanObserver; + return this; + } + + public ScanRequestCallerBuilder stub(ClientService.Interface stub) { + this.stub = stub; + return this; + } + + public ScanRequestCallerBuilder location(HRegionLocation loc) { + this.loc = loc; + return this; + } + + public ScanRequestCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(scanTimeout); + return this; + } + + public ScanRequestCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncScanRpcRetryingCaller build() { + Preconditions.checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId); + return new AsyncScanRpcRetryingCaller(retryTimer, conn, + Preconditions.checkNotNull(scan, "scan is null"), scannerId, + Preconditions.checkNotNull(resultCache, "resultCache is null"), + Preconditions.checkNotNull(scanObserver, "observer is null"), + Preconditions.checkNotNull(stub, "stub is null"), + Preconditions.checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(), + conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs, + conn.connConf.getStartLogErrorsCnt()); + } + + /** + * Short cut for {@code build().start()}. 
+ */ + public CompletableFuture start() { + return build().start(); + } + } + + public ScanRequestCallerBuilder scan() { + return new ScanRequestCallerBuilder(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRpcRetryingCaller.java new file mode 100644 index 0000000..34442c0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRpcRetryingCaller.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; + +import io.netty.util.HashedWheelTimer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * + */ +@InterfaceAudience.Private +class AsyncScanRpcRetryingCaller { + + private static final Log LOG = LogFactory.getLog(AsyncScanRpcRetryingCaller.class); + + private final HashedWheelTimer retryTimer; + + private final Scan scan; + + private final long scannerId; + + private final 
ScanResultCache resultCache; + + private final ScanObserver scanObserver; + + private final ClientService.Interface stub; + + private final HRegionLocation loc; + + private final long pauseNs; + + private final int maxAttempts; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final int startLogErrorsCnt; + + public final class Response { + + private final Scan scan; + + private final boolean locateToPreviousRegion; + + public Response(Scan scan, boolean locateToPreviousRegion) { + this.scan = scan; + this.locateToPreviousRegion = locateToPreviousRegion; + } + + public Scan getScan() { + return scan; + } + + public boolean isLocateToPreviousRegion() { + return locateToPreviousRegion; + } + } + + private final CompletableFuture future; + + private final HBaseRpcController controller; + + private byte[] nextStartRowWhenError; + + private boolean includeNextStartRowWhenError; + + private long nextCallStartNs; + + private int tries = 1; + + private final List exceptions; + + private long nextCallSeq = -1L; + + public AsyncScanRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + Scan scan, long scannerId, ScanResultCache resultCache, ScanObserver scanObserver, + Interface stub, HRegionLocation loc, long pauseNs, int maxRetries, long scanTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.scan = scan; + this.scannerId = scannerId; + this.resultCache = resultCache; + this.scanObserver = scanObserver; + this.stub = stub; + this.loc = loc; + this.pauseNs = pauseNs; + this.maxAttempts = retries2Attempts(maxRetries); + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + } + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); + } + + private void closeScanner() { + resetController(controller, rpcTimeoutNs); + ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); + stub.scan(controller, req, resp -> { + if (controller.failed()) { + LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + + " for " + loc.getRegionInfo().getEncodedName() + " of " + + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", + controller.getFailed()); + } + }); + } + + private void completeExceptionally(boolean closeScanner) { + resultCache.clear(); + if (closeScanner) { + closeScanner(); + } + future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); + } + + private void complete(boolean hasMoreResults, byte[] nextStartRow) { + // we do not clear resultCache here as it must be empty. 
+    if (!hasMoreResults) {
+      future.complete(null);
+    } else {
+      future.complete(new Response(scan.setStartRow(nextStartRow), scan.isReversed()));
+    }
+  }
+
+  private Scan getNextScanWhenError() {
+    if (nextStartRowWhenError == null) {
+      // no need to reset start row
+      return scan;
+    }
+    if (includeNextStartRowWhenError) {
+      return scan.setStartRow(nextStartRowWhenError);
+    }
+    return scan.setStartRow(createClosestNextRow(nextStartRowWhenError, scan.isReversed()));
+  }
+
+  private void completeWhenError(boolean closeScanner) {
+    resultCache.clear();
+    if (closeScanner) {
+      closeScanner();
+    }
+    Scan nextScan = getNextScanWhenError();
+    future.complete(new Response(nextScan, nextScan.isReversed()
+        && Bytes.equals(nextScan.getStartRow(), loc.getRegionInfo().getEndKey())));
+  }
+
+  private void onError(Throwable error) {
+    error = translateException(error);
+    if (tries > startLogErrorsCnt) {
+      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
+          + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable()
+          + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+          + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
+          + " ms", error);
+    }
+    boolean scannerClosed =
+        error instanceof UnknownScannerException || error instanceof NotServingRegionException
+            || error instanceof RegionServerStoppedException;
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+        new RetriesExhaustedException.ThrowableWithExtraContext(error,
+            EnvironmentEdgeManager.currentTime(), "");
+    exceptions.add(qt);
+    if (tries >= maxAttempts) {
+      completeExceptionally(!scannerClosed);
+      return;
+    }
+    long delayNs;
+    if (scanTimeoutNs > 0) {
+      long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      if (maxDelayNs <= 0) {
+        completeExceptionally(!scannerClosed);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    if (scannerClosed) {
+      completeWhenError(false);
+      return;
+    }
+    if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) {
+      completeWhenError(true);
+      return;
+    }
+    if (error instanceof DoNotRetryIOException) {
+      completeExceptionally(true);
+      return;
+    }
+    tries++;
+    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void updateNextStartRowWhenError(Result result) {
+    nextStartRowWhenError = result.getRow();
+    includeNextStartRowWhenError = result.isPartial();
+  }
+
+  private void onComplete(ScanResponse resp) {
+    if (controller.failed()) {
+      onError(controller.getFailed());
+      return;
+    }
+    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
+    Result[] results;
+    try {
+      results = resultCache.offerAndPoll(
+        ResponseConverter.getResults(controller.cellScanner(), resp), isHeartbeatMessage);
+    } catch (IOException e) {
+      // We can not retry here. The server has responded normally and the call sequence has been
+      // increased so a new scan with the same call sequence will cause an
+      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+      LOG.warn("decode scan response failed", e);
+      completeWhenError(true);
+      return;
+    }
+
+    boolean stopByUser;
+    if (results.length == 0) {
+      // if we have nothing to return then this must be a heartbeat message.
+      stopByUser = !scanObserver.onHeartbeat();
+    } else {
+      updateNextStartRowWhenError(results[results.length - 1]);
+      stopByUser = !scanObserver.onNext(results);
+    }
+    if (resp.hasMoreResults() && !resp.getMoreResults()) {
+      // RS tells us there is no more data for the whole scan
+      complete(false, null);
+      return;
+    }
+    if (stopByUser) {
+      if (resp.getMoreResultsInRegion()) {
+        // we still have more results in the region but the user has requested to stop the scan,
+        // so we need to close the scanner explicitly.
+        closeScanner();
+      }
+      complete(false, null);
+      return;
+    }
+    // as in 2.0 this value will always be set
+    if (!resp.getMoreResultsInRegion()) {
+      if (scan.isReversed()) {
+        if (scan.getStopRow().length == 0) {
+          if (loc.getRegionInfo().getStartKey().length == 0) {
+            // we have reached the first region of the table, the whole scan is done
+            complete(false, null);
+            return;
+          }
+        } else if (Bytes.compareTo(loc.getRegionInfo().getStartKey(), scan.getStopRow()) <= 0) {
+          // the current region already covers the stop row, the whole scan is done
+          complete(false, null);
+          return;
+        }
+        complete(true, loc.getRegionInfo().getStartKey());
+      } else {
+        if (scan.getStopRow().length == 0) {
+          if (loc.getRegionInfo().getEndKey().length == 0) {
+            // we have reached the last region of the table, the whole scan is done
+            complete(false, null);
+            return;
+          }
+        } else if (Bytes.compareTo(loc.getRegionInfo().getEndKey(), scan.getStopRow()) >= 0) {
+          // the current region already covers the stop row, the whole scan is done
+          complete(false, null);
+          return;
+        }
+        complete(true, loc.getRegionInfo().getEndKey());
+      }
+      return;
+    }
+    next();
+  }
+
+  private void call() {
+    resetController(controller, rpcTimeoutNs);
+    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
+      nextCallSeq, false, false);
+    stub.scan(controller, req, this::onComplete);
+  }
+
+  private void next() {
+    nextCallSeq++;
+    tries = 1;
+    exceptions.clear();
+    nextCallStartNs = System.nanoTime();
+    call();
+  }
+
+  /**
+   * @return the scan object for the next open scanner call, or null if we should stop.
+   */
+  public CompletableFuture<Response> start() {
+    next();
+    return future;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 8acde94..0cfd071 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -18,14 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 
 import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -40,11 +39,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Retry caller for a single request, such as get, put, delete, etc.
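Both retrying callers above schedule their retries on the HashedWheelTimer with a delay from ConnectionUtils.getPauseTime(pauseNs, tries - 1). A rough standalone sketch of that schedule, assuming the usual HConstants.RETRY_BACKOFF multiplier table and one percent jitter:

  import java.util.concurrent.ThreadLocalRandom;
  import java.util.concurrent.TimeUnit;

  // Approximates the retry backoff used by the async retrying callers. The
  // multiplier table and jitter here mirror what ConnectionUtils.getPauseTime
  // is expected to do; treat the exact values as an assumption.
  public class BackoffSketch {

    static final int[] RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 };

    static long getPauseTime(long pauseNs, int tries) {
      int index = Math.min(tries, RETRY_BACKOFF.length - 1);
      long normalPause = pauseNs * RETRY_BACKOFF[index];
      // up to one percent jitter so that retries from many clients spread out
      long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f);
      return normalPause + jitter;
    }

    public static void main(String[] args) {
      long pauseNs = TimeUnit.MILLISECONDS.toNanos(100); // default hbase.client.pause
      for (int tries = 0; tries < 8; tries++) {
        System.out.printf("attempt %d -> ~%d ms%n", tries + 1,
            TimeUnit.NANOSECONDS.toMillis(getPauseTime(pauseNs, tries)));
      }
    }
  }

The scan caller additionally caps the delay by the remaining scan timeout, so a retry is never scheduled past the point where the whole scan would time out anyway.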
@@ -68,6 +65,8 @@ class AsyncSingleRequestRpcRetryingCaller { private final byte[] row; + private final boolean locatePreviousRegion; + private final Callable callable; private final long pauseNs; @@ -89,12 +88,14 @@ class AsyncSingleRequestRpcRetryingCaller { private final long startNs; public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, byte[] row, Callable callable, long pauseNs, int maxRetries, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, byte[] row, boolean locatePreviousRegion, Callable callable, + long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.row = row; + this.locatePreviousRegion = locatePreviousRegion; this.callable = callable; this.pauseNs = pauseNs; this.maxAttempts = retries2Attempts(maxRetries); @@ -113,19 +114,6 @@ class AsyncSingleRequestRpcRetryingCaller { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); } - private static Throwable translateException(Throwable t) { - if (t instanceof UndeclaredThrowableException && t.getCause() != null) { - t = t.getCause(); - } - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - if (t instanceof ServiceException && t.getCause() != null) { - t = translateException(t.getCause()); - } - return t; - } - private void completeExceptionally() { future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); } @@ -156,22 +144,7 @@ class AsyncSingleRequestRpcRetryingCaller { } updateCachedLocation.accept(error); tries++; - retryTimer.newTimeout(new TimerTask() { - - @Override - public void run(Timeout timeout) throws Exception { - // always restart from beginning. 
- locateThenCall(); - } - }, delayNs, TimeUnit.NANOSECONDS); - } - - private void resetController() { - controller.reset(); - if (rpcTimeoutNs >= 0) { - controller.setCallTimeout( - (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs))); - } + retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS); } private void call(HRegionLocation loc) { @@ -189,7 +162,7 @@ class AsyncSingleRequestRpcRetryingCaller { loc.getRegionInfo().getRegionName(), row, err, loc.getServerName())); return; } - resetController(); + resetController(controller, rpcTimeoutNs); callable.call(controller, loc, stub).whenComplete((result, error) -> { if (error != null) { onError(error, @@ -207,7 +180,13 @@ class AsyncSingleRequestRpcRetryingCaller { } private void locateThenCall() { - conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> { + CompletableFuture locateFuture; + if (locatePreviousRegion) { + locateFuture = conn.getLocator().getPreviousRegionLocation(tableName, row, tries > 1); + } else { + locateFuture = conn.getLocator().getRegionLocation(tableName, row, tries > 1); + } + locateFuture.whenComplete((loc, error) -> { if (error != null) { onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 4642746..c33fe81 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -98,6 +98,10 @@ public interface AsyncTable { */ long getOperationTimeout(TimeUnit unit); + void setScanTimeout(long timeout, TimeUnit unit); + + long getScanTimeout(TimeUnit unit); + /** * Test for the existence of columns in the table, as specified by the Get. *
<p>
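To make the new scan surface concrete, here is a usage sketch of the observer-style API added in the hunk below, assuming an AsyncTable obtained from an AsyncConnection elsewhere. QueueingScanObserver is a hypothetical consumer that follows the threading advice in the ScanObserver javadoc later in this patch:

  import java.util.Arrays;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ScanObserver;

  // Moves Results off the RPC thread onto a queue so that onNext returns
  // immediately and a separate consumer thread can do the heavy processing.
  public class QueueingScanObserver implements ScanObserver {

    private final BlockingQueue<Result> queue = new LinkedBlockingQueue<>();

    @Override
    public boolean onNext(Result[] results) {
      queue.addAll(Arrays.asList(results));
      return true; // keep scanning
    }

    @Override
    public boolean onHeartbeat() {
      return true; // no data yet, but do not cancel the scan
    }

    @Override
    public void onError(Throwable error) {
      // hand the failure to the consumer side, e.g. via a poison pill
    }

    @Override
    public void onComplete() {
      // signal the consumer that no more Results will arrive
    }

    public BlockingQueue<Result> results() {
      return queue;
    }
  }

  // usage, with 'table' being an AsyncTable:
  //   table.setScanTimeout(1, TimeUnit.MINUTES);
  //   table.scan(new Scan().setCaching(100), new QueueingScanObserver());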
@@ -312,4 +316,6 @@ public interface AsyncTable {
    */
   CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, RowMutations mutation);
+
+  void scan(Scan scan, ScanObserver scanObserver);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 77a5bbe..b6e0a78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * The implementation of AsyncTable.
@@ -57,12 +58,18 @@ class AsyncTableImpl implements AsyncTable {
 
   private final TableName tableName;
 
+  private final int defaultScannerCaching;
+
+  private final long defaultScannerMaxResultSize;
+
   private long readRpcTimeoutNs;
 
   private long writeRpcTimeoutNs;
 
   private long operationTimeoutNs;
 
+  private long scanTimeoutNs;
+
   public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
     this.conn = conn;
     this.tableName = tableName;
@@ -70,6 +77,9 @@ class AsyncTableImpl implements AsyncTable {
     this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
     this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
         : conn.connConf.getOperationTimeoutNs();
+    this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
+    this.defaultScannerCaching = conn.connConf.getScannerCaching();
+    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
   }
 
   @Override
@@ -256,8 +266,8 @@ class AsyncTableImpl implements AsyncTable {
           future.completeExceptionally(controller.getFailed());
         } else {
           try {
-            org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter
-                .getResults(req, resp, controller.cellScanner());
+            org.apache.hadoop.hbase.client.MultiResponse multiResp =
+                ResponseConverter.getResults(req, resp, controller.cellScanner());
             Throwable ex = multiResp.getException(regionName);
             if (ex != null) {
               future
@@ -305,6 +315,23 @@ class AsyncTableImpl implements AsyncTable {
         .call();
   }
 
+  @Override
+  public void scan(Scan scan, ScanObserver scanObserver) {
+    if (scan.getBatch() > 0 && scan.isSmall()) {
+      scanObserver
+          .onError(new IllegalArgumentException("Small scan should not be used with batching"));
+      return;
+    }
+    // copy the scan object, as we may change it later.
+    scan = ReflectionUtils.newInstance(scan.getClass(), scan);
+    if (scan.getCaching() <= 0) {
+      scan.setCaching(defaultScannerCaching);
+    }
+    if (scan.getMaxResultSize() <= 0) {
+      scan.setMaxResultSize(defaultScannerMaxResultSize);
+    }
+    new AsyncClientScanner(scan, scanObserver, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
+        .start();
+  }
+
   @Override
   public void setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);
@@ -334,4 +361,14 @@ class AsyncTableImpl implements AsyncTable {
   public long getOperationTimeout(TimeUnit unit) {
     return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
+  @Override
+  public void setScanTimeout(long timeout, TimeUnit unit) {
+    this.scanTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getScanTimeout(TimeUnit unit) {
+    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
+  }
 }
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 371a68a..9c8871f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,7 +17,19 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow;
+
 import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,20 +47,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-
 /**
  * Implements the scanner interface for the HBase client.
  * If there are multiple regions in a table, this scanner will iterate
@@ -57,9 +60,7 @@ import java.util.concurrent.ExecutorService;
 @InterfaceAudience.Private
 public abstract class ClientScanner extends AbstractClientScanner {
   private static final Log LOG = LogFactory.getLog(ClientScanner.class);
-  // A byte array in which all elements are the max byte, and it is used to
-  // construct closest front row
-  static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
   protected Scan scan;
   protected boolean closed = false;
   // Current region scanner is against.
Gets cleared if current region goes @@ -443,12 +444,8 @@ public abstract class ClientScanner extends AbstractClientScanner { // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) { - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); - } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); - } + if (!this.lastResult.isPartial() && scan.getBatch() < 0) { + scan.setStartRow(createClosestNextRow(lastResult.getRow(), scan.isReversed())); } else { // we need rescan this row because we only loaded partial row before scan.setStartRow(lastResult.getRow()); @@ -737,49 +734,27 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - @Override - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - if (callable != null) { - callable.setClose(); - try { - call(callable, caller, scannerTimeout); - } catch (UnknownScannerException e) { - // We used to catch this error, interpret, and rethrow. However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just due to lease time out. - if (LOG.isDebugEnabled()) { - LOG.debug("scanner failed to close", e); - } - } catch (IOException e) { - /* An exception other than UnknownScanner is unexpected. */ - LOG.warn("scanner failed to close.", e); + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + if (callable != null) { + callable.setClose(); + try { + call(callable, caller, scannerTimeout); + } catch (UnknownScannerException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just due to lease time out. + if (LOG.isDebugEnabled()) { + LOG.debug("scanner failed to close", e); } - callable = null; + } catch (IOException e) { + /* An exception other than UnknownScanner is unexpected. 
*/ + LOG.warn("scanner failed to close.", e); } - closed = true; - } - - /** - * Create the closest row before the specified row - * @param row - * @return a new byte array which is the closest front row of the specified one - */ - protected static byte[] createClosestRowBefore(byte[] row) { - if (row == null) { - throw new IllegalArgumentException("The passed row is empty"); - } - if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) { - return MAX_BYTE_ARRAY; - } - if (row[row.length - 1] == 0) { - return Arrays.copyOf(row, row.length - 1); - } else { - byte[] closestFrontRow = Arrays.copyOf(row, row.length); - closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); - closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY); - return closestFrontRow; + callable = null; } + closed = true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 5fac93a..269bb00 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow; + +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -36,8 +39,6 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; - /** *
<p>
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the @@ -149,13 +150,13 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { return false; } // We take the row just under to get to the previous region. - localStartKey = createClosestRowBefore(startKey); + localStartKey = createClosestNextRow(startKey, true); if (LOG.isDebugEnabled()) { LOG.debug("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { regionChanged = false; - localStartKey = createClosestRowBefore(lastResult.getRow()); + localStartKey = createClosestNextRow(lastResult.getRow(), true); } else { localStartKey = this.scan.getStartRow(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java new file mode 100644 index 0000000..a9ebe53 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A scan result cache that only returns complete result. 
+ */ +@InterfaceAudience.Private +class CompleteResultScanResultCache implements ScanResultCache { + + private final List partialResults = new ArrayList<>(); + + private Result combine() throws IOException { + Result result = Result.createCompleteResult(partialResults); + partialResults.clear(); + return result; + } + + private Result[] prependCombined(Result[] results, int length) throws IOException { + Result[] prependResults = new Result[length + 1]; + prependResults[0] = combine(); + System.arraycopy(results, 0, prependResults, 1, length); + return prependResults; + } + + @Override + public Result[] offerAndPoll(Result[] results, boolean isHeartbeatMessage) throws IOException { + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.length == 0) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !isHeartbeatMessage) { + return new Result[] { combine() }; + } + return EMPTY_RESULT_ARRAY; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. + Result last = results[results.length - 1]; + if (last.isPartial()) { + if (partialResults.isEmpty()) { + partialResults.add(last); + return Arrays.copyOf(results, results.length - 1); + } + // We have only one result and it is partial + if (results.length == 1) { + // check if there is a row change + if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) { + partialResults.add(last); + return EMPTY_RESULT_ARRAY; + } + Result completeResult = combine(); + partialResults.add(last); + return new Result[] { completeResult }; + } + // We have some complete results + Result[] resultsToReturn = prependCombined(results, results.length - 1); + partialResults.add(last); + return resultsToReturn; + } + if (!partialResults.isEmpty()) { + return prependCombined(results, results.length - 1); + } + return results; + } + + @Override + public void clear() { + partialResults.clear(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index eca9ad8..f7c4ae9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -21,10 +21,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,8 +36,12 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import 
org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -222,4 +229,51 @@ public final class ConnectionUtils {
       return HConstants.NO_NONCE;
     }
   };
+
+  static void resetController(HBaseRpcController controller, long timeoutNs) {
+    controller.reset();
+    if (timeoutNs >= 0) {
+      controller.setCallTimeout(
+        (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
+    }
+  }
+
+  static Throwable translateException(Throwable t) {
+    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
+      t = t.getCause();
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException) t).unwrapRemoteException();
+    }
+    if (t instanceof ServiceException && t.getCause() != null) {
+      t = translateException(t.getCause());
+    }
+    return t;
+  }
+
+  // A byte array in which all elements are the max byte, and it is used to
+  // construct the closest front row
+  static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
+  /**
+   * Create the closest row next to the specified row. For a forward scan this is the smallest
+   * row strictly greater than the given row; for a reversed scan it is the largest representable
+   * row strictly smaller than the given row.
+   */
+  static byte[] createClosestNextRow(byte[] row, boolean isReversed) {
+    if (isReversed) {
+      if (row.length == 0) {
+        return MAX_BYTE_ARRAY;
+      }
+      if (row[row.length - 1] == 0) {
+        return Arrays.copyOf(row, row.length - 1);
+      } else {
+        byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
+        System.arraycopy(row, 0, nextRow, 0, row.length - 1);
+        nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
+        // fill the tail of nextRow with 0xff bytes (src first, then dest)
+        System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
+        return nextRow;
+      }
+    } else {
+      return Arrays.copyOf(row, row.length + 1);
+    }
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index dde82ba..b482a3e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
@@ -108,7 +110,7 @@
       // current region, thus the last one of located regions should be the
       // previous region of current region. The related logic of locating
       // regions is implemented in ReversedScannerCallable
-      byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
+      byte[] locateStartRow = locateTheClosestFrontRow ?
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index dde82ba..b482a3e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
@@ -108,7 +110,7 @@ public class ReversedClientScanner extends ClientSimpleScanner {
       // current region, thus the last one of located regions should be the
       // previous region of current region. The related logic of locating
       // regions is implemented in ReversedScannerCallable
-      byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
+      byte[] locateStartRow = locateTheClosestFrontRow ? createClosestNextRow(localStartKey, true)
           : null;
       callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
       // Open a scanner on the region server starting at the
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java
new file mode 100644
index 0000000..e1cd78e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanObserver.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Receives {@link Result} from an asynchronous scanner.
+ * <p>
+ * Notice that the {@link #onNext(Result[])} method will be called in the thread which sends
+ * the request to the HBase service. So if you want the asynchronous scanner to fetch data from
+ * HBase in the background while you process the returned data, you need to move the processing
+ * work to another thread so that the {@code onNext} call can return immediately. And please do
+ * NOT do any time-consuming work in any of the methods below unless you know what you are
+ * doing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ScanObserver {
+
+  /**
+   * @param results the data fetched from the HBase service
+   * @return {@code false} if you want to stop the scanner process, otherwise {@code true}
+   */
+  boolean onNext(Result[] results);
+
+  /**
+   * Indicates that there is a heartbeat message but we have not accumulated enough cells to
+   * call {@code onNext}.
+   * <p>
+   * This method gives you a chance to terminate a slow scan operation.
+   * @return {@code false} if you want to stop the scanner process, otherwise {@code true}
+   */
+  boolean onHeartbeat();
+
+  void onError(Throwable error);
+
+  void onComplete();
+}
\ No newline at end of file
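Because onNext runs on the RPC thread, a consumer that does non-trivial work should hand each batch off to its own executor, as the javadoc above suggests. A minimal sketch against the interface in this patch; the class name, executor choice, and process() body are illustrative only:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanObserver;

public class OffloadingScanObserver implements ScanObserver {

  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  @Override
  public boolean onNext(Result[] results) {
    // Return immediately: queue the batch so the scanner keeps fetching in the
    // background while another thread does the real work.
    pool.execute(() -> process(results));
    return true;
  }

  @Override
  public boolean onHeartbeat() {
    return true; // keep waiting; return false to give up on a slow scan
  }

  @Override
  public void onError(Throwable error) {
    error.printStackTrace();
    pool.shutdown();
  }

  @Override
  public void onComplete() {
    pool.shutdown();
  }

  private void process(Result[] results) {
    // application-specific handling of the Results
  }
}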
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
new file mode 100644
index 0000000..643733e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to buffer the Results returned from scan RPCs and decide which of them can be passed
+ * on to the user, e.g. filtering out cells that were already returned before a retry.
+ */
+@InterfaceAudience.Private
+interface ScanResultCache {
+
+  static final Result[] EMPTY_RESULT_ARRAY = new Result[0];
+
+  Result[] offerAndPoll(Result[] results, boolean isHeartbeatMessage) throws IOException;
+
+  void clear();
+}
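For orientation, a hedged sketch of how a caller might drive this interface per scan response. The class and method names here are invented for illustration (the real wiring lives in AsyncScanRpcRetryingCaller), and since ScanResultCache is package-private the sketch assumes the same package:

import java.io.IOException;

// Hypothetical driver: feed each RPC response through the cache, which may
// withhold or trim Results (e.g. a partially returned row), and pass only the
// survivors to the observer.
class ScanResponseHandler {

  private final ScanResultCache cache;
  private final ScanObserver observer;

  ScanResponseHandler(ScanResultCache cache, ScanObserver observer) {
    this.cache = cache;
    this.observer = observer;
  }

  // Returns false if the user wants to stop the scan.
  boolean onResponse(Result[] results, boolean isHeartbeat) throws IOException {
    Result[] ready = cache.offerAndPoll(results, isHeartbeat);
    if (ready.length > 0) {
      return observer.onNext(ready);
    }
    // Nothing to deliver yet; still give the user a chance to cancel.
    return !isHeartbeat || observer.onHeartbeat();
  }
}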
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 5174598..93e4c73 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestNextRow;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -30,7 +32,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -40,12 +41,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class has the logic for handling scanners for regions with and without replicas.
  * 1. A scan is attempted on the default (primary) region
@@ -338,11 +335,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       // The last result was not a partial result which means it contained all of the cells for
       // that row (we no longer need any information from it). Set the start row to the next
       // closest row that could be seen.
-      if (callable.getScan().isReversed()) {
-        callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
-      } else {
-        callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
-      }
+      callable.getScan().setStartRow(
+        createClosestNextRow(this.lastResult.getRow(), callable.getScan().isReversed()));
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 8fc0f60..840f844 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
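The ScannerCallableWithReplicas hunk above collapses the forward and reversed restart cases into one call; a hedged sketch of that unified restart logic in isolation (class and method names invented, same-package access to the package-private ConnectionUtils helper assumed):

import org.apache.hadoop.hbase.client.Scan;

// After a row has been fully returned, a retried scan must not see it again.
// createClosestNextRow picks the tightest restart point for either direction.
class RestartRowExample {

  static Scan restartAfter(Scan scan, byte[] lastRow) {
    // Forward: lastRow + 0x00 is the smallest row after lastRow.
    // Reversed: the closest front row, built by decrementing and padding with 0xFF.
    scan.setStartRow(ConnectionUtils.createClosestNextRow(lastRow, scan.isReversed()));
    return scan;
  }
}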
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
new file mode 100644
index 0000000..782b5ef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScan {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 1; i < 9; i++) {
+      splitKeys[i - 1] =
+        Bytes.toBytes(new StringBuilder().append(i).append(i).append(i).toString());
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static final class SimpleScanObserver implements ScanObserver {
+
+    private final Queue<Result> queue = new ArrayDeque<>();
+
+    private boolean finished;
+
+    private Throwable error;
+
+    @Override
+    public synchronized boolean onNext(Result[] results) {
+      for (Result result : results) {
+        queue.offer(result);
+      }
+      notifyAll();
+      return true;
+    }
+
+    @Override
+    public boolean onHeartbeat() {
+      return true;
+    }
+
+    @Override
+    public synchronized void onError(Throwable error) {
+      finished = true;
+      this.error = error;
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void onComplete() {
+      finished = true;
+      notifyAll();
+    }
+
+    public synchronized Result take() throws IOException, InterruptedException {
+      for (;;) {
+        if (!queue.isEmpty()) {
+          return queue.poll();
+        }
+        if (finished) {
+          if (error != null) {
+            Throwables.propagateIfPossible(error, IOException.class);
+            throw new IOException(error);
+          } else {
+            return null;
+          }
+        }
+        wait();
+      }
+    }
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException, IOException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 1000;
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    IntStream.range(0, count)
+      .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+          .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+    SimpleScanObserver scanObserver = new SimpleScanObserver();
+    table.scan(new Scan().setCaching(10), scanObserver);
+    for (int i = 0; i < count; i++) {
+      Result result = scanObserver.take();
+      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    }
+    assertNull(scanObserver.take());
+
+    scanObserver = new SimpleScanObserver();
+    table.scan(new Scan().setCaching(10).setReversed(true), scanObserver);
+    for (int i = count - 1; i >= 0; i--) {
+      Result result = scanObserver.take();
+      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+    }
+    assertNull(scanObserver.take());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 89841a9..28f4ece 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5933,7 +5933,7 @@ public class TestFromClientSide {
   public void testReversedScanUnderMultiRegions() throws Exception {
     // Test Initialization.
     TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions");
-    byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY;
+    byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
     byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
         Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
         Bytes.toBytes("006"),
-- 
1.9.1