From 11a017cd03ec46a3c9b5764bd6dd476fdc7a5bcb Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 8 Nov 2016 15:57:38 +0800 Subject: [PATCH] HBASE-16838 Implement basic scan --- .../hbase/client/AllowPartialScanResultCache.java | 98 ++++++ .../hadoop/hbase/client/AsyncClientScanner.java | 147 ++++++++ .../hbase/client/AsyncConnectionConfiguration.java | 9 +- .../hadoop/hbase/client/AsyncConnectionImpl.java | 2 +- .../hadoop/hbase/client/AsyncRegistryFactory.java | 43 +++ .../client/AsyncRpcRetryingCallerFactory.java | 86 +++++ .../client/AsyncScanRegionRpcRetryingCaller.java | 374 +++++++++++++++++++++ .../AsyncSingleRequestRpcRetryingCaller.java | 39 +-- .../org/apache/hadoop/hbase/client/AsyncTable.java | 37 ++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 28 ++ .../apache/hadoop/hbase/client/ClientScanner.java | 1 + .../hbase/client/ClusterRegistryFactory.java | 43 --- .../client/CompleteResultScanResultCache.java | 97 ++++++ .../hadoop/hbase/client/ConnectionUtils.java | 26 ++ .../apache/hadoop/hbase/client/ScanConsumer.java | 63 ++++ .../hadoop/hbase/client/ScanResultCache.java | 53 +++ .../hbase/client/AbstractTestAsyncTableScan.java | 155 +++++++++ .../client/TestAllowPartialScanResultCache.java | 92 +++++ .../hadoop/hbase/client/TestAsyncTableScan.java | 147 ++++++++ .../hbase/client/TestAsyncTableSmallScan.java | 164 +-------- .../client/TestCompleteResultScanResultCache.java | 159 +++++++++ 21 files changed, 1630 insertions(+), 233 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanConsumer.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java new file mode 100644 index 0000000..bc6e44e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A ScanResultCache that may return partial result. + *
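<p>
+ * For example (a sketch mirroring TestAllowPartialScanResultCache; {@code partialUpToCq4} and
+ * {@code wholeRowAgain} are hypothetical partial results for the same row): suppose a row has
+ * cells cq0..cq9 and the first response returns only cq0..cq4. If a retry then re-reads the row
+ * from its beginning, the already-returned cells are filtered out:
+ * <pre>
+ * AllowPartialScanResultCache cache = new AllowPartialScanResultCache();
+ * cache.addAndGet(new Result[] { partialUpToCq4 }, false); // cq0..cq4 go through to the user
+ * cache.addAndGet(new Result[] { wholeRowAgain }, false);  // only cq5..cq9 are handed back
+ * </pre>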
+ * <p>
+ * As we can only scan from the starting of a row when error, so here we also implement the logic + * that skips the cells that have already been returned. + */ +@InterfaceAudience.Private +class AllowPartialScanResultCache implements ScanResultCache { + + // used to filter out the cells that already returned to user as we always start from the + // beginning of a row when retry. + private Cell lastCell; + + private Result filterCells(Result result) { + if (lastCell == null) { + return result; + } + + // not the same row + if (!CellUtil.matchingRow(lastCell, result.getRow(), 0, result.getRow().length)) { + return result; + } + Cell[] rawCells = result.rawCells(); + int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow); + if (index < 0) { + index = -index - 1; + } else { + index++; + } + if (index == 0) { + return result; + } + if (index == rawCells.length) { + return null; + } + return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, + result.isStale(), true); + } + + private void updateLastCell(Result result) { + lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null; + } + + @Override + public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + if (results.length == 0) { + return EMPTY_RESULT_ARRAY; + } + Result first = filterCells(results[0]); + if (results.length == 1) { + if (first == null) { + // do not update last cell if we filter out all cells + return EMPTY_RESULT_ARRAY; + } + updateLastCell(results[0]); + results[0] = first; + return results; + } + updateLastCell(results[results.length - 1]); + if (first == null) { + return Arrays.copyOfRange(results, 1, results.length); + } + results[0] = first; + return results; + } + + @Override + public void clear() { + // we do not cache anything + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java new file mode 100644 index 0000000..c955972 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; + +/** + * The asynchronous client scanner implementation. + *
<p>
+ * Here we will call OpenScanner first and use the returned scannerId to create a + * {@link AsyncScanRegionRpcRetryingCaller} to do the real scan operation. The return value of + * {@link AsyncScanRegionRpcRetryingCaller} will tell us whether open a new scanner or finish scan. + */ +@InterfaceAudience.Private +class AsyncClientScanner { + + private final Scan originalScan; + + private final ScanConsumer consumer; + + private final TableName tableName; + + private final AsyncConnectionImpl conn; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final ScanResultCache resultCache; + + public AsyncClientScanner(Scan scan, ScanConsumer consumer, TableName tableName, + AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { + this.originalScan = scan; + if (scan.getStartRow() == null) { + scan.setStartRow(EMPTY_START_ROW); + } + if (scan.getStopRow() == null) { + scan.setStopRow(EMPTY_END_ROW); + } + this.consumer = consumer; + this.tableName = tableName; + this.conn = conn; + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0 + ? new AllowPartialScanResultCache() : new CompleteResultScanResultCache(); + } + + private static final class OpenScannerResponse { + + public final HRegionLocation loc; + + public final ClientService.Interface stub; + + public final long scannerId; + + public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) { + this.loc = loc; + this.stub = stub; + this.scannerId = scannerId; + } + } + + private CompletableFuture callOpenScanner(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub) { + CompletableFuture future = new CompletableFuture<>(); + try { + ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), + originalScan, 0, false); + stub.scan(controller, request, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + return; + } + future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId())); + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + private void startScan(OpenScannerResponse resp) { + conn.callerFactory.scanRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) + .setScan(originalScan).consumer(consumer).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start() + .whenComplete((scanResp, error) -> { + if (error != null) { + consumer.onError(error); + return; + } + if (scanResp == null) { + consumer.onComplete(); + } else { + openScanner(scanResp.getScan(), scanResp.isLocateToPreviousRegion()); + } + }); + } + + private void openScanner(Scan scan, boolean locateToPreviousRegion) { + conn.callerFactory. 
<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
+        .locateToPreviousRegion(locateToPreviousRegion)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
+        .whenComplete((resp, error) -> {
+          if (error != null) {
+            consumer.onError(error);
+            return;
+          }
+          startScan(resp);
+        });
+  }
+
+  public void start() {
+    openScanner(originalScan, originalScan.isReversed() && originalScan.getStartRow().length == 0);
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index aaac845..971dd78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -52,6 +52,8 @@ class AsyncConnectionConfiguration {
 
   private final long metaOperationTimeoutNs;
 
+  // timeout for a whole operation such as get, put or delete. Notice that scan will not be
+  // affected by this value, see scanTimeoutNs.
   private final long operationTimeoutNs;
 
   // timeout for each read rpc request
@@ -67,6 +69,7 @@ class AsyncConnectionConfiguration {
   /** How many retries are allowed before we start to log */
   private final int startLogErrorsCnt;
 
+  // timeout for each single operation when scanning, such as openScanner and next.
   private final long scanTimeoutNs;
 
   private final int scannerCaching;
@@ -125,15 +128,15 @@ class AsyncConnectionConfiguration {
     return startLogErrorsCnt;
   }
 
-  public long getScanTimeoutNs() {
+  long getScanTimeoutNs() {
     return scanTimeoutNs;
   }
 
-  public int getScannerCaching() {
+  int getScannerCaching() {
     return scannerCaching;
   }
 
-  public long getScannerMaxResultSize() {
+  long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 6cad6a2..70e024e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -92,7 +92,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.user = user;
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.locator = new AsyncRegionLocator(this);
-    this.registry = ClusterRegistryFactory.getRegistry(conf);
+    this.registry = AsyncRegistryFactory.getRegistry(conf);
     this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
new file mode 100644
index 0000000..2fc3322
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * Get instance of configured Registry. + */ +@InterfaceAudience.Private +final class AsyncRegistryFactory { + + static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + + private AsyncRegistryFactory() { + } + + /** + * @return The cluster registry implementation to use. + */ + static AsyncRegistry getRegistry(Configuration conf) { + Class clazz = + conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class); + return ReflectionUtils.newInstance(clazz, conf); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 0d23c39..e519a9d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -26,8 +26,10 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; /** * Factory to create an AsyncRpcRetryCaller. 
@@ -171,4 +173,88 @@ class AsyncRpcRetryingCallerFactory { public SmallScanCallerBuilder smallScan() { return new SmallScanCallerBuilder(); } + + public class ScanRegionCallerBuilder { + + private long scannerId = -1L; + + private Scan scan; + + private ScanResultCache resultCache; + + private ScanConsumer consumer; + + private ClientService.Interface stub; + + private HRegionLocation loc; + + private long scanTimeoutNs; + + private long rpcTimeoutNs; + + public ScanRegionCallerBuilder id(long scannerId) { + this.scannerId = scannerId; + return this; + } + + public ScanRegionCallerBuilder setScan(Scan scan) { + this.scan = scan; + return this; + } + + public ScanRegionCallerBuilder resultCache(ScanResultCache resultCache) { + this.resultCache = resultCache; + return this; + } + + public ScanRegionCallerBuilder consumer(ScanConsumer consumer) { + this.consumer = consumer; + return this; + } + + public ScanRegionCallerBuilder stub(ClientService.Interface stub) { + this.stub = stub; + return this; + } + + public ScanRegionCallerBuilder location(HRegionLocation loc) { + this.loc = loc; + return this; + } + + public ScanRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(scanTimeout); + return this; + } + + public ScanRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncScanRegionRpcRetryingCaller build() { + checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId); + return new AsyncScanRegionRpcRetryingCaller(retryTimer, conn, + checkNotNull(scan, "scan is null"), scannerId, + checkNotNull(resultCache, "resultCache is null"), + checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), + checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(), + conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs, + conn.connConf.getStartLogErrorsCnt()); + } + + /** + * Short cut for {@code build().start()}. + */ + public CompletableFuture start() { + return build().start(); + } + } + + /** + * Create retry caller for scanning a region. + */ + public ScanRegionCallerBuilder scanRegion() { + return new ScanRegionCallerBuilder(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java new file mode 100644 index 0000000..5c78f95 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanRegionRpcRetryingCaller.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for scanning a region.
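+ * <p>
+ * Callers are created through {@code AsyncRpcRetryingCallerFactory#scanRegion()}; the following
+ * sketch mirrors the usage in AsyncClientScanner:
+ * <pre>
+ * conn.callerFactory.scanRegion().id(scannerId).location(loc).stub(stub)
+ *     .setScan(scan).consumer(consumer).resultCache(resultCache)
+ *     .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ *     .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start();
+ * </pre>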
+ */ +@InterfaceAudience.Private +class AsyncScanRegionRpcRetryingCaller { + + private static final Log LOG = LogFactory.getLog(AsyncScanRegionRpcRetryingCaller.class); + + private final HashedWheelTimer retryTimer; + + private final Scan scan; + + private final long scannerId; + + private final ScanResultCache resultCache; + + private final ScanConsumer consumer; + + private final ClientService.Interface stub; + + private final HRegionLocation loc; + + private final long pauseNs; + + private final int maxAttempts; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final int startLogErrorsCnt; + + private final Supplier createNextStartRowWhenError; + + private final Runnable completeWhenNoMoreResultsInRegion; + + public static final class Response { + + private final Scan scan; + + private final boolean locateToPreviousRegion; + + public Response(Scan scan, boolean locateToPreviousRegion) { + this.scan = scan; + this.locateToPreviousRegion = locateToPreviousRegion; + } + + public Scan getScan() { + return scan; + } + + public boolean isLocateToPreviousRegion() { + return locateToPreviousRegion; + } + } + + private final CompletableFuture future; + + private final HBaseRpcController controller; + + private byte[] nextStartRowWhenError; + + private boolean includeNextStartRowWhenError; + + private long nextCallStartNs; + + private int tries = 1; + + private final List exceptions; + + private long nextCallSeq = -1L; + + public AsyncScanRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + Scan scan, long scannerId, ScanResultCache resultCache, ScanConsumer consumer, Interface stub, + HRegionLocation loc, long pauseNs, int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.scan = scan; + this.scannerId = scannerId; + this.resultCache = resultCache; + this.consumer = consumer; + this.stub = stub; + this.loc = loc; + this.pauseNs = pauseNs; + this.maxAttempts = retries2Attempts(maxRetries); + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + if (scan.isReversed()) { + createNextStartRowWhenError = this::createReversedNextStartRowWhenError; + completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; + } else { + createNextStartRowWhenError = this::createNextStartRowWhenError; + completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; + } + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + } + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); + } + + private void closeScanner() { + resetController(controller, rpcTimeoutNs); + ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); + stub.scan(controller, req, resp -> { + if (controller.failed()) { + LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + + " for " + loc.getRegionInfo().getEncodedName() + " of " + + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed", + controller.getFailed()); + } + }); + } + + private void completeExceptionally(boolean closeScanner) { + resultCache.clear(); + if (closeScanner) { + closeScanner(); + } + future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); + } + + 
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
+      justification = "https://github.com/findbugsproject/findbugs/issues/79")
+  private void completeNoMoreResults() {
+    future.complete(null);
+  }
+
+  private void completeWithNextStartRow(byte[] nextStartRow) {
+    future.complete(new Response(scan.setStartRow(nextStartRow), scan.isReversed()));
+  }
+
+  private byte[] createNextStartRowWhenError() {
+    return createClosestRowAfter(nextStartRowWhenError);
+  }
+
+  private byte[] createReversedNextStartRowWhenError() {
+    return createClosestRowBefore(nextStartRowWhenError);
+  }
+
+  private Scan getNextScanWhenError() {
+    if (nextStartRowWhenError == null) {
+      // no need to reset start row
+      return scan;
+    }
+    if (includeNextStartRowWhenError) {
+      return scan.setStartRow(nextStartRowWhenError);
+    }
+    return scan.setStartRow(createNextStartRowWhenError.get());
+  }
+
+  private void completeWhenError(boolean closeScanner) {
+    resultCache.clear();
+    if (closeScanner) {
+      closeScanner();
+    }
+    Scan nextScan = getNextScanWhenError();
+    future.complete(new Response(nextScan, nextScan.isReversed()
+        && Bytes.equals(nextScan.getStartRow(), loc.getRegionInfo().getEndKey())));
+  }
+
+  private void onError(Throwable error) {
+    error = translateException(error);
+    if (tries > startLogErrorsCnt) {
+      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
+          + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable()
+          + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+          + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
+          + " ms", error);
+    }
+    boolean scannerClosed =
+        error instanceof UnknownScannerException || error instanceof NotServingRegionException
+            || error instanceof RegionServerStoppedException;
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+        new RetriesExhaustedException.ThrowableWithExtraContext(error,
+            EnvironmentEdgeManager.currentTime(), "");
+    exceptions.add(qt);
+    if (tries >= maxAttempts) {
+      completeExceptionally(!scannerClosed);
+      return;
+    }
+    long delayNs;
+    if (scanTimeoutNs > 0) {
+      long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      if (maxDelayNs <= 0) {
+        completeExceptionally(!scannerClosed);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    if (scannerClosed) {
+      completeWhenError(false);
+      return;
+    }
+    if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) {
+      completeWhenError(true);
+      return;
+    }
+    if (error instanceof DoNotRetryIOException) {
+      completeExceptionally(true);
+      return;
+    }
+    tries++;
+    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void updateNextStartRowWhenError(Result result) {
+    nextStartRowWhenError = result.getRow();
+    includeNextStartRowWhenError = result.isPartial();
+  }
+
+  private void completeWhenNoMoreResultsInRegion() {
+    if (isEmptyStopRow(scan.getStopRow())) {
+      if (isEmptyStopRow(loc.getRegionInfo().getEndKey())) {
+        completeNoMoreResults();
+        return;
+      }
+    } else {
+      if (Bytes.compareTo(loc.getRegionInfo().getEndKey(), scan.getStopRow()) >= 0) {
+        completeNoMoreResults();
+        return;
+      }
+    }
+    completeWithNextStartRow(loc.getRegionInfo().getEndKey());
+  }
+
+  private void completeReversedWhenNoMoreResultsInRegion() {
+    if (isEmptyStopRow(scan.getStopRow())) {
+      if (isEmptyStartRow(loc.getRegionInfo().getStartKey())) {
+        completeNoMoreResults();
+        return;
+      }
+    } else {
+      if (Bytes.compareTo(loc.getRegionInfo().getStartKey(), scan.getStopRow()) <= 0) {
+        completeNoMoreResults();
+        return;
+      }
+    }
+    completeWithNextStartRow(loc.getRegionInfo().getStartKey());
+  }
+
+  private void onComplete(ScanResponse resp) {
+    if (controller.failed()) {
+      onError(controller.getFailed());
+      return;
+    }
+    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
+    Result[] results;
+    try {
+      results = resultCache.addAndGet(
+        Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp))
+            .orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
+        isHeartbeatMessage);
+    } catch (IOException e) {
+      // We can not retry here. The server has responded normally and the call sequence has been
+      // increased so a new scan with the same call sequence will cause an
+      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+      LOG.warn("decode scan response failed", e);
+      completeWhenError(true);
+      return;
+    }
+
+    boolean stopByUser;
+    if (results.length == 0) {
+      // if we have nothing to return then this must be a heartbeat message.
+      stopByUser = !consumer.onHeartbeat();
+    } else {
+      updateNextStartRowWhenError(results[results.length - 1]);
+      stopByUser = !consumer.onNext(results);
+    }
+    if (resp.hasMoreResults() && !resp.getMoreResults()) {
+      // RS tells us there is no more data for the whole scan
+      completeNoMoreResults();
+      return;
+    }
+    if (stopByUser) {
+      if (resp.getMoreResultsInRegion()) {
+        // we have more results in region but the user requested to stop the scan, so we need to
+        // close the scanner explicitly.
+        closeScanner();
+      }
+      completeNoMoreResults();
+      return;
+    }
+    // as in 2.0 this value will always be set
+    if (!resp.getMoreResultsInRegion()) {
+      completeWhenNoMoreResultsInRegion.run();
+      return;
+    }
+    next();
+  }
+
+  private void call() {
+    resetController(controller, rpcTimeoutNs);
+    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
+      nextCallSeq, false, false);
+    stub.scan(controller, req, this::onComplete);
+  }
+
+  private void next() {
+    nextCallSeq++;
+    tries = 0;
+    exceptions.clear();
+    nextCallStartNs = System.nanoTime();
+    call();
+  }
+
+  /**
+   * @return the scan object and locate direction for the next open scanner call, or null if we
+   *         should stop.
+ */ + public CompletableFuture start() { + next(); + return future; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index f10c9a5..36687c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -40,11 +39,9 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.ipc.RemoteException; /** * Retry caller for a single request, such as get, put, delete, etc. @@ -121,19 +118,6 @@ class AsyncSingleRequestRpcRetryingCaller { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); } - private static Throwable translateException(Throwable t) { - if (t instanceof UndeclaredThrowableException && t.getCause() != null) { - t = t.getCause(); - } - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - if (t instanceof ServiceException && t.getCause() != null) { - t = translateException(t.getCause()); - } - return t; - } - private void completeExceptionally() { future.completeExceptionally(new RetriesExhaustedException(tries, exceptions)); } @@ -165,22 +149,7 @@ class AsyncSingleRequestRpcRetryingCaller { } updateCachedLocation.accept(error); tries++; - retryTimer.newTimeout(new TimerTask() { - - @Override - public void run(Timeout timeout) throws Exception { - // always restart from beginning. 
- locateThenCall(); - } - }, delayNs, TimeUnit.NANOSECONDS); - } - - private void resetController() { - controller.reset(); - if (rpcTimeoutNs >= 0) { - controller.setCallTimeout( - (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs))); - } + retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS); } private void call(HRegionLocation loc) { @@ -197,7 +166,7 @@ class AsyncSingleRequestRpcRetryingCaller { err -> conn.getLocator().updateCachedLocation(loc, err)); return; } - resetController(); + resetController(controller, rpcTimeoutNs); callable.call(controller, loc, stub).whenComplete((result, error) -> { if (error != null) { onError(error, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 94747b9..db5265a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -100,6 +100,21 @@ public interface AsyncTable { long getOperationTimeout(TimeUnit unit); /** + * Set timeout of a single operation in a scan, such as openScanner and next. Will override the + * value {@code hbase.client.scanner.timeout.period} in configuration. + *
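<p>
+   * For example, to allow each openScanner/next call up to one minute (a usage sketch;
+   * {@code table} is any {@link AsyncTable} instance):
+   * <pre>
+   * table.setScanTimeout(1, TimeUnit.MINUTES);
+   * table.scan(scan, consumer);
+   * </pre>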
+   * <p>
+   * Generally a scan will never time out after we add heartbeat support unless the region has
+   * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
+   * operation in a scan.
+   */
+  void setScanTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get the timeout of a single operation in a scan.
+   */
+  long getScanTimeout(TimeUnit unit);
+
   /**
    * Test for the existence of columns in the table, as specified by the Get.
   * <p>
   * This will return true if the Get matches one or more keys, false if not.
@@ -335,4 +350,26 @@
    * {@link CompletableFuture}.
    */
   CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+
+  /**
+   * The basic scan API uses the observer pattern. All results that match the given scan object
+   * will be passed to the given {@code consumer} by calling {@link ScanConsumer#onNext(Result[])}.
+   * {@link ScanConsumer#onComplete()} means the scan is finished, and
+   * {@link ScanConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan is
+   * terminated. {@link ScanConsumer#onHeartbeat()} means the RS is still working but we can not
+   * get a valid result to call {@link ScanConsumer#onNext(Result[])}. This is usually because the
+   * matched results are too sparse, for example, a filter which almost filters out everything is
+   * specified.
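+   * <p>
+   * A usage sketch that collects everything into a list and blocks until the scan finishes (the
+   * anonymous consumer below is illustrative, not part of this patch):
+   * <pre>
+   * List<Result> collected = Collections.synchronizedList(new ArrayList<>());
+   * CountDownLatch done = new CountDownLatch(1);
+   * table.scan(new Scan(), new ScanConsumer() {
+   *   public boolean onNext(Result[] results) {
+   *     Collections.addAll(collected, results);
+   *     return true; // keep scanning
+   *   }
+   *   public boolean onHeartbeat() {
+   *     return true; // still willing to wait
+   *   }
+   *   public void onError(Throwable error) {
+   *     done.countDown();
+   *   }
+   *   public void onComplete() {
+   *     done.countDown();
+   *   }
+   * });
+   * done.await();
+   * </pre>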
+   * <p>
+   * Notice that the methods of the given {@code consumer} will be called directly in the rpc
+   * framework's callback thread, so typically you should not do any time consuming work inside
+   * these methods, otherwise you are likely to block at least one connection to the RS (even
+   * more if the rpc framework uses NIO).
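+   * <p>
+   * A common way to honor this (a sketch; {@code pool} and {@code process} are your own code) is
+   * to hand the results off to a caller-owned executor so {@code onNext} returns immediately:
+   * <pre>
+   * ExecutorService pool = Executors.newSingleThreadExecutor();
+   * // inside your ScanConsumer:
+   * public boolean onNext(Result[] results) {
+   *   pool.execute(() -> process(results));
+   *   return true;
+   * }
+   * </pre>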
+   * <p>
+   * This method is only for experts, do NOT use this method if you have any other choice.
+   * @param scan A configured {@link Scan} object.
+   * @param consumer the consumer used to receive results.
+   */
+  void scan(Scan scan, ScanConsumer consumer);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index ce53775..8fbd0cf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceAudience.Private
 class AsyncTableImpl implements AsyncTable {
 
+  private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class);
+
   private final AsyncConnectionImpl conn;
 
   private final TableName tableName;
@@ -348,6 +352,22 @@ class AsyncTableImpl implements AsyncTable {
         .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
   }
 
+  @Override
+  public void scan(Scan scan, ScanConsumer consumer) {
+    if (scan.isSmall()) {
+      if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
+        consumer.onError(
+          new IllegalArgumentException("Batch and allowPartial are not allowed for small scan"));
+        return;
+      } else {
+        LOG.warn("This is a small scan " + scan + ", consider using smallScan directly?");
+      }
+    }
+    scan = setDefaultScanConfig(scan);
+    new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
+        .start();
+  }
+
   @Override
   public void setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);
@@ -377,4 +397,14 @@ class AsyncTableImpl implements AsyncTable {
   public long getOperationTimeout(TimeUnit unit) {
     return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
+  @Override
+  public void setScanTimeout(long timeout, TimeUnit unit) {
+    this.scanTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getScanTimeout(TimeUnit unit) {
+    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
+  }
 }
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 00ff350..b7bdb83 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 public abstract class ClientScanner extends AbstractClientScanner {
+  private static final Log LOG = LogFactory.getLog(ClientScanner.class);
 
   protected Scan scan;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
deleted file mode 100644
index 48bfb18..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ReflectionUtils; - -/** - * Get instance of configured Registry. - */ -@InterfaceAudience.Private -final class ClusterRegistryFactory { - - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; - - private ClusterRegistryFactory() { - } - - /** - * @return The cluster registry implementation to use. - */ - static AsyncRegistry getRegistry(Configuration conf) { - Class clazz = - conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class); - return ReflectionUtils.newInstance(clazz, conf); - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java new file mode 100644 index 0000000..abf954d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteResultScanResultCache.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A scan result cache that only returns complete result. 
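+ * <p>
+ * For example (a sketch; {@code row1a}, {@code row1b} and {@code row2a} are hypothetical partial
+ * results): if one row comes back in several partial pieces across consecutive responses, the
+ * combined row is only emitted once the cache can tell the row is done, i.e. when a later
+ * response moves on to another row or the scan ends:
+ * <pre>
+ * CompleteResultScanResultCache cache = new CompleteResultScanResultCache();
+ * cache.addAndGet(new Result[] { row1a }, false); // empty: first piece is buffered
+ * cache.addAndGet(new Result[] { row1b }, false); // empty: same row, keep buffering
+ * cache.addAndGet(new Result[] { row2a }, false); // emits the combined row1, buffers row2a
+ * cache.addAndGet(new Result[0], false);          // scan end: emits the combined row2
+ * </pre>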
+ */ +@InterfaceAudience.Private +class CompleteResultScanResultCache implements ScanResultCache { + + private final List partialResults = new ArrayList<>(); + + private Result combine() throws IOException { + Result result = Result.createCompleteResult(partialResults); + partialResults.clear(); + return result; + } + + private Result[] prependCombined(Result[] results, int length) throws IOException { + Result[] prependResults = new Result[length + 1]; + prependResults[0] = combine(); + System.arraycopy(results, 0, prependResults, 1, length); + return prependResults; + } + + @Override + public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.length == 0) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partialResults.isEmpty() && !isHeartbeatMessage) { + return new Result[] { combine() }; + } + return EMPTY_RESULT_ARRAY; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. + Result last = results[results.length - 1]; + if (last.isPartial()) { + if (partialResults.isEmpty()) { + partialResults.add(last); + return Arrays.copyOf(results, results.length - 1); + } + // We have only one result and it is partial + if (results.length == 1) { + // check if there is a row change + if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) { + partialResults.add(last); + return EMPTY_RESULT_ARRAY; + } + Result completeResult = combine(); + partialResults.add(last); + return new Result[] { completeResult }; + } + // We have some complete results + Result[] resultsToReturn = prependCombined(results, results.length - 1); + partialResults.add(last); + return resultsToReturn; + } + if (!partialResults.isEmpty()) { + return prependCombined(results, results.length); + } + return results; + } + + @Override + public void clear() { + partialResults.clear(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 729f874..d464c3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -24,11 +24,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,11 +39,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import 
org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.ipc.RemoteException; /** * Utility used by client connections. @@ -264,4 +269,25 @@ public final class ConnectionUtils { static boolean isEmptyStopRow(byte[] row) { return Bytes.equals(row, EMPTY_END_ROW); } + + static void resetController(HBaseRpcController controller, long timeoutNs) { + controller.reset(); + if (timeoutNs >= 0) { + controller.setCallTimeout( + (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); + } + } + + static Throwable translateException(Throwable t) { + if (t instanceof UndeclaredThrowableException && t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof ServiceException && t.getCause() != null) { + t = translateException(t.getCause()); + } + return t; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanConsumer.java new file mode 100644 index 0000000..77b8f3b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanConsumer.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Result; + +/** + * Receives {@link Result} from an asynchronous scanner. + *
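<p>
+ * For example, a consumer that stops the scan after collecting 100 rows by returning
+ * {@code false} (a sketch; the class name is made up):
+ * <pre>
+ * class FirstHundredRowsConsumer implements ScanConsumer {
+ *   private final List<Result> rows = new ArrayList<>();
+ *
+ *   public boolean onNext(Result[] results) {
+ *     Collections.addAll(rows, results);
+ *     return rows.size() < 100; // returning false stops the scan
+ *   }
+ *
+ *   public boolean onHeartbeat() {
+ *     return true;
+ *   }
+ *
+ *   public void onError(Throwable error) {
+ *   }
+ *
+ *   public void onComplete() {
+ *   }
+ * }
+ * </pre>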
+ * <p>
+ * Notice that the {@link #onNext(Result[])} method will be called in the thread which we use to
+ * send requests to the HBase service. So if you want the asynchronous scanner to fetch data from
+ * HBase in the background while you process the returned data, you need to move the processing
+ * work to another thread to make the {@code onNext} call return immediately. And please do NOT
+ * do any time consuming tasks in any of the methods below unless you know what you are doing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ScanConsumer {
+
+  /**
+   * @param results the data fetched from HBase service.
+   * @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
+   */
+  boolean onNext(Result[] results);
+
+  /**
+   * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
+   * onNext.
+   * <p>
+   * This method gives you a chance to terminate a slow scan operation.
+   * @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
+   */
+  boolean onHeartbeat();
+
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + void onError(Throwable error); + + /** + * Indicate that the scan operation is completed normally. + */ + void onComplete(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java new file mode 100644 index 0000000..2366b57 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used to separate the row constructing logic. + *
<p>
+ * After we add heartbeat support for scan, RS may return partial result even if allowPartial is + * false and batch is 0. With this interface, the implementation now looks like: + *
<ol>
+ * <li>Get results from ScanResponse proto.</li>
+ * <li>Pass them to ScanResultCache and get something back.</li>
+ * <li>If we actually get something back, then pass it to ScanConsumer.</li>
+ * </ol>
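+ * <p>
+ * In code, the caller side is roughly (a sketch of the steps above, not the literal
+ * implementation in AsyncScanRegionRpcRetryingCaller):
+ * <pre>
+ * Result[] raw = ResponseConverter.getResults(controller.cellScanner(), resp);
+ * Result[] ready = cache.addAndGet(raw != null ? raw : ScanResultCache.EMPTY_RESULT_ARRAY,
+ *     resp.getHeartbeatMessage());
+ * if (ready.length > 0) {
+ *   consumer.onNext(ready);
+ * }
+ * </pre>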
+ */ +@InterfaceAudience.Private +interface ScanResultCache { + + static final Result[] EMPTY_RESULT_ARRAY = new Result[0]; + + /** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param isHeartbeatMessage indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ + Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException; + + /** + * Clear the cached result if any. Called when scan error and we will start from a start of a row + * again. + */ + void clear(); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java new file mode 100644 index 0000000..a0792ef --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public abstract class AbstractTestAsyncTableScan { + + protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + protected static TableName TABLE_NAME = TableName.valueOf("async"); + + protected static byte[] FAMILY = Bytes.toBytes("cf"); + + protected static byte[] CQ1 = Bytes.toBytes("cq1"); + + protected static byte[] CQ2 = Bytes.toBytes("cq2"); + + protected static int COUNT = 1000; + + protected static AsyncConnection ASYNC_CONN; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + List> futures = new ArrayList<>(); + IntStream.range(0, COUNT).forEach( + i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + ASYNC_CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + protected abstract Scan createScan(); + + protected abstract List doScan(AsyncTable table, Scan scan) throws Exception; + + @Test + public void testScanAll() throws Exception { + List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan()); + assertEquals(COUNT, results.size()); + IntStream.range(0, COUNT).forEach(i -> { + Result result = results.get(i); + assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); + }); + } + + private void assertResultEquals(Result result, int i) { + assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); + assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); + } + + @Test + public void testReversedScanAll() throws Exception { + List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan().setReversed(true)); + assertEquals(COUNT, results.size()); + IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); + } + + @Test + public void testScanNoStopKey() throws Exception { + int start = 345; + List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), + createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))); + assertEquals(COUNT - start, results.size()); + IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); + } + + @Test + public void testReverseScanNoStopKey() throws Exception { + int start = 765; + List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), + createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true)); + 
assertEquals(start + 1, results.size()); + IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); + } + + private void testScan(int start, int stop) throws Exception { + List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), + createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop)))); + assertEquals(stop - start, results.size()); + IntStream.range(0, stop - start).forEach(i -> assertResultEquals(results.get(i), start + i)); + } + + private void testReversedScan(int start, int stop) throws Exception { + List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), + createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true)); + assertEquals(start - stop, results.size()); + IntStream.range(0, start - stop).forEach(i -> assertResultEquals(results.get(i), start - i)); + } + + @Test + public void testScanWithStartKeyAndStopKey() throws Exception { + testScan(345, 567); + } + + @Test + public void testReversedScanWithStartKeyAndStopKey() throws Exception { + testReversedScan(765, 543); + } + + @Test + public void testScanAtRegionBoundary() throws Exception { + testScan(222, 333); + } + + @Test + public void testReversedScanAtRegionBoundary() throws Exception { + testReversedScan(333, 222); + } +}
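For illustration only (this subclass is not in the patch), a concrete scan flavor fills in just the two abstract hooks; this hypothetical variant mirrors the small-scan subclass that appears later in the patch:

public class TestAsyncTableExampleScan extends AbstractTestAsyncTableScan { // hypothetical

  @Override
  protected Scan createScan() {
    // choose the scan flavor that every inherited test case will exercise
    return new Scan().setSmall(true);
  }

  @Override
  protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
    // collect all results synchronously; the inherited tests assert on the returned list
    return table.smallScan(scan).get();
  }
}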
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java new file mode 100644 index 0000000..fc5ba14 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestAllowPartialScanResultCache { + + private static byte[] CF = Bytes.toBytes("cf"); + + private AllowPartialScanResultCache resultCache; + + @Before + public void setUp() { + resultCache = new AllowPartialScanResultCache(); + } + + @After + public void tearDown() { + resultCache.clear(); + resultCache = null; + } + + private static Cell createCell(int key, int cq) { + return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), Bytes.toBytes(key)); + } + + @Test + public void test() throws IOException { + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + + Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, i)).toArray(Cell[]::new); + Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, i)).toArray(Cell[]::new); + + Result[] results1 = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false); + assertEquals(1, results1.length); + assertEquals(1, Bytes.toInt(results1[0].getRow())); + assertEquals(5, results1[0].rawCells().length); + IntStream.range(0, 5).forEach( + i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))))); + + Result[] results2 = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false); + assertEquals(1, results2.length); + assertEquals(1, Bytes.toInt(results2[0].getRow())); + assertEquals(5, results2[0].rawCells().length); + IntStream.range(5, 10).forEach( + i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))))); + + Result[] results3 = resultCache + .addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false); + assertEquals(1, results3.length); + assertEquals(2, Bytes.toInt(results3[0].getRow())); + assertEquals(10, results3[0].rawCells().length); + IntStream.range(0, 10).forEach( + i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))))); + } +}
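Restated outside the harness, the behaviour this test pins down is that AllowPartialScanResultCache passes Results through as they arrive but filters out cells of the current row that were already handed to the caller. A compile-only sketch (not part of the patch), assuming the same imports as the test above and a context where IOException may propagate; it must also sit in org.apache.hadoop.hbase.client since the class is package-private:

AllowPartialScanResultCache cache = new AllowPartialScanResultCache();
byte[] cf = Bytes.toBytes("cf");
Cell[] row1 = IntStream.range(0, 10)
    .mapToObj(i -> (Cell) new KeyValue(Bytes.toBytes(1), cf, Bytes.toBytes("cq" + i), Bytes.toBytes(1)))
    .toArray(Cell[]::new);
// Chunk 1: cells 0-4 arrive flagged partial and are passed straight through.
Result[] first = cache.addAndGet(
    new Result[] { Result.create(Arrays.copyOf(row1, 5), null, false, true) }, false);
// Chunk 2 overlaps chunk 1 (cells 1-9): only the five unseen cells 5-9 come back.
Result[] second = cache.addAndGet(
    new Result[] { Result.create(Arrays.copyOfRange(row1, 1, 10), null, false, true) }, false);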
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java new file mode 100644 index 0000000..4bbe9f3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertTrue; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScan extends AbstractTestAsyncTableScan { + + private static final class SimpleScanObserver implements ScanConsumer { + + private final Queue<Result> queue = new ArrayDeque<>(); + + private boolean finished; + + private Throwable error; + + @Override + public synchronized boolean onNext(Result[] results) { + for (Result result : results) { + queue.offer(result); + } + notifyAll(); + return true; + } + + @Override + public boolean onHeartbeat() { + return true; + } + + @Override + public synchronized void onError(Throwable error) { + finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized Result take() throws IOException, InterruptedException { + for (;;) { + if (!queue.isEmpty()) { + return queue.poll(); + } + if (finished) { + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } else { + return null; + } + } + wait(); + } + } + } + + @Parameter + public Supplier<Scan> scanCreator; + + @Parameters + public static List<Object[]> params() { + return Arrays.asList(new Supplier<?>[] { TestAsyncTableScan::createNormalScan }, + new Supplier<?>[] { TestAsyncTableScan::createBatchScan }); + } + + private static Scan createNormalScan() { + return new Scan(); + } + + private static Scan createBatchScan() { + return new Scan().setBatch(1); + } + + @Override + protected Scan createScan() { + return scanCreator.get(); + } + + private Result convertToPartial(Result result) { + return Result.create(result.rawCells(), result.getExists(), result.isStale(), true); + } + + @Override + protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception { + SimpleScanObserver scanObserver = new SimpleScanObserver(); + table.scan(scan, scanObserver); + List<Result> results = new ArrayList<>(); + for (Result result; (result = scanObserver.take()) != null;) { + results.add(result); + } + if (scan.getBatch() > 0) { + assertTrue(results.size() % 2 == 0); + return IntStream.range(0, results.size() / 2).mapToObj(i -> { + try { + return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)), + convertToPartial(results.get(2 * i + 1)))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + } + return results; + } +}
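As an alternative to the blocking hand-off in SimpleScanObserver above, a ScanConsumer can bridge to a CompletableFuture instead; a sketch (not part of the patch) using only the callbacks the interface defines, and assuming the callbacks are not invoked concurrently (imports as in the surrounding test plus java.util.concurrent.CompletableFuture):

class CollectingScanConsumer implements ScanConsumer { // hypothetical name

  private final List<Result> results = new ArrayList<>();

  final CompletableFuture<List<Result>> future = new CompletableFuture<>();

  @Override
  public boolean onNext(Result[] rs) {
    results.addAll(Arrays.asList(rs)); // buffer each delivered batch
    return true; // true keeps the scan going
  }

  @Override
  public boolean onHeartbeat() {
    return true; // nothing to do on heartbeats, but do not cancel the scan
  }

  @Override
  public void onError(Throwable error) {
    future.completeExceptionally(error);
  }

  @Override
  public void onComplete() {
    future.complete(results);
  }
}

Usage would then be: table.scan(scan, consumer); followed by consumer.future.get().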
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java index 972780e..e920013 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java @@ -19,166 +19,18 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.IntStream; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncTableSmallScan { - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("async"); - - private static byte[] FAMILY = Bytes.toBytes("cf"); - - private static byte[] QUALIFIER = Bytes.toBytes("cq"); - - private static int COUNT = 1000; - - private static AsyncConnection ASYNC_CONN; - - @BeforeClass - public static void setUp() throws Exception { - TEST_UTIL.startMiniCluster(3); - byte[][] splitKeys = new byte[8][]; - for (int i = 111; i < 999; i += 111) { - splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); - } - TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); - TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List<CompletableFuture<Void>> futures = new ArrayList<>(); - IntStream.range(0, COUNT) - .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))))); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - } - - @AfterClass - public static void tearDown() throws Exception { - ASYNC_CONN.close(); - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testScanAll() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List<Result> results = table.smallScan(new Scan().setSmall(true)).get(); - assertEquals(COUNT, results.size()); - IntStream.range(0, COUNT).forEach(i -> { - Result result = results.get(i); - assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); - assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testReversedScanAll() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get(); - assertEquals(COUNT, results.size()); - IntStream.range(0, COUNT).forEach(i -> { - Result result = results.get(i); - int actualIndex = COUNT - i - 1; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); -
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testScanNoStopKey() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - int start = 345; - List<Result> results = table - .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get(); - assertEquals(COUNT - start, results.size()); - IntStream.range(0, COUNT - start).forEach(i -> { - Result result = results.get(i); - int actualIndex = start + i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - int start = 765; - List<Result> results = table - .smallScan( - new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true)) - .get(); - assertEquals(start + 1, results.size()); - IntStream.range(0, start + 1).forEach(i -> { - Result result = results.get(i); - int actualIndex = start - i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - private void testScan(int start, int stop) throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get(); - assertEquals(stop - start, results.size()); - IntStream.range(0, stop - start).forEach(i -> { - Result result = results.get(i); - int actualIndex = start + i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - private void testReversedScan(int start, int stop) - throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true)) - .get(); - assertEquals(start - stop, results.size()); - IntStream.range(0, start - stop).forEach(i -> { - Result result = results.get(i); - int actualIndex = start - i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException { - testScan(345, 567); - } - - @Test - public void testReversedScanWithStartKeyAndStopKey() - throws InterruptedException, ExecutionException { - testReversedScan(765, 543); - } - - @Test - public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException { - testScan(222, 333); - } - - @Test - public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException { - testScan(222, 333); - } +public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { @Test public void testScanWithLimit() throws InterruptedException, ExecutionException { @@ -194,7 +46,7 @@ public class TestAsyncTableSmallScan { Result result = results.get(i); int actualIndex = start + i; assertEquals(String.format("%03d", actualIndex),
Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1))); }); } @@ -213,7 +65,17 @@ public class TestAsyncTableSmallScan { Result result = results.get(i); int actualIndex = start - i; assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1))); }); } + + @Override + protected Scan createScan() { + return new Scan().setSmall(true); + } + + @Override + protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception { + return table.smallScan(scan).get(); + } }
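Since smallScan materializes the whole result list behind a single CompletableFuture, it composes directly with the usual future combinators. A sketch (not part of the patch), reusing the fixture from the abstract class; the row keys follow the test table's %03d layout:

AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
CompletableFuture<List<Result>> f =
    table.smallScan(new Scan(Bytes.toBytes("345")).setStopRow(Bytes.toBytes("456")).setSmall(true));
// non-blocking: print each row key once the scan completes
f.thenAccept(results -> results.forEach(r -> System.out.println(Bytes.toString(r.getRow()))));
// or block for the list, as the tests do
List<Result> results = f.join();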
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java new file mode 100644 index 0000000..cc79636 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestCompleteResultScanResultCache { + + private static byte[] CF = Bytes.toBytes("cf"); + + private static byte[] CQ1 = Bytes.toBytes("cq1"); + + private static byte[] CQ2 = Bytes.toBytes("cq2"); + + private static byte[] CQ3 = Bytes.toBytes("cq3"); + + private CompleteResultScanResultCache resultCache; + + @Before + public void setUp() { + resultCache = new CompleteResultScanResultCache(); + } + + @After + public void tearDown() { + resultCache.clear(); + resultCache = null; + } + + private static Cell createCell(int key, byte[] cq) { + return new KeyValue(Bytes.toBytes(key), CF, cq, Bytes.toBytes(key)); + } + + @Test + public void testNoPartial() throws IOException { + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + int count = 10; + Result[] results = new Result[count]; + IntStream.range(0, count).forEach(i -> { + results[i] = Result.create(Arrays.asList(createCell(i, CQ1))); + }); + assertSame(results, resultCache.addAndGet(results, false)); + } + + @Test + public void testCombine1() throws IOException { + Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true); + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); + Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true); + Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false); + assertEquals(1, results.length); + assertSame(previousResult, results[0]); + + assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[0], true).length); + + results = resultCache.addAndGet(new Result[0], false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(3, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3))); + } + + @Test + public void testCombine2() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); + Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); + Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false,
false); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length); + + Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(3, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3))); + + results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false); + assertEquals(2, results.length); + assertEquals(2, Bytes.toInt(results[0].getRow())); + assertEquals(1, results[0].rawCells().length); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(3, Bytes.toInt(results[1].getRow())); + assertEquals(1, results[1].rawCells().length); + assertEquals(3, Bytes.toInt(results[1].getValue(CF, CQ2))); + } + + @Test + public void testCombine3() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false); + Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); + + Result[] results = resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, + false); + assertEquals(2, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + assertEquals(2, Bytes.toInt(results[1].getRow())); + assertEquals(1, results[1].rawCells().length); + assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1))); + + results = resultCache.addAndGet(new Result[0], false); + assertEquals(1, results.length); + assertEquals(3, Bytes.toInt(results[0].getRow())); + assertEquals(1, results[0].rawCells().length); + assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1))); + } +} -- 2.7.4
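The stitching primitive behind CompleteResultScanResultCache is Result.createCompleteResult, the same call the batch branch of TestAsyncTableScan.doScan uses above. A short sketch (not part of the patch) of what combining means for the partial Results these tests construct, assuming the createCell helper and imports of the test class and a context where IOException may propagate:

Result part1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result part2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result whole = Result.createCompleteResult(Arrays.asList(part1, part2));
// whole now carries both cells of row 1 and is no longer flagged as partial
assertEquals(2, whole.rawCells().length);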