From 7f8aa6b91ef47a47e2738ac876f68f887fa6759b Mon Sep 17 00:00:00 2001
From: zhangduo <zhangduo@apache.org>
Date: Mon, 19 Dec 2016 16:09:04 +0800
Subject: [PATCH] HBASE-17142 Implement multi get

---
 .../client/AsyncMultiGetRpcRetryingCaller.java     | 402 +++++++++++++++++++++
 .../client/AsyncRpcRetryingCallerFactory.java      |  45 +++
 .../AsyncSingleRequestRpcRetryingCaller.java       |   6 +-
 .../apache/hadoop/hbase/client/AsyncTableBase.java |  24 ++
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |   6 +
 .../hadoop/hbase/client/ConnectionUtils.java       |   3 +
 .../apache/hadoop/hbase/client/RawAsyncTable.java  |  20 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   7 +
 .../hbase/client/TestAsyncTableMultiGet.java       | 163 +++++++++
 9 files changed, 662 insertions(+), 14 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
new file mode 100644
index 0000000..b7d1c13
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for multi get.
+ * <p>
+ * Notice that the {@link #operationTimeoutNs} is the total time limit now, which is the same as
+ * for other single operations.
+ * <p>
+ * And the {@link #maxAttempts} is a limit for each single get in the batch logically. In the
+ * implementation, we will record a {@code tries} parameter for each operation group, and if it is
+ * split into several groups when retrying, the sub-groups will inherit {@code tries}. You can
+ * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of
+ * the depth of the tree.
+ */
+@InterfaceAudience.Private
+class AsyncMultiGetRpcRetryingCaller {
+
+  private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final List<Get> gets;
+
+  private final List<CompletableFuture<Result>> futures;
+
+  private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
+
+  private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long operationTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  private final long startNs;
+
+  // we can not use HRegionLocation as the map key because the hashCode and equals methods of
+  // HRegionLocation only consider serverName.
+  private static final class RegionRequest {
+
+    public final HRegionLocation loc;
+
+    public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
+
+    public RegionRequest(HRegionLocation loc) {
+      this.loc = loc;
+    }
+  }
+
+  public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.tableName = tableName;
+    this.gets = gets;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+
+    this.futures = new ArrayList<>(gets.size());
+    this.get2Future = new IdentityHashMap<>(gets.size());
+    gets.forEach(
+      get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
+    this.get2Errors = new IdentityHashMap<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
+  private List<ThrowableWithExtraContext> removeErrors(Get get) {
+    synchronized (get2Errors) {
+      return get2Errors.remove(get);
+    }
+  }
+
+  private void logException(int tries, Throwable error, ServerName serverName) {
+    if (tries > startLogErrorsCnt) {
+      LOG.warn("Get data of " + tableName + " from " + serverName + " failed, tries=" + tries,
+        error);
+    }
+  }
+
+  private String getExtras(ServerName serverName) {
+    return serverName != null ? serverName.getServerName() : "";
+  }
+
+  private void addError(Get get, Throwable error, ServerName serverName) {
+    List<ThrowableWithExtraContext> errors;
+    synchronized (get2Errors) {
+      errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
+    }
+    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
+        serverName != null ? serverName.toString() : ""));
+  }
+
+  private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
+    gets.forEach(get -> addError(get, error, serverName));
+  }
+
+  private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
+    CompletableFuture<Result> future = get2Future.get(get);
+    if (future.isDone()) {
+      return;
+    }
+    ThrowableWithExtraContext errorWithCtx =
+        new ThrowableWithExtraContext(error, currentTime, extras);
+    List<ThrowableWithExtraContext> errors = removeErrors(get);
+    if (errors == null) {
+      errors = Collections.singletonList(errorWithCtx);
+    } else {
+      errors.add(errorWithCtx);
+    }
+    future.completeExceptionally(new RetriesExhaustedException(tries, errors));
+  }
+
+  private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
+    long currentTime = System.currentTimeMillis();
+    String extras = getExtras(serverName);
+    gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
+  }
+
+  private void failAll(Stream<Get> gets, int tries) {
+    gets.forEach(get -> {
+      CompletableFuture<Result> future = get2Future.get(get);
+      if (future.isDone()) {
+        return;
+      }
+      future.completeExceptionally(new RetriesExhaustedException(tries,
+          Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
+    });
+  }
+
+  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
+      throws IOException {
+    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
+    for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
+      ClientProtos.RegionAction.Builder regionActionBuilder =
+          ClientProtos.RegionAction.newBuilder().setRegion(
+            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
+      int index = 0;
+      for (Get get : entry.getValue().gets) {
+        regionActionBuilder.addAction(
+          ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
+        index++;
+      }
+      multiRequestBuilder.addRegionAction(regionActionBuilder);
+    }
+    return multiRequestBuilder.build();
+  }
+
+  private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries,
+      ServerName serverName, MultiResponse resp) {
+    List<Get> failedGets = new ArrayList<>();
+    getsByRegion.forEach((rn, regionReq) -> {
+      RegionResult regionResult = resp.getResults().get(rn);
+      if (regionResult != null) {
+        int index = 0;
+        for (Get get : regionReq.gets) {
+          Object result = regionResult.result.get(index);
+          if (result == null) {
+            LOG.error("Server sent us neither result nor exception for row '"
+                + Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
+            addError(get, new RuntimeException("Invalid response"), serverName);
+            failedGets.add(get);
+          } else if (result instanceof Throwable) {
+            Throwable error = translateException((Throwable) result);
+            logException(tries, error, serverName);
+            if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
+                getExtras(serverName));
+            } else {
+              failedGets.add(get);
+            }
+          } else {
+            get2Future.get(get).complete((Result) result);
+          }
+          index++;
+        }
+      } else {
+        Throwable t = resp.getException(rn);
+        Throwable error;
+        if (t == null) {
+          LOG.error(
+            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          error = new RuntimeException("Invalid response");
+        } else {
+          error = translateException(t);
+          logException(tries, error, serverName);
+          conn.getLocator().updateCachedLocation(regionReq.loc, error);
+          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+            failAll(regionReq.gets.stream(), tries, error, serverName);
+            return;
+          }
+          addError(regionReq.gets, error, serverName);
+          failedGets.addAll(regionReq.gets);
+        }
+      }
+    });
+    if (!failedGets.isEmpty()) {
+      tryResubmit(failedGets.stream(), tries);
+    }
+  }
+
+  private void send(Map<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer, int tries) {
+    long callTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      long remainingNs = remainingTimeNs();
+      if (remainingNs <= 0) {
+        failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
+            .flatMap(r -> r.gets.stream()),
+          tries);
+        return;
+      }
+      callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
+    } else {
+      callTimeoutNs = rpcTimeoutNs;
+    }
+    getsByServer.forEach((sn, getsByRegion) -> {
+      ClientService.Interface stub;
+      try {
+        stub = conn.getRegionServerStub(sn);
+      } catch (IOException e) {
+        onError(getsByRegion, tries, e, sn);
+        return;
+      }
+      ClientProtos.MultiRequest req;
+      try {
+        req = buildReq(getsByRegion);
+      } catch (IOException e) {
+        onError(getsByRegion, tries, e, sn);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      resetController(controller, callTimeoutNs);
+      stub.multi(controller, req, resp -> {
+        if (controller.failed()) {
+          onError(getsByRegion, tries, controller.getFailed(), sn);
+        } else {
+          try {
+            onComplete(getsByRegion, tries, sn,
+              ResponseConverter.getResults(req, resp, controller.cellScanner()));
+          } catch (Exception e) {
+            onError(getsByRegion, tries, e, sn);
+            return;
+          }
+        }
+      });
+    });
+  }
+
+  private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
+      ServerName serverName) {
+    onError(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, t, serverName);
+  }
+
+  private void onError(Stream<Get> gets, int tries, Throwable t, ServerName serverName) {
+    Throwable error = translateException(t);
+    logException(tries, error, serverName);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      failAll(gets, tries, error, serverName);
+      return;
+    }
+    List<Get> copiedGets = gets.collect(Collectors.toList());
+    addError(copiedGets, error, serverName);
+    tryResubmit(copiedGets.stream(), tries);
+  }
+
+  private void tryResubmit(Stream<Get> gets, int tries) {
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        failAll(gets, tries);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void groupAndSend(Stream<Get> gets, int tries) {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        failAll(gets, tries);
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
+        new ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
+    CompletableFuture.allOf(gets.map(get -> conn.getLocator()
+        .getRegionLocation(tableName, get.getRow(), locateTimeoutNs).whenComplete((loc, error) -> {
+          if (error != null) {
+            error = translateException(error);
+            if (error instanceof DoNotRetryIOException) {
+              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
+              return;
+            }
+            addError(get, error, null);
+            locateFailed.add(get);
+          } else {
+            ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
+              loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+            computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
+              () -> new RegionRequest(loc)).gets.add(get);
+          }
+        })).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
+          if (!getsByServer.isEmpty()) {
+            send(getsByServer, tries);
+          }
+          if (!locateFailed.isEmpty()) {
+            tryResubmit(locateFailed.stream(), tries);
+          }
+        });
+  }
+
+  public List<CompletableFuture<Result>> call() {
+    groupAndSend(gets.stream(), 1);
+    return futures;
+  }
+}
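
To make the grouping in groupAndSend() above concrete: each retry round locates every outstanding Get, buckets it per server and then per region, and sends one MultiRequest per server. The following is a minimal, self-contained toy sketch of that bucketing idea, using String row keys and a plain TreeMap in place of the locator, the byte[] region names, and the ConcurrentSkipListMap used above; the class name and region layout here are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByRegionSketch {

  public static void main(String[] args) {
    // Hypothetical region layout: each region is identified by its start key,
    // standing in for what AsyncRegionLocator resolves per row.
    TreeMap<String, String> regionByStartKey = new TreeMap<>();
    regionByStartKey.put("", "region-1");   // rows before "22"
    regionByStartKey.put("22", "region-2"); // rows in ["22", "55")
    regionByStartKey.put("55", "region-3"); // rows from "55" on

    List<String> rows = Arrays.asList("11", "23", "42", "77");

    // Bucket rows by hosting region, as groupAndSend does with a
    // ConcurrentSkipListMap keyed by region name.
    Map<String, List<String>> rowsByRegion = new TreeMap<>();
    for (String row : rows) {
      String region = regionByStartKey.floorEntry(row).getValue();
      rowsByRegion.computeIfAbsent(region, k -> new ArrayList<>()).add(row);
    }

    // buildReq() would then emit one RegionAction per entry of this map.
    rowsByRegion.forEach((region, rs) -> System.out.println(region + " -> " + rs));
  }
}

If a whole region fails, its gets go back through tryResubmit() and are re-bucketed on the next round, which is how the "tree of retries" described in the class javadoc arises.
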
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index c40de31..f1a4247 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -257,4 +257,49 @@ class AsyncRpcRetryingCallerFactory {
   public ScanSingleRegionCallerBuilder scanSingleRegion() {
     return new ScanSingleRegionCallerBuilder();
   }
+
+  public class MultiGetCallerBuilder {
+
+    private TableName tableName;
+
+    private List<Get> gets;
+
+    private long operationTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    public MultiGetCallerBuilder table(TableName tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public MultiGetCallerBuilder gets(List<Get> gets) {
+      this.gets = gets;
+      return this;
+    }
+
+    public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+      this.operationTimeoutNs = unit.toNanos(operationTimeout);
+      return this;
+    }
+
+    public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public AsyncMultiGetRpcRetryingCaller build() {
+      return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
+        conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
+        rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+    }
+
+    public List<CompletableFuture<Result>> call() {
+      return build().call();
+    }
+  }
+
+  public MultiGetCallerBuilder multiGet() {
+    return new MultiGetCallerBuilder();
+  }
 }
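
The new builder mirrors the existing single-request and scan builders in this factory. As a usage sketch (the factory and AsyncConnectionImpl are package-private, so this only compiles inside org.apache.hadoop.hbase.client; the real call site is the RawAsyncTableImpl change later in this patch, and the wrapper class here is hypothetical):

package org.apache.hadoop.hbase.client;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.TableName;

class MultiGetBuilderSketch {

  // Returns one future per Get, in the same order as the input list.
  static List<CompletableFuture<Result>> multiGet(AsyncConnectionImpl conn, TableName tableName,
      List<Get> gets, long operationTimeoutNs, long rpcTimeoutNs) {
    return conn.callerFactory.multiGet().table(tableName).gets(gets)
        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).call();
  }
}
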
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 44a237d..d6da131 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
@@ -52,9 +53,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
 
-  // Add a delta to avoid timeout immediately after a retry sleeping.
-  private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
-
   @FunctionalInterface
   public interface Callable<T> {
     CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
@@ -146,7 +144,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     }
     long delayNs;
     if (operationTimeoutNs > 0) {
-      long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally();
         return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index e051a6b..a2b5247 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -351,4 +352,27 @@ public interface AsyncTableBase {
    * {@link CompletableFuture}.
    */
   CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+
+  /**
+   * Extracts certain cells from the given rows, in batch.
+   * <p>
+   * Notice that you may not get all the results with this function, which means some of the
+   * returned {@link CompletableFuture}s may succeed while some of the other returned
+   * {@link CompletableFuture}s may fail.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A list of {@link CompletableFuture}s that represent the result for each get.
+   */
+  List<CompletableFuture<Result>> get(List<Get> gets);
+
+  /**
+   * A simple version of batch get. It will fail if there are any failures, and you will get the
+   * whole result list at once if the operation succeeds.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A {@link CompletableFuture} that wraps the result list.
+   */
+  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
+    List<CompletableFuture<Result>> futures = get(gets);
+    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
+  }
 }
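
A short sketch of how a caller would use the two new entry points, assuming a table obtained from an AsyncConnection as in the test at the end of this patch; get(List) lets each row succeed or fail independently, while getAll(List) is all-or-nothing:

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.client.AsyncTableBase;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;

class BatchGetSketch {

  static void demo(AsyncTableBase table, List<Get> gets) {
    // Per-get futures: some may succeed while others fail.
    List<CompletableFuture<Result>> futures = table.get(gets);
    for (int i = 0; i < futures.size(); i++) {
      int index = i;
      futures.get(i).whenComplete((result, error) -> {
        if (error != null) {
          System.err.println("get " + index + " failed: " + error.getMessage());
        } else {
          System.out.println("get " + index + " -> " + result);
        }
      });
    }

    // All-or-nothing: the returned future fails if any single get fails,
    // otherwise it yields the results in the same order as the input gets.
    table.getAll(gets).thenAccept(results -> results.forEach(System.out::println));
  }
}
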
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index cecf815..6cc2551 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -191,4 +192,9 @@ class AsyncTableImpl implements AsyncTable {
   public void scan(Scan scan, ScanResultConsumer consumer) {
     pool.execute(() -> scan0(scan, consumer));
   }
+
+  @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return rawTable.get(gets).stream().map(this::wrap).collect(Collectors.toList());
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 9df9fbb..cc27992 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -324,4 +324,7 @@ public final class ConnectionUtils {
     return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
       result.isStale(), true);
   }
+
+  // Add a delta to avoid timeout immediately after a retry sleeping.
+  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 823367a..0c292a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 * especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
 * <p>
 * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
- * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
- * it is not suitable for a normal user. If it is still the only difference after we implement most
- * features of AsyncTable, we can think about merge these two interfaces.
+ * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat)
+ * so it is not suitable for a normal user. If it is still the only difference after we implement
+ * most features of AsyncTable, we can think about merging these two interfaces.
 */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -42,13 +42,13 @@ public interface RawAsyncTable extends AsyncTableBase {
 
   /**
    * The basic scan API uses the observer pattern. All results that match the given scan object will
-   * be passed to the given {@code consumer} by calling {@link RawScanResultConsumer#onNext(Result[])}.
-   * {@link RawScanResultConsumer#onComplete()} means the scan is finished, and
-   * {@link RawScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
-   * is terminated. {@link RawScanResultConsumer#onHeartbeat()} means the RS is still working but we
-   * can not get a valid result to call {@link RawScanResultConsumer#onNext(Result[])}. This is usually
-   * because the matched results are too sparse, for example, a filter which almost filters out
-   * everything is specified.
+   * be passed to the given {@code consumer} by calling
+   * {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()}
+   * means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit
+   * an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()}
+   * means the RS is still working but we can not get a valid result to call
+   * {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are
+   * too sparse, for example, a filter which almost filters out everything is specified.
    * <p>
    * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
    * framework's callback thread, so typically you should not do any time consuming work inside
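
Since the javadoc above spells out the consumer contract, here is a minimal consumer sketch, assuming the interface consists of exactly the four callbacks named in that javadoc; keep every callback cheap, because it runs on the rpc framework's callback thread:

import org.apache.hadoop.hbase.client.RawScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;

class CountingScanConsumer implements RawScanResultConsumer {

  private long count;

  @Override
  public void onNext(Result[] results) {
    count += results.length; // cheap bookkeeping only, no blocking work
  }

  @Override
  public void onHeartbeat() {
    // The RS is still working; nothing matched since the last callback.
  }

  @Override
  public void onError(Throwable error) {
    System.err.println("scan terminated: " + error);
  }

  @Override
  public void onComplete() {
    System.out.println("scan finished, " + count + " results");
  }
}

A caller would pass an instance to RawAsyncTable.scan(Scan, RawScanResultConsumer) and do any heavy processing on another thread.
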

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index cdc90ab..6fad0da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -405,4 +405,11 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public long getScanTimeout(TimeUnit unit) {
     return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
   }
+
+  @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return conn.callerFactory.multiGet().table(tableName).gets(gets)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
new file mode 100644
index 0000000..612e830
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableMultiGet {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 100;
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @Parameter
+  public Supplier<AsyncTableBase> getTable;
+
+  private static RawAsyncTable getRawTable() {
+    return ASYNC_CONN.getRawTable(TABLE_NAME);
+  }
+
+  private static AsyncTable getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTableMultiGet::getRawTable },
+      new Supplier<?>[] { TestAsyncTableMultiGet::getTable });
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 11; i < 99; i += 11) {
+      splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT).forEach(i -> futures.add(table.put(
+      new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void move() throws IOException, InterruptedException {
+    HRegionServer src = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+    HRegionServer dst = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer()).filter(r -> r != src).findAny().get();
+    Region region = src.getOnlineRegions(TABLE_NAME).stream().findAny().get();
+    TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(),
+      Bytes.toBytes(dst.getServerName().getServerName()));
+    Thread.sleep(1000);
+  }
+
+  private void test(BiFunction<AsyncTableBase, List<Get>, List<Result>> getFunc)
+      throws IOException, InterruptedException {
+    AsyncTableBase table = getTable.get();
+    List<Get> gets =
+        IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(String.format("%02d", i))))
+            .collect(Collectors.toList());
+    List<Result> results = getFunc.apply(table, gets);
+    assertEquals(COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      Result result = results.get(i);
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    }
+    // test basic failure recovery
+    move();
+    results = getFunc.apply(table, gets);
+    assertEquals(COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      Result result = results.get(i);
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    }
+  }
+
+  @Test
+  public void testGet() throws InterruptedException, IOException {
+    test((table, gets) -> {
+      return table.get(gets).stream().map(f -> {
+        try {
+          return f.get();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+    });
+
+  }
+
+  @Test
+  public void testGetAll() throws InterruptedException, IOException {
+    test((table, gets) -> {
+      try {
+        return table.getAll(gets).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}
-- 
2.7.4
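
For reference, end-to-end usage of the new API is the test above condensed. This is a minimal sketch, assuming a running cluster reachable from the default configuration and an existing table named "async" with family "cf" and qualifier "cq" as populated by the test; the demo class name is hypothetical:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiGetDemo {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf)) {
      RawAsyncTable table = conn.getRawTable(TableName.valueOf("async"));
      List<Get> gets = Arrays.asList(new Get(Bytes.toBytes("00")), new Get(Bytes.toBytes("01")));
      // getAll: one future for the whole batch; it fails if any single get fails.
      List<Result> results = table.getAll(gets).get();
      results.forEach(r -> System.out.println(Bytes.toStringBinary(r.getRow()) + " -> "
          + Bytes.toInt(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cq")))));
    }
  }
}
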