From bda688fb678b1ec25456559125cbd8235077f8ed Mon Sep 17 00:00:00 2001
From: zhangduo <zhangduo@apache.org>
Date: Fri, 9 Dec 2016 22:33:35 +0800
Subject: [PATCH] HBASE-17142 Implement multi get

---
 .../client/AsyncMultiGetRpcRetryingCaller.java     | 393 +++++++++++++++++++++
 .../client/AsyncRpcRetryingCallerFactory.java      |  45 +++
 .../AsyncSingleRequestRpcRetryingCaller.java       |   6 +-
 .../hadoop/hbase/client/ConnectionUtils.java       |   3 +
 .../apache/hadoop/hbase/client/RawAsyncTable.java  |  25 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   7 +
 .../hbase/client/TestRawAsyncTableMultiGet.java    |  92 +++++
 7 files changed, 557 insertions(+), 14 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableMultiGet.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
new file mode 100644
index 0000000..b57091f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for multi get.
+ */
+@InterfaceAudience.Private
+class AsyncMultiGetRpcRetryingCaller {
+
+  private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final List<Get> gets;
+
+  private final List<CompletableFuture<Result>> futures;
+
+  private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
+
+  private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long operationTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  private final long startNs;
+
+  // we can not use HRegionLocation as the map key because the hashCode and equals method of
+  // HRegionLocation only consider serverName.
+  private static final class RegionRequest {
+
+    public final HRegionLocation loc;
+
+    public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
+
+    public RegionRequest(HRegionLocation loc) {
+      this.loc = loc;
+    }
+  }
+
+  public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.tableName = tableName;
+    this.gets = gets;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+
+    this.futures = new ArrayList<>(gets.size());
+    this.get2Future = new IdentityHashMap<>(gets.size());
+    gets.forEach(
+      get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
+    this.get2Errors = new IdentityHashMap<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
+  private List<ThrowableWithExtraContext> removeErrors(Get get) {
+    synchronized (get2Errors) {
+      return get2Errors.remove(get);
+    }
+  }
+
+  private void logException(int tries, Throwable error, ServerName serverName) {
+    if (tries > startLogErrorsCnt) {
+      LOG.warn("Get data of " + tableName + " from " + serverName + " failed, tries=" + tries,
+        error);
+    }
+  }
+
+  private String getExtras(ServerName serverName) {
+    return serverName != null ? serverName.getServerName() : "";
+  }
+
+  private void addError(Get get, Throwable error, ServerName serverName) {
+    List<ThrowableWithExtraContext> errors;
+    synchronized (get2Errors) {
+      errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
+    }
+    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
+        serverName != null ? serverName.toString() : ""));
+  }
+
+  private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
+    gets.forEach(get -> addError(get, error, serverName));
+  }
+
+  private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
+    CompletableFuture<Result> future = get2Future.get(get);
+    if (future.isDone()) {
+      return;
+    }
+    ThrowableWithExtraContext errorWithCtx =
+        new ThrowableWithExtraContext(error, currentTime, extras);
+    List<ThrowableWithExtraContext> errors = removeErrors(get);
+    if (errors == null) {
+      errors = Collections.singletonList(errorWithCtx);
+    } else {
+      errors.add(errorWithCtx);
+    }
+    future.completeExceptionally(new RetriesExhaustedException(tries, errors));
+  }
+
+  private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
+    long currentTime = System.currentTimeMillis();
+    String extras = getExtras(serverName);
+    gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
+  }
+
+  private void failAll(Stream<Get> gets, int tries) {
+    gets.forEach(get -> {
+      CompletableFuture<Result> future = get2Future.get(get);
+      if (future.isDone()) {
+        return;
+      }
+      future.completeExceptionally(new RetriesExhaustedException(tries,
+          Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
+    });
+  }
+
+  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
+      throws IOException {
+    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
+    for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
+      ClientProtos.RegionAction.Builder regionActionBuilder =
+          ClientProtos.RegionAction.newBuilder().setRegion(
+            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
+      int index = 0;
+      for (Get get : entry.getValue().gets) {
+        regionActionBuilder.addAction(
+          ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
+        index++;
+      }
+      multiRequestBuilder.addRegionAction(regionActionBuilder);
+    }
+    return multiRequestBuilder.build();
+  }
+
+  private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries,
+      ServerName serverName, MultiResponse resp) {
+    List<Get> failedGets = new ArrayList<>();
+    getsByRegion.forEach((rn, regionReq) -> {
+      RegionResult regionResult = resp.getResults().get(rn);
+      if (regionResult != null) {
+        int index = 0;
+        for (Get get : regionReq.gets) {
+          Object result = regionResult.result.get(index);
+          if (result == null) {
+            LOG.error("Server sent us neither result nor exception for row '"
+                + Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
+            addError(get, new RuntimeException("Invalid response"), serverName);
+            failedGets.add(get);
+          } else if (result instanceof Throwable) {
+            Throwable error = translateException((Throwable) result);
+            logException(tries, error, serverName);
+            if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
+                getExtras(serverName));
+            } else {
+              failedGets.add(get);
+            }
+          } else {
+            get2Future.get(get).complete((Result) result);
+          }
+          index++;
+        }
+      } else {
+        Throwable t = resp.getException(rn);
+        Throwable error;
+        if (t == null) {
+          LOG.error(
+            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          error = new RuntimeException("Invalid response");
+        } else {
+          error = translateException(t);
+          logException(tries, error, serverName);
+          conn.getLocator().updateCachedLocation(regionReq.loc, error);
+          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+            failAll(regionReq.gets.stream(), tries, error, serverName);
+            return;
+          }
+          addError(regionReq.gets, error, serverName);
+          failedGets.addAll(regionReq.gets);
+        }
+      }
+    });
+    if (!failedGets.isEmpty()) {
+      tryResubmit(failedGets.stream(), tries);
+    }
+  }
+
+  private void send(Map<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer,
+      int tries) {
+    long callTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      long remainingNs = remainingTimeNs();
+      if (remainingNs <= 0) {
+        failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
+            .flatMap(r -> r.gets.stream()), tries);
+        return;
+      }
+      callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
+    } else {
+      callTimeoutNs = rpcTimeoutNs;
+    }
+    getsByServer.forEach((sn, getsByRegion) -> {
+      ClientService.Interface stub;
+      try {
+        stub = conn.getRegionServerStub(sn);
+      } catch (IOException e) {
+        onError(getsByRegion, tries, e, sn);
+        return;
+      }
+      ClientProtos.MultiRequest req;
+      try {
+        req = buildReq(getsByRegion);
+      } catch (IOException e) {
+        onError(getsByRegion, tries, e, sn);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      resetController(controller, callTimeoutNs);
+      stub.multi(controller, req, resp -> {
+        if (controller.failed()) {
+          onError(getsByRegion, tries, controller.getFailed(), sn);
+        } else {
+          try {
+            onComplete(getsByRegion, tries, sn,
+              ResponseConverter.getResults(req, resp, controller.cellScanner()));
+          } catch (Exception e) {
+            onError(getsByRegion, tries, e, sn);
+            return;
+          }
+        }
+      });
+    });
+  }
+
+  private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
+      ServerName serverName) {
+    onError(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, t, serverName);
+  }
+
+  private void onError(Stream<Get> gets, int tries, Throwable t, ServerName serverName) {
+    Throwable error = translateException(t);
+    logException(tries, error, serverName);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      failAll(gets, tries, error, serverName);
+      return;
+    }
+    List<Get> copiedGets = gets.collect(Collectors.toList());
+    addError(copiedGets, error, serverName);
+    tryResubmit(copiedGets.stream(), tries);
+  }
+
+  private void tryResubmit(Stream<Get> gets, int tries) {
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        failAll(gets, tries);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void groupAndSend(Stream<Get> gets, int tries) {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        failAll(gets, tries);
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
+        new ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
+    CompletableFuture.allOf(gets.map(get -> conn.getLocator()
+        .getRegionLocation(tableName, get.getRow(), locateTimeoutNs).whenComplete((loc, error) -> {
+          if (error != null) {
+            error = translateException(error);
+            if (error instanceof DoNotRetryIOException) {
+              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
+              return;
+            }
+            addError(get, error, null);
+            locateFailed.add(get);
+          } else {
+            ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
+              loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+            computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
+              () -> new RegionRequest(loc)).gets.add(get);
+          }
+        })).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
+          if (!getsByServer.isEmpty()) {
+            send(getsByServer, tries);
+          }
+          if (!locateFailed.isEmpty()) {
+            tryResubmit(locateFailed.stream(), tries);
+          }
+        });
+  }
+
+  public List<CompletableFuture<Result>> call() {
+    groupAndSend(gets.stream(), 1);
+    return futures;
+  }
+}
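A note on the retry cadence implemented by tryResubmit() above: the delay comes from getPauseTime(pauseNs, tries - 1), capped by the remaining operation budget. Below is a minimal, self-contained sketch of the resulting schedule; it assumes the standard HBase backoff multiplier table (HConstants.RETRY_BACKOFF) and omits the small random jitter the real helper adds.

import java.util.concurrent.TimeUnit;

public class BackoffSketch {
  // Multipliers assumed to match HConstants.RETRY_BACKOFF; for illustration only.
  private static final int[] RETRY_BACKOFF =
      { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 };

  // Approximates ConnectionUtils.getPauseTime(pauseNs, retries), without jitter.
  static long pauseTimeNs(long pauseNs, int retries) {
    int idx = Math.min(retries, RETRY_BACKOFF.length - 1);
    return pauseNs * RETRY_BACKOFF[idx];
  }

  public static void main(String[] args) {
    long pauseNs = TimeUnit.MILLISECONDS.toNanos(100);
    // tryResubmit() passes tries - 1, so the first retry uses index 0.
    for (int tries = 1; tries <= 5; tries++) {
      System.out.printf("retry #%d sleeps ~%d ms%n", tries,
        TimeUnit.NANOSECONDS.toMillis(pauseTimeNs(pauseNs, tries - 1)));
    }
  }
}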
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index c40de31..f1a4247 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -257,4 +257,49 @@ class AsyncRpcRetryingCallerFactory {
   public ScanSingleRegionCallerBuilder scanSingleRegion() {
     return new ScanSingleRegionCallerBuilder();
   }
+
+  public class MultiGetCallerBuilder {
+
+    private TableName tableName;
+
+    private List<Get> gets;
+
+    private long operationTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    public MultiGetCallerBuilder table(TableName tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public MultiGetCallerBuilder gets(List<Get> gets) {
+      this.gets = gets;
+      return this;
+    }
+
+    public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+      this.operationTimeoutNs = unit.toNanos(operationTimeout);
+      return this;
+    }
+
+    public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public AsyncMultiGetRpcRetryingCaller build() {
+      return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
+          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
+          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+    }
+
+    public List<CompletableFuture<Result>> call() {
+      return build().call();
+    }
+  }
+
+  public MultiGetCallerBuilder multiGet() {
+    return new MultiGetCallerBuilder();
+  }
 }
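For context, the only in-tree caller of this new builder is RawAsyncTableImpl (see its hunk further down). Condensed from that hunk, the call site looks like this; conn, tableName, gets and the two timeout fields all come from the table implementation:

// Condensed from the RawAsyncTableImpl.java hunk later in this patch.
List<CompletableFuture<Result>> futures = conn.callerFactory.multiGet()
    .table(tableName)                                           // target table
    .gets(gets)                                                 // the batch of Gets
    .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) // whole-operation budget
    .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS)         // per-RPC budget
    .call();                                                    // build() the caller, then groupAndSend()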
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 44a237d..d6da131 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
@@ -52,9 +53,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
 
-  // Add a delta to avoid timeout immediately after a retry sleeping.
-  private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
-
   @FunctionalInterface
   public interface Callable<T> {
     CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
@@ -146,7 +144,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     }
     long delayNs;
     if (operationTimeoutNs > 0) {
-      long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally();
         return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 9df9fbb..cc27992 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -324,4 +324,7 @@ public final class ConnectionUtils {
     return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
       result.isStale(), true);
   }
+
+  // Add a delta to avoid timeout immediately after a retry sleeping.
+  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
 }
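The delta exists so a retry scheduled very close to the operation deadline fails fast instead of sleeping and then timing out the moment it wakes. A small stand-alone illustration of the arithmetic both retrying callers now share (constant value copied from this hunk):

import java.util.concurrent.TimeUnit;

public class SleepDeltaSketch {
  // Mirrors ConnectionUtils.SLEEP_DELTA_NS from this patch.
  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);

  public static void main(String[] args) {
    long operationTimeoutNs = TimeUnit.SECONDS.toNanos(10);
    long startNs = System.nanoTime();
    // ... attempts run and consume part of the budget ...
    long remainingNs = operationTimeoutNs - (System.nanoTime() - startNs);
    // Cap the retry sleep below the remaining budget; with under 1ms left,
    // fail now rather than sleep and time out immediately afterwards.
    long maxDelayNs = remainingNs - SLEEP_DELTA_NS;
    if (maxDelayNs <= 0) {
      System.out.println("no budget left, fail the operation now");
    } else {
      System.out.println("can still sleep up to " + maxDelayNs + " ns before retrying");
    }
  }
}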
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 823367a..1020743 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
@@ -32,9 +35,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  * especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
  * <p>
  * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
- * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
- * it is not suitable for a normal user. If it is still the only difference after we implement most
- * features of AsyncTable, we can think about merge these two interfaces.
+ * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat)
+ * so it is not suitable for a normal user. If it is still the only difference after we implement
+ * most features of AsyncTable, we can think about merge these two interfaces.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -42,13 +45,13 @@ public interface RawAsyncTable extends AsyncTableBase {
 
   /**
    * The basic scan API uses the observer pattern. All results that match the given scan object will
-   * be passed to the given {@code consumer} by calling {@link RawScanResultConsumer#onNext(Result[])}.
-   * {@link RawScanResultConsumer#onComplete()} means the scan is finished, and
-   * {@link RawScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
-   * is terminated. {@link RawScanResultConsumer#onHeartbeat()} means the RS is still working but we
-   * can not get a valid result to call {@link RawScanResultConsumer#onNext(Result[])}. This is usually
-   * because the matched results are too sparse, for example, a filter which almost filters out
-   * everything is specified.
+   * be passed to the given {@code consumer} by calling
+   * {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()}
+   * means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit
+   * an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()}
+   * means the RS is still working but we can not get a valid result to call
+   * {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are
+   * too sparse, for example, a filter which almost filters out everything is specified.
    * <p>
    * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
    * framework's callback thread, so typically you should not do any time consuming work inside
@@ -58,4 +61,6 @@ public interface RawAsyncTable extends AsyncTableBase {
    * @param consumer the consumer used to receive results.
    */
   void scan(Scan scan, RawScanResultConsumer consumer);
+
+  List<CompletableFuture<Result>> get(List<Get> gets);
 }
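To make the observer contract described in the javadoc above concrete, a minimal consumer could look like the sketch below. The four callback names are taken from that javadoc; the exact signatures (return types, default methods) are an assumption here and should be checked against the RawScanResultConsumer interface in this tree.

// Sketch only; callback names per the javadoc above, signatures assumed.
// All callbacks run on the rpc framework's callback thread: keep them cheap.
table.scan(new Scan(), new RawScanResultConsumer() {
  @Override
  public void onNext(Result[] results) {
    // hand results off to an application executor for any heavy processing
  }
  @Override
  public void onHeartbeat() {
    // RS is still working but has nothing to return yet, e.g. a sparse filter
  }
  @Override
  public void onError(Throwable error) {
    // unrecoverable error; the scan is terminated
  }
  @Override
  public void onComplete() {
    // scan finished normally
  }
});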
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index cdc90ab..6fad0da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -405,4 +405,11 @@ class RawAsyncTableImpl implements RawAsyncTable {
   public long getScanTimeout(TimeUnit unit) {
     return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
   }
+
+  @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return conn.callerFactory.multiGet().table(tableName).gets(gets)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+  }
 }
\ No newline at end of file
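The futures returned by the new method line up positionally with the input gets, which is exactly what the test below relies on. A short consumption sketch, assuming table and gets are already set up:

// One future per Get, in input order.
List<CompletableFuture<Result>> futures = table.get(gets);
// Block until every Get has either a Result or a failure; join() throws
// CompletionException if any single Get exhausted its retries.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (int i = 0; i < gets.size(); i++) {
  Result result = futures.get(i).getNow(null);
  // process the result for gets.get(i)
}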
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableMultiGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableMultiGet.java
new file mode 100644
index 0000000..53bb86b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableMultiGet.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRawAsyncTableMultiGet {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 100;
+
+  private static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 11; i < 99; i += 11) {
+      splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT).forEach(i -> futures.add(table.put(
+      new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException {
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
+    table.setReadRpcTimeout(1, TimeUnit.HOURS);
+    table.setOperationTimeout(1, TimeUnit.DAYS);
+    List<Get> gets =
+        IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(String.format("%02d", i))))
+            .collect(Collectors.toList());
+    List<CompletableFuture<Result>> futures = table.get(gets);
+    for (int i = 0; i < COUNT; i++) {
+      Result result = futures.get(i).get();
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
+    }
+  }
+}
-- 
1.9.1
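Not part of this patch, but a common companion when using the new API: collapsing the per-Get futures into a single future of all results. A minimal sketch of such a helper (hypothetical utility, not an HBase API; fails fast if any Get fails):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public final class FutureUtils {

  private FutureUtils() {
  }

  // Hypothetical helper: completes with all results in input order, or fails
  // fast with the first per-element failure.
  public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream().map(CompletableFuture::join)
            .collect(Collectors.toList()));
  }
}

Usage would then be a single dependent stage, e.g. allOf(table.get(gets)).thenAccept(results -> ...), instead of iterating the future list by hand as the test does.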