From b89db58185b5e231396db96722d728c5f8fa7a20 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Thu, 22 Dec 2016 18:40:32 +0800
Subject: [PATCH] HBASE-17345 Implement batch

---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  | 436 +++++++++++++++++++++
 .../client/AsyncMultiGetRpcRetryingCaller.java     | 407 -------------------
 .../client/AsyncRpcRetryingCallerFactory.java      |  30 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |   2 +-
 .../AsyncSingleRequestRpcRetryingCaller.java       |   2 +-
 .../apache/hadoop/hbase/client/AsyncTableBase.java | 103 ++++-
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |   4 +-
 .../hadoop/hbase/client/ConnectionUtils.java       |  62 ++-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   4 +-
 .../hbase/shaded/protobuf/RequestConverter.java    |   5 +-
 .../hbase/client/AbstractTestAsyncTableScan.java   |  12 +-
 .../hbase/client/TestAsyncGetMultiThread.java      | 150 -------
 .../hadoop/hbase/client/TestAsyncTableBatch.java   | 236 +++++++++++
 .../client/TestAsyncTableGetMultiThreaded.java     | 149 +++++++
 .../hbase/client/TestAsyncTableMultiGet.java       | 163 --------
 15 files changed, 982 insertions(+), 783 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
 delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
new file mode 100644
index 0000000..30dc232
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for batch.
+ * <p>
+ * Notice that the {@link #operationTimeoutNs} is the total time limit now, which is the same as
+ * for other single operations.
+ * <p>
+ * And the {@link #maxAttempts} is a limit for each single operation in the batch, logically. In
+ * the implementation, we will record a {@code tries} parameter for each operation group, and if
+ * it is split into several groups when retrying, the sub groups will inherit the {@code tries}.
+ * You can imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the
+ * limit on the depth of the tree.
+ */
+@InterfaceAudience.Private
+class AsyncBatchRpcRetryingCaller<T> {
+
+  private static final Log LOG = LogFactory.getLog(AsyncBatchRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final List<Action> actions;
+
+  private final List<CompletableFuture<T>> futures;
+
+  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
+
+  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long operationTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  private final long startNs;
+
+  // we cannot use HRegionLocation as the map key because the hashCode and equals methods of
+  // HRegionLocation only consider serverName.
+  private static final class RegionRequest {
+
+    public final HRegionLocation loc;
+
+    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
+
+    public RegionRequest(HRegionLocation loc) {
+      this.loc = loc;
+    }
+  }
+
+  public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      TableName tableName, List<? extends Row> actions, long pauseNs, int maxRetries,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.tableName = tableName;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+
+    this.actions = new ArrayList<>(actions.size());
+    this.futures = new ArrayList<>(actions.size());
+    this.action2Future = new IdentityHashMap<>(actions.size());
+    for (int i = 0, n = actions.size(); i < n; i++) {
+      Row rawAction = actions.get(i);
+      Action action = new Action(rawAction, i);
+      if (rawAction instanceof Append || rawAction instanceof Increment) {
+        action.setNonce(conn.getNonceGenerator().newNonce());
+      }
+      this.actions.add(action);
+      CompletableFuture<T> future = new CompletableFuture<>();
+      futures.add(future);
+      action2Future.put(action, future);
+    }
+    this.action2Errors = new IdentityHashMap<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
+  private List<ThrowableWithExtraContext> removeErrors(Action action) {
+    synchronized (action2Errors) {
+      return action2Errors.remove(action);
+    }
+  }
+
+  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
+      Throwable error, ServerName serverName) {
+    if (tries > startLogErrorsCnt) {
+      String regions =
+          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
+              .collect(Collectors.joining(",", "[", "]"));
+      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
+          + " failed, tries=" + tries,
+        error);
+    }
+  }
+
+  private String getExtraContextForError(ServerName serverName) {
+    return serverName != null ? serverName.getServerName() : "";
+  }
+
+  private void addError(Action action, Throwable error, ServerName serverName) {
+    List<ThrowableWithExtraContext> errors;
+    synchronized (action2Errors) {
+      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
+    }
+    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
+      getExtraContextForError(serverName)));
+  }
+
+  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
+    actions.forEach(action -> addError(action, error, serverName));
+  }
+
+  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
+    CompletableFuture<T> future = action2Future.get(action);
+    if (future.isDone()) {
+      return;
+    }
+    ThrowableWithExtraContext errorWithCtx =
+        new ThrowableWithExtraContext(error, currentTime, extras);
+    List<ThrowableWithExtraContext> errors = removeErrors(action);
+    if (errors == null) {
+      errors = Collections.singletonList(errorWithCtx);
+    } else {
+      errors.add(errorWithCtx);
+    }
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
+  }
+
+  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    String extras = getExtraContextForError(serverName);
+    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
+  }
+
+  private void failAll(Stream<Action> actions, int tries) {
+    actions.forEach(action -> {
+      CompletableFuture<T> future = action2Future.get(action);
+      if (future.isDone()) {
+        return;
+      }
+      future.completeExceptionally(new RetriesExhaustedException(tries,
+          Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
+    });
+  }
+
+  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
+      List<CellScannable> cells) throws IOException {
+    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
+    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
+    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
+    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
+      // TODO: remove the extra for loop as we will iterate it in mutationBuilder.
+      if (!multiRequestBuilder.hasNonceGroup()) {
+        for (Action action : entry.getValue().actions) {
+          if (action.hasNonce()) {
+            multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
+            break;
+          }
+        }
+      }
+      regionActionBuilder.clear();
+      regionActionBuilder.setRegion(
+        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
+      regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(),
+        entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder);
+      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+    }
+    return multiRequestBuilder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
+      RegionResult regionResult, List<Action> failedActions) {
+    Object result = regionResult.result.get(action.getOriginalIndex());
+    if (result == null) {
+      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
+          + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
+          + regionReq.loc.getRegionInfo().getRegionNameAsString());
+      addError(action, new RuntimeException("Invalid response"), serverName);
+      failedActions.add(action);
+    } else if (result instanceof Throwable) {
+      Throwable error = translateException((Throwable) result);
+      logException(tries, () -> Stream.of(regionReq), error, serverName);
+      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
+          getExtraContextForError(serverName));
+      } else {
+        failedActions.add(action);
+      }
+    } else {
+      action2Future.get(action).complete((T) result);
+    }
+  }
+
+  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
+      ServerName serverName, MultiResponse resp) {
+    List<Action> failedActions = new ArrayList<>();
+    actionsByRegion.forEach((rn, regionReq) -> {
+      RegionResult regionResult = resp.getResults().get(rn);
+      if (regionResult != null) {
+        regionReq.actions.forEach(
+          action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
+      } else {
+        Throwable t = resp.getException(rn);
+        Throwable error;
+        if (t == null) {
+          LOG.error(
+            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
+          error = new RuntimeException("Invalid response");
+        } else {
+          error = translateException(t);
+          logException(tries, () -> Stream.of(regionReq), error, serverName);
+          conn.getLocator().updateCachedLocation(regionReq.loc, error);
+          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+            failAll(regionReq.actions.stream(), tries, error, serverName);
+            return;
+          }
+          addError(regionReq.actions, error, serverName);
+          failedActions.addAll(regionReq.actions);
+        }
+      }
+    });
+    if (!failedActions.isEmpty()) {
+      tryResubmit(failedActions.stream(), tries);
+    }
+  }
+
+  private void send(Map<ServerName, ConcurrentMap<byte[], RegionRequest>> actionsByServer,
+      int tries) {
+    long callTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      long remainingNs = remainingTimeNs();
+      if (remainingNs <= 0) {
+        failAll(actionsByServer.values().stream().flatMap(m -> m.values().stream())
+            .flatMap(r -> r.actions.stream()),
+          tries);
+        return;
+      }
+      callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
+    } else {
+      callTimeoutNs = rpcTimeoutNs;
+    }
+    actionsByServer.forEach((sn, actionsByRegion) -> {
+      ClientService.Interface stub;
+      try {
+        stub = conn.getRegionServerStub(sn);
+      } catch (IOException e) {
+        onError(actionsByRegion, tries, e, sn);
+        return;
+      }
+      ClientProtos.MultiRequest req;
+      List<CellScannable> cells = new ArrayList<>();
+      try {
+        req = buildReq(actionsByRegion, cells);
+      } catch (IOException e) {
+        onError(actionsByRegion, tries, e, sn);
+        return;
+      }
+      HBaseRpcController controller = conn.rpcControllerFactory.newController();
+      resetController(controller, callTimeoutNs);
+      if (!cells.isEmpty()) {
+        controller.setCellScanner(createCellScanner(cells));
+      }
+      stub.multi(controller, req, resp -> {
+        if (controller.failed()) {
+          onError(actionsByRegion, tries, controller.getFailed(), sn);
+        } else {
+          try {
+            onComplete(actionsByRegion, tries, sn,
+              ResponseConverter.getResults(req, resp, controller.cellScanner()));
+          } catch (Exception e) {
+            onError(actionsByRegion, tries, e, sn);
+            return;
+          }
+        }
+      });
+    });
+  }
+
+  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
+      ServerName serverName) {
+    Throwable error = translateException(t);
+    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
+        serverName);
+      return;
+    }
+    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
+        .collect(Collectors.toList());
+    addError(copiedActions, error, serverName);
+    tryResubmit(copiedActions.stream(), tries);
+  }
+
+  private void tryResubmit(Stream<Action> actions, int tries) {
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        failAll(actions, tries);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void groupAndSend(Stream<Action> actions, int tries) {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        failAll(actions, tries);
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> actionsByServer =
+        new ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
+    CompletableFuture.allOf(actions
+        .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
+          RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
+            if (error != null) {
+              error = translateException(error);
+              if (error instanceof DoNotRetryIOException) {
+                failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
+                return;
+              }
+              addError(action, error, null);
+              locateFailed.add(action);
+            } else {
+              ConcurrentMap<byte[], RegionRequest> actionsByRegion =
+                  computeIfAbsent(actionsByServer, loc.getServerName(),
+                    () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+              computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
+                () -> new RegionRequest(loc)).actions.add(action);
+            }
+          }))
+        .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
+          if (!actionsByServer.isEmpty()) {
+            send(actionsByServer, tries);
+          }
+          if (!locateFailed.isEmpty()) {
+            tryResubmit(locateFailed.stream(), tries);
+          }
+        });
+  }
+
+  public List<CompletableFuture<T>> call() {
+    groupAndSend(actions.stream(), 1);
+    return futures;
+  }
+}
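
Note that call() hands back one CompletableFuture per action rather than a single future. As a point of reference, here is a minimal standalone sketch (not part of the patch) of how such a list collapses into one future of a list, which is the same composition AsyncTableBase.batchAll() uses later in this patch:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    class BatchFutures {
      // Completes when every per-action future has completed; the combined future
      // fails if any single action failed, because allOf propagates the exception.
      static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
      }
    }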
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
deleted file mode 100644
index e1208c2..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMultiGetRpcRetryingCaller.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import io.netty.util.HashedWheelTimer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * Retry caller for multi get.
- * <p>
- * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with
- * other single operations
- * <p>
- * And the {@link #maxAttempts} is a limit for each single get in the batch logically. In the
- * implementation, we will record a {@code tries} parameter for each operation group, and if it is
- * split to several groups when retrying, the sub groups will inherit {@code tries}. You can imagine
- * that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of the depth
- * of the tree.
- */
-@InterfaceAudience.Private
-class AsyncMultiGetRpcRetryingCaller {
-
-  private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
-
-  private final HashedWheelTimer retryTimer;
-
-  private final AsyncConnectionImpl conn;
-
-  private final TableName tableName;
-
-  private final List<Get> gets;
-
-  private final List<CompletableFuture<Result>> futures;
-
-  private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
-
-  private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
-
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final long operationTimeoutNs;
-
-  private final long rpcTimeoutNs;
-
-  private final int startLogErrorsCnt;
-
-  private final long startNs;
-
-  // we can not use HRegionLocation as the map key because the hashCode and equals method of
-  // HRegionLocation only consider serverName.
-  private static final class RegionRequest {
-
-    public final HRegionLocation loc;
-
-    public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
-
-    public RegionRequest(HRegionLocation loc) {
-      this.loc = loc;
-    }
-  }
-
-  public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
-    this.retryTimer = retryTimer;
-    this.conn = conn;
-    this.tableName = tableName;
-    this.gets = gets;
-    this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
-    this.operationTimeoutNs = operationTimeoutNs;
-    this.rpcTimeoutNs = rpcTimeoutNs;
-    this.startLogErrorsCnt = startLogErrorsCnt;
-
-    this.futures = new ArrayList<>(gets.size());
-    this.get2Future = new IdentityHashMap<>(gets.size());
-    gets.forEach(
-      get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
-    this.get2Errors = new IdentityHashMap<>();
-    this.startNs = System.nanoTime();
-  }
-
-  private long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
-  }
-
-  private List<ThrowableWithExtraContext> removeErrors(Get get) {
-    synchronized (get2Errors) {
-      return get2Errors.remove(get);
-    }
-  }
-
-  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
-      Throwable error, ServerName serverName) {
-    if (tries > startLogErrorsCnt) {
-      String regions =
-          regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
-              .collect(Collectors.joining(",", "[", "]"));
-      LOG.warn("Get data for " + regions + " in " + tableName + " from " + serverName
-          + " failed, tries=" + tries,
-        error);
-    }
-  }
-
-  private String getExtras(ServerName serverName) {
-    return serverName != null ? serverName.getServerName() : "";
-  }
-
-  private void addError(Get get, Throwable error, ServerName serverName) {
-    List<ThrowableWithExtraContext> errors;
-    synchronized (get2Errors) {
-      errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
-    }
-    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
-      serverName != null ? serverName.toString() : ""));
-  }
-
-  private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
-    gets.forEach(get -> addError(get, error, serverName));
-  }
-
-  private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
-    CompletableFuture<Result> future = get2Future.get(get);
-    if (future.isDone()) {
-      return;
-    }
-    ThrowableWithExtraContext errorWithCtx =
-        new ThrowableWithExtraContext(error, currentTime, extras);
-    List<ThrowableWithExtraContext> errors = removeErrors(get);
-    if (errors == null) {
-      errors = Collections.singletonList(errorWithCtx);
-    } else {
-      errors.add(errorWithCtx);
-    }
-    future.completeExceptionally(new RetriesExhaustedException(tries, errors));
-  }
-
-  private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
-    long currentTime = System.currentTimeMillis();
-    String extras = getExtras(serverName);
-    gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
-  }
-
-  private void failAll(Stream<Get> gets, int tries) {
-    gets.forEach(get -> {
-      CompletableFuture<Result> future = get2Future.get(get);
-      if (future.isDone()) {
-        return;
-      }
-      future.completeExceptionally(new RetriesExhaustedException(tries,
-          Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
-    });
-  }
-
-  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
-      throws IOException {
-    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
-    for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
-      ClientProtos.RegionAction.Builder regionActionBuilder =
-          ClientProtos.RegionAction.newBuilder().setRegion(
-            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
-      int index = 0;
-      for (Get get : entry.getValue().gets) {
-        regionActionBuilder.addAction(
-          ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
-        index++;
-      }
-      multiRequestBuilder.addRegionAction(regionActionBuilder);
-    }
-    return multiRequestBuilder.build();
-  }
-
-  private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries, ServerName serverName,
-      MultiResponse resp) {
-    List<Get> failedGets = new ArrayList<>();
-    getsByRegion.forEach((rn, regionReq) -> {
-      RegionResult regionResult = resp.getResults().get(rn);
-      if (regionResult != null) {
-        int index = 0;
-        for (Get get : regionReq.gets) {
-          Object result = regionResult.result.get(index);
-          if (result == null) {
-            LOG.error("Server sent us neither result nor exception for row '"
-                + Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
-            addError(get, new RuntimeException("Invalid response"), serverName);
-            failedGets.add(get);
-          } else if (result instanceof Throwable) {
-            Throwable error = translateException((Throwable) result);
-            logException(tries, () -> Stream.of(regionReq), error, serverName);
-            if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
-                getExtras(serverName));
-            } else {
-              failedGets.add(get);
-            }
-          } else {
-            get2Future.get(get).complete((Result) result);
-          }
-          index++;
-        }
-      } else {
-        Throwable t = resp.getException(rn);
-        Throwable error;
-        if (t == null) {
-          LOG.error(
-            "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
-          error = new RuntimeException("Invalid response");
-        } else {
-          error = translateException(t);
-          logException(tries, () -> Stream.of(regionReq), error, serverName);
-          conn.getLocator().updateCachedLocation(regionReq.loc, error);
-          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-            failAll(regionReq.gets.stream(), tries, error, serverName);
-            return;
-          }
-          addError(regionReq.gets, error, serverName);
-          failedGets.addAll(regionReq.gets);
-        }
-      }
-    });
-    if (!failedGets.isEmpty()) {
-      tryResubmit(failedGets.stream(), tries);
-    }
-  }
-
-  private void send(Map<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer, int tries) {
-    long callTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      long remainingNs = remainingTimeNs();
-      if (remainingNs <= 0) {
-        failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
-            .flatMap(r -> r.gets.stream()),
-          tries);
-        return;
-      }
-      callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
-    } else {
-      callTimeoutNs = rpcTimeoutNs;
-    }
-    getsByServer.forEach((sn, getsByRegion) -> {
-      ClientService.Interface stub;
-      try {
-        stub = conn.getRegionServerStub(sn);
-      } catch (IOException e) {
-        onError(getsByRegion, tries, e, sn);
-        return;
-      }
-      ClientProtos.MultiRequest req;
-      try {
-        req = buildReq(getsByRegion);
-      } catch (IOException e) {
-        onError(getsByRegion, tries, e, sn);
-        return;
-      }
-      HBaseRpcController controller = conn.rpcControllerFactory.newController();
-      resetController(controller, callTimeoutNs);
-      stub.multi(controller, req, resp -> {
-        if (controller.failed()) {
-          onError(getsByRegion, tries, controller.getFailed(), sn);
-        } else {
-          try {
-            onComplete(getsByRegion, tries, sn,
-              ResponseConverter.getResults(req, resp, controller.cellScanner()));
-          } catch (Exception e) {
-            onError(getsByRegion, tries, e, sn);
-            return;
-          }
-        }
-      });
-    });
-  }
-
-  private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
-      ServerName serverName) {
-    Throwable error = translateException(t);
-    logException(tries, () -> getsByRegion.values().stream(), error, serverName);
-    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-      failAll(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, error,
-        serverName);
-      return;
-    }
-    List<Get> copiedGets =
-        getsByRegion.values().stream().flatMap(r -> r.gets.stream()).collect(Collectors.toList());
-    addError(copiedGets, error, serverName);
-    tryResubmit(copiedGets.stream(), tries);
-  }
-
-  private void tryResubmit(Stream<Get> gets, int tries) {
-    long delayNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        failAll(gets, tries);
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
-    }
-    retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
-  }
-
-  private void groupAndSend(Stream<Get> gets, int tries) {
-    long locateTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
-      if (locateTimeoutNs <= 0) {
-        failAll(gets, tries);
-        return;
-      }
-    } else {
-      locateTimeoutNs = -1L;
-    }
-    ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
-        new ConcurrentHashMap<>();
-    ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
-    CompletableFuture.allOf(gets.map(get -> conn.getLocator()
-        .getRegionLocation(tableName, get.getRow(), RegionLocateType.CURRENT, locateTimeoutNs)
-        .whenComplete((loc, error) -> {
-          if (error != null) {
-            error = translateException(error);
-            if (error instanceof DoNotRetryIOException) {
-              failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
-              return;
-            }
-            addError(get, error, null);
-            locateFailed.add(get);
-          } else {
-            ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
-              loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
-            computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
-              () -> new RegionRequest(loc)).gets.add(get);
-          }
-        })).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
-          if (!getsByServer.isEmpty()) {
-            send(getsByServer, tries);
-          }
-          if (!locateFailed.isEmpty()) {
-            tryResubmit(locateFailed.stream(), tries);
-          }
-        });
-  }
-
-  public List<CompletableFuture<Result>> call() {
-    groupAndSend(gets.stream(), 1);
-    return futures;
-  }
-}
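
The next hunks rename MultiGetCallerBuilder to BatchCallerBuilder and widen it from List<Get> to List<? extends Row>. A sketch (not part of the patch) of driving the renamed builder, mirroring the RawAsyncTableImpl call site further down; the timeout values are illustrative, and the method is assumed to live in the org.apache.hadoop.hbase.client package, since the factory is package-private:

    static <T> List<CompletableFuture<T>> sendBatch(AsyncRpcRetryingCallerFactory factory,
        TableName tableName, List<? extends Row> actions) {
      return factory.batch()
          .table(tableName)
          .actions(actions)                       // mixed Gets, Puts, Deletes, Increments, Appends
          .operationTimeout(30, TimeUnit.SECONDS) // total budget for the whole batch
          .rpcTimeout(10, TimeUnit.SECONDS)       // budget for each individual RPC
          .call();
    }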
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index d240fab..5a34d8b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -70,8 +70,8 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public SingleRequestCallerBuilder<T> action(
-        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+    public SingleRequestCallerBuilder<T>
+        action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
@@ -258,48 +258,48 @@ class AsyncRpcRetryingCallerFactory {
     return new ScanSingleRegionCallerBuilder();
   }
 
-  public class MultiGetCallerBuilder {
+  public class BatchCallerBuilder {
 
     private TableName tableName;
 
-    private List<Get> gets;
+    private List<? extends Row> actions;
 
     private long operationTimeoutNs = -1L;
 
     private long rpcTimeoutNs = -1L;
 
-    public MultiGetCallerBuilder table(TableName tableName) {
+    public BatchCallerBuilder table(TableName tableName) {
       this.tableName = tableName;
       return this;
     }
 
-    public MultiGetCallerBuilder gets(List<Get> gets) {
-      this.gets = gets;
+    public BatchCallerBuilder actions(List<? extends Row> actions) {
+      this.actions = actions;
       return this;
     }
 
-    public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+    public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
       this.operationTimeoutNs = unit.toNanos(operationTimeout);
       return this;
     }
 
-    public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
       this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
       return this;
     }
 
-    public AsyncMultiGetRpcRetryingCaller build() {
-      return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
+    public <T> AsyncBatchRpcRetryingCaller<T> build() {
+      return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions,
         conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
         rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
     }
 
-    public List<CompletableFuture<Result>> call() {
-      return build().call();
+    public <T> List<CompletableFuture<T>> call() {
+      return this.<T> build().call();
     }
   }
 
-  public MultiGetCallerBuilder multiGet() {
-    return new MultiGetCallerBuilder();
+  public BatchCallerBuilder batch() {
+    return new BatchCallerBuilder();
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 81c806f..5bf6195 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -161,7 +161,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     if (closeScanner) {
       closeScanner();
     }
-    future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
   }
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 0b4add1..04e69af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -120,7 +120,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   }
 
   private void completeExceptionally() {
-    future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
   }
 
   private void onError(Throwable error, Supplier<String> errMsg,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index a2b5247..19a22c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll;
+
 import com.google.common.base.Preconditions;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -30,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * The base interface for asynchronous version of Table. Obtain an instance from a
@@ -126,11 +129,7 @@ public interface AsyncTableBase {
    * be wrapped by a {@link CompletableFuture}.
    */
   default CompletableFuture<Boolean> exists(Get get) {
-    if (!get.isCheckExistenceOnly()) {
-      get = ReflectionUtils.newInstance(get.getClass(), get);
-      get.setCheckExistenceOnly(true);
-    }
-    return get(get).thenApply(r -> r.getExists());
+    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
   }
 
   /**
@@ -362,7 +361,9 @@ public interface AsyncTableBase {
    * @param gets The objects that specify what data to fetch and from which rows.
    * @return A list of {@link CompletableFuture}s that represent the result for each get.
    */
-  List<CompletableFuture<Result>> get(List<Get> gets);
+  default List<CompletableFuture<Result>> get(List<Get> gets) {
+    return batch(gets);
+  }
 
   /**
    * A simple version for batch get. It will fail if there are any failures and you will get the
@@ -371,8 +372,90 @@ public interface AsyncTableBase {
    * @return A {@link CompletableFuture} that wrapper the result list.
    */
   default CompletableFuture<List<Result>> getAll(List<Get> gets) {
-    List<CompletableFuture<Result>> futures = get(gets);
+    return batchAll(gets);
+  }
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Gets.
+   * <p>
+   * This will return a list of booleans. Each value will be true if the related Get matches one or
+   * more keys, false if not.
+   * <p>
+   * This is a server-side call so it prevents any data from being transferred to the client.
+   * @param gets the Gets
+   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
+   */
+  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
+    return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists()))
+        .collect(toList());
+  }
+
+  /**
+   * A simple version for batch exists. It will fail if there are any failures and you will get the
+   * whole result boolean list at once if the operation succeeds.
+   * @param gets the Gets
+   * @return A {@link CompletableFuture} that wraps the result boolean list.
+   */
+  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
+    return getAll(toCheckExistenceOnly(gets))
+        .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList()));
+  }
+
+  /**
+   * Puts some data in the table, in batch.
+   * @param puts The list of mutations to apply.
+   * @return A list of {@link CompletableFuture}s that represent the result for each put.
+   */
+  default List<CompletableFuture<Void>> put(List<Put> puts) {
+    return voidBatch(this, puts);
+  }
+
+  /**
+   * A simple version of batch put. It will fail if there are any failures.
+   * @param puts The list of mutations to apply.
+   * @return A {@link CompletableFuture} that always returns null when completed normally.
+   */
+  default CompletableFuture<Void> putAll(List<Put> puts) {
+    return voidBatchAll(this, puts);
+  }
+
+  /**
+   * Deletes the specified cells/rows in bulk.
+   * @param deletes list of things to delete.
+   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
+   */
+  default List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+    return voidBatch(this, deletes);
+  }
+
+  /**
+   * A simple version of batch delete. It will fail if there are any failures.
+   * @param deletes list of things to delete.
+   * @return A {@link CompletableFuture} that always returns null when completed normally.
+   */
+  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
+    return voidBatchAll(this, deletes);
+  }
+
+  /**
+   * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
+   * execution of the actions is not defined. Meaning if you do a Put and a Get in the same
+   * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
+   * had put.
+   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @return A list of {@link CompletableFuture}s that represent the result for each action.
+   */
+  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
+
+  /**
+   * A simple version of batch. It will fail if there are any failures and you will get the whole
+   * result list at once if the operation succeeds.
+   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
+   */
+  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
+    List<CompletableFuture<T>> futures = batch(actions);
     return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
+        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
   }
 }
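
That completes the interface changes. To make the widened AsyncTableBase concrete, a usage sketch (not part of the patch); the table name, column coordinates and row keys are illustrative:

    static void example(AsyncConnection conn, byte[] cf, byte[] cq, List<Put> puts) {
      AsyncTableBase table = conn.getRawTable(TableName.valueOf("test"));
      // A mixed batch; as the batch() javadoc above warns, execution order is undefined.
      List<Row> actions = Arrays.asList(
        new Put(Bytes.toBytes("row1")).addColumn(cf, cq, Bytes.toBytes("v1")),
        new Get(Bytes.toBytes("row2")),
        new Delete(Bytes.toBytes("row3")));
      List<CompletableFuture<Object>> perAction = table.batch(actions);
      // Bulk put that completes with null on success, built on voidBatchAll below.
      CompletableFuture<Void> done = table.putAll(puts);
    }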
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 6cc2551..7281185 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -194,7 +194,7 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public List<CompletableFuture<Result>> get(List<Get> gets) {
-    return rawTable.get(gets).stream().map(this::wrap).collect(Collectors.toList());
+  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(Collectors.toList());
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index cc27992..4355182 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 
@@ -28,6 +29,8 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -59,11 +63,11 @@ public final class ConnectionUtils {
 
   private static final Log LOG = LogFactory.getLog(ConnectionUtils.class);
 
-  private ConnectionUtils() {}
+  private ConnectionUtils() {
+  }
 
   /**
-   * Calculate pause time.
-   * Built on {@link HConstants#RETRY_BACKOFF}.
+   * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}.
    * @param pause time to pause
    * @param tries amount of tries
   * @return How long to wait after tries retries
@@ -83,7 +87,6 @@ public final class ConnectionUtils {
     return normalPause + jitter;
   }
 
-
   /**
    * Adds / subs an up to 50% jitter to a pause time. Minimum is 1.
    * @param pause the expected pause.
@@ -103,24 +106,23 @@
    * @param cnm Replaces the nonce generator used, for testing.
    * @return old nonce generator.
    */
-  public static NonceGenerator injectNonceGeneratorForTesting(
-      ClusterConnection conn, NonceGenerator cnm) {
+  public static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn,
+      NonceGenerator cnm) {
     return ConnectionImplementation.injectNonceGeneratorForTesting(conn, cnm);
   }
 
   /**
-   * Changes the configuration to set the number of retries needed when using Connection
-   * internally, e.g. for updating catalog tables, etc.
-   * Call this method before we create any Connections.
+   * Changes the configuration to set the number of retries needed when using Connection internally,
+   * e.g. for updating catalog tables, etc. Call this method before we create any Connections.
    * @param c The Configuration instance to set the retries into.
    * @param log Used to log what we set in here.
    */
-  public static void setServerSideHConnectionRetriesConfig(
-      final Configuration c, final String sn, final Log log) {
+  public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn,
+      final Log log) {
     // TODO: Fix this. Not all connections from server side should have 10 times the retries.
     int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    // Go big.  Multiply by 10.  If we can't get to meta after this many retries
+    // Go big. Multiply by 10. If we can't get to meta after this many retries
     // then something seriously wrong.
     int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
     int retries = hcRetries * serversideMultiplier;
@@ -141,9 +143,9 @@ public final class ConnectionUtils {
    * @throws IOException if IO failure occurred
    */
   public static ClusterConnection createShortCircuitConnection(final Configuration conf,
-    ExecutorService pool, User user, final ServerName serverName,
-    final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
-    throws IOException {
+      ExecutorService pool, User user, final ServerName serverName,
+      final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
+      throws IOException {
     if (user == null) {
       user = UserProvider.instantiate(conf).getCurrent();
     }
@@ -166,8 +168,7 @@
    */
   @VisibleForTesting
   public static void setupMasterlessConnection(Configuration conf) {
-    conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
-      MasterlessConnection.class.getName());
+    conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MasterlessConnection.class.getName());
   }
 
   /**
@@ -175,8 +176,7 @@
    * region re-lookups.
    */
   static class MasterlessConnection extends ConnectionImplementation {
-    MasterlessConnection(Configuration conf,
-      ExecutorService pool, User user) throws IOException {
+    MasterlessConnection(Configuration conf, ExecutorService pool, User user) throws IOException {
       super(conf, pool, user);
     }
 
@@ -197,8 +197,7 @@
   /**
    * Get a unique key for the rpc stub to the given server.
    */
-  static String getStubKey(String serviceName, ServerName serverName,
-      boolean hostnameCanChange) {
+  static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) {
     // Sometimes, servers go down and they come back up with the same hostname but a different
     // IP address. Force a resolution of the rsHostname by trying to instantiate an
     // InetSocketAddress, and this way we will rightfully get a new stubKey.
@@ -327,4 +326,25 @@ public final class ConnectionUtils {
 
   // Add a delta to avoid timeout immediately after a retry sleeping.
   static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+
+  static Get toCheckExistenceOnly(Get get) {
+    if (get.isCheckExistenceOnly()) {
+      return get;
+    }
+    return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true);
+  }
+
+  static List<Get> toCheckExistenceOnly(List<Get> gets) {
+    return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList());
+  }
+
+  static List<CompletableFuture<Void>> voidBatch(AsyncTableBase table,
+      List<? extends Row> actions) {
+    return table.<Object> batch(actions).stream().map(f -> f.<Void> thenApply(r -> null))
+        .collect(toList());
+  }
+
+  static CompletableFuture<Void> voidBatchAll(AsyncTableBase table, List<? extends Row> actions) {
+    return table.<Object> batchAll(actions).thenApply(r -> null);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 6fad0da..d1035f1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -407,8 +407,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   @Override
-  public List<CompletableFuture<Result>> get(List<Get> gets) {
-    return conn.callerFactory.multiGet().table(tableName).gets(gets)
+  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+    return conn.callerFactory.batch().table(tableName).actions(actions)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index cd4712a..54d187d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -52,9 +52,8 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
@@ -668,7 +667,7 @@ public final class RequestConverter {
    * @throws IOException
    */
   public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
-      final List<Action> actions, final List<CellScannable> cells,
+      final Iterable<Action> actions, final List<CellScannable> cells,
       final RegionAction.Builder regionActionBuilder,
       final ClientProtos.Action.Builder actionBuilder,
       final MutationProto.Builder mutationBuilder) throws IOException {
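
The exists defaults in AsyncTableBase bottom out in the new ConnectionUtils.toCheckExistenceOnly helpers above. Spelled out inline, exists(List<Get>) amounts to the following sketch (not part of the patch, and assuming the same static import of Collectors.toList), with one deliberate difference: the patch copies each Get through ReflectionUtils.newInstance before flipping checkExistenceOnly, so the caller's Gets are never mutated, while this simplified version mutates them in place:

    static List<CompletableFuture<Boolean>> existsSketch(AsyncTableBase table, List<Get> gets) {
      // Rewrite each Get as existence-only so only booleans cross the wire.
      List<Get> existenceOnly =
          gets.stream().map(g -> g.setCheckExistenceOnly(true)).collect(toList());
      return table.get(existenceOnly).stream()
          .map(f -> f.thenApply(Result::getExists)).collect(toList());
    }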
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 3028111..5614d8e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -22,10 +22,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -62,12 +60,10 @@ public abstract class AbstractTestAsyncTableScan { TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); TEST_UTIL.waitTableAvailable(TABLE_NAME); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); - List> futures = new ArrayList<>(); - IntStream.range(0, COUNT).forEach( - i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + ASYNC_CONN.getRawTable(TABLE_NAME).putAll(IntStream.range(0, COUNT) + .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) + .collect(Collectors.toList())).get(); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java deleted file mode 100644 index d24501d..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.client; - -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; -import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; -import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.IntStream; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.ByteBufferPool; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Will split the table, and move region randomly when testing. - */ -@Category({ LargeTests.class, ClientTests.class }) -public class TestAsyncGetMultiThread { - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("async"); - - private static byte[] FAMILY = Bytes.toBytes("cf"); - - private static byte[] QUALIFIER = Bytes.toBytes("cq"); - - private static int COUNT = 1000; - - private static AsyncConnection CONN; - - private static byte[][] SPLIT_KEYS; - - @BeforeClass - public static void setUp() throws Exception { - TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); - TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); - TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L); - TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000); - TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); - TEST_UTIL.startMiniCluster(5); - SPLIT_KEYS = new byte[8][]; - for (int i = 111; i < 999; i += 111) { - SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); - } - TEST_UTIL.createTable(TABLE_NAME, FAMILY); - TEST_UTIL.waitTableAvailable(TABLE_NAME); - CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - RawAsyncTable table = CONN.getRawTable(TABLE_NAME); - List> futures = new ArrayList<>(); - IntStream.range(0, COUNT) - .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))))); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - } - - @AfterClass - public static void tearDown() throws Exception { - IOUtils.closeQuietly(CONN); - TEST_UTIL.shutdownMiniCluster(); - } - - private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { - while 
-      int i = ThreadLocalRandom.current().nextInt(COUNT);
-      assertEquals(i,
-        Bytes.toInt(
-          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
-            .getValue(FAMILY, QUALIFIER)));
-    }
-  }
-
-  @Test
-  public void test() throws IOException, InterruptedException, ExecutionException {
-    int numThreads = 20;
-    AtomicBoolean stop = new AtomicBoolean(false);
-    ExecutorService executor =
-      Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
-    List<Future<?>> futures = new ArrayList<>();
-    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
-      run(stop);
-      return null;
-    })));
-    Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
-    Admin admin = TEST_UTIL.getAdmin();
-    for (byte[] splitPoint : SPLIT_KEYS) {
-      admin.split(TABLE_NAME, splitPoint);
-      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
-        region.compact(true);
-      }
-      Thread.sleep(5000);
-      admin.balancer(true);
-      Thread.sleep(5000);
-      ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
-      ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-        .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
-        .findAny().get();
-      admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
-        Bytes.toBytes(newMetaServer.getServerName()));
-      Thread.sleep(5000);
-    }
-    stop.set(true);
-    executor.shutdown();
-    for (Future<?> future : futures) {
-      future.get();
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
new file mode 100644
index 0000000..308b9e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableBatch {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static byte[][] SPLIT_KEYS;
+
+  @Parameter(0)
+  public String tableType;
+
+  @Parameter(1)
+  public Function<TableName, AsyncTableBase> tableGetter;
+
+  private static RawAsyncTable getRawTable(TableName tableName) {
+    return CONN.getRawTable(tableName);
+  }
+
+  private static AsyncTable getTable(TableName tableName) {
+    return CONN.getTable(tableName, ForkJoinPool.commonPool());
+  }
+
+  @Parameters(name = "{index}: type={0}")
+  public static List<Object[]> params() {
+    Function<TableName, AsyncTableBase> rawTableGetter = TestAsyncTableBatch::getRawTable;
+    Function<TableName, AsyncTableBase> tableGetter = TestAsyncTableBatch::getTable;
+    return Arrays.asList(new Object[] { "raw", rawTableGetter },
+      new Object[] { "normal", tableGetter });
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.isTableEnabled(TABLE_NAME)) {
+      admin.disableTable(TABLE_NAME);
+    }
+    admin.deleteTable(TABLE_NAME);
+  }
+
+  private byte[] getRow(int i) {
+    return Bytes.toBytes(String.format("%03d", i));
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException, IOException {
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(IntStream.range(0, COUNT)
+      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
+      .collect(Collectors.toList())).get();
+    List<Result> results =
+      table
+        .getAll(IntStream.range(0, COUNT)
+          .mapToObj(
+            i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
+          .flatMap(l -> l.stream()).collect(Collectors.toList()))
+        .get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
+      assertTrue(results.get(2 * i + 1).isEmpty());
+    }
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.flush(TABLE_NAME);
+    TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).forEach(r -> {
+      byte[] startKey = r.getRegionInfo().getStartKey();
+      int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
+      byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
+      try {
+        admin.splitRegion(r.getRegionInfo().getRegionName(), splitPoint);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+    // We are not testing the split function itself, so there are no assertions for it here.
+    // Just wait for a while and then start our work.
+    Thread.sleep(5000);
+    table.deleteAll(
+      IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
+      .get();
+    results = table
+      .getAll(
+        IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
+      .get();
+    assertEquals(COUNT, results.size());
+    results.forEach(r -> assertTrue(r.isEmpty()));
+  }
+
+  @Test
+  public void testMixed() throws InterruptedException, ExecutionException {
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(IntStream.range(0, 5)
+      .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
+      .collect(Collectors.toList())).get();
+    List<Row> actions = new ArrayList<>();
+    actions.add(new Get(Bytes.toBytes(0)));
+    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 2)));
+    actions.add(new Delete(Bytes.toBytes(2)));
+    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
+    actions.add(new Append(Bytes.toBytes(4)).add(FAMILY, CQ, Bytes.toBytes(4)));
+    List<Object> results = table.batchAll(actions).get();
+    assertEquals(5, results.size());
+    Result getResult = (Result) results.get(0);
+    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
+    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
+    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
+    Result incrementResult = (Result) results.get(3);
+    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
+    Result appendResult = (Result) results.get(4);
+    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
+    assertEquals(12, appendValue.length);
+    assertEquals(4, Bytes.toLong(appendValue));
+    assertEquals(4, Bytes.toInt(appendValue, 8));
+  }
+
+  public static final class ErrorInjectObserver extends BaseRegionObserver {
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+        List<Cell> results) throws IOException {
+      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
+        throw new DoNotRetryRegionException("Inject Error");
+      }
+    }
+  }
+
+  @Test
+  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
+    Admin admin = TEST_UTIL.getAdmin();
+    HTableDescriptor htd = admin.getTableDescriptor(TABLE_NAME);
+    htd.addCoprocessor(ErrorInjectObserver.class.getName());
+    admin.modifyTable(TABLE_NAME, htd);
+    AsyncTableBase table = tableGetter.apply(TABLE_NAME);
+    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
+      .collect(Collectors.toList())).get();
+    List<CompletableFuture<Result>> futures = table
+      .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
+    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
+      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
+    }
+    try {
+      futures.get(SPLIT_KEYS.length - 1).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
new file mode 100644
index 0000000..da8141b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Will split the table, and randomly move regions around, while the test is running.
+ */
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreaded {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static byte[][] SPLIT_KEYS;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
+    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
+    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
+    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.startMiniCluster(5);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    CONN.getRawTable(TABLE_NAME)
+      .putAll(
+        IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+          .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
+      .get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
+    while (!stop.get()) {
+      int i = ThreadLocalRandom.current().nextInt(COUNT);
+      assertEquals(i,
+        Bytes.toInt(
+          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
+            .getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    int numThreads = 20;
+    AtomicBoolean stop = new AtomicBoolean(false);
+    ExecutorService executor =
+      Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    List<Future<?>> futures = new ArrayList<>();
+    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
+      run(stop);
+      return null;
+    })));
+    Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
+    Admin admin = TEST_UTIL.getAdmin();
+    for (byte[] splitPoint : SPLIT_KEYS) {
+      admin.split(TABLE_NAME, splitPoint);
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+        region.compact(true);
+      }
+      Thread.sleep(5000);
+      admin.balancer(true);
+      Thread.sleep(5000);
+      ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+      ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
+        .findAny().get();
+      admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+        Bytes.toBytes(newMetaServer.getServerName()));
+      Thread.sleep(5000);
+    }
+    stop.set(true);
+    executor.shutdown();
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
deleted file mode 100644
index 612e830..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableMultiGet.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableMultiGet {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("async");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static byte[] CQ = Bytes.toBytes("cq");
-
-  private static int COUNT = 100;
-
-  private static AsyncConnection ASYNC_CONN;
-
-  @Parameter
-  public Supplier<AsyncTableBase> getTable;
-
-  private static RawAsyncTable getRawTable() {
-    return ASYNC_CONN.getRawTable(TABLE_NAME);
-  }
-
-  private static AsyncTable getTable() {
-    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
-  }
-
-  @Parameters
-  public static List<Object[]> params() {
-    return Arrays.asList(new Supplier<?>[] { TestAsyncTableMultiGet::getRawTable },
-      new Supplier<?>[] { TestAsyncTableMultiGet::getTable });
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
-    byte[][] splitKeys = new byte[8][];
-    for (int i = 11; i < 99; i += 11) {
-      splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
-    }
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
-    TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
-    List<CompletableFuture<?>> futures = new ArrayList<>();
-    IntStream.range(0, COUNT).forEach(i -> futures.add(table.put(
-      new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))));
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    ASYNC_CONN.close();
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  private void move() throws IOException, InterruptedException {
-    HRegionServer src = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
-    HRegionServer dst = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
-      .map(t -> t.getRegionServer()).filter(r -> r != src).findAny().get();
-    Region region = src.getOnlineRegions(TABLE_NAME).stream().findAny().get();
-    TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(),
-      Bytes.toBytes(dst.getServerName().getServerName()));
-    Thread.sleep(1000);
-  }
-
-  private void test(BiFunction<AsyncTableBase, List<Get>, List<Result>> getFunc)
-      throws IOException, InterruptedException {
-    AsyncTableBase table = getTable.get();
-    List<Get> gets =
-      IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(String.format("%02d", i))))
-        .collect(Collectors.toList());
-    List<Result> results = getFunc.apply(table, gets);
-    assertEquals(COUNT, results.size());
-    for (int i = 0; i < COUNT; i++) {
-      Result result = results.get(i);
-      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
-    }
-    // test basic failure recovery
-    move();
-    results = getFunc.apply(table, gets);
-    assertEquals(COUNT, results.size());
-    for (int i = 0; i < COUNT; i++) {
-      Result result = results.get(i);
-      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
-    }
-  }
-
-  @Test
-  public void testGet() throws InterruptedException, IOException {
-    test((table, gets) -> {
-      return table.get(gets).stream().map(f -> {
-        try {
-          return f.get();
-        } catch (InterruptedException | ExecutionException e) {
-          throw new RuntimeException(e);
-        }
-      }).collect(Collectors.toList());
-    });
-  }
-
-  @Test
-  public void testGetAll() throws InterruptedException, IOException {
-    test((table, gets) -> {
-      try {
-        return table.getAll(gets).get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
-}
-- 
2.7.4
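Usage note for reviewers: the sketch below shows how client code is expected to call the batch methods this patch adds to AsyncTableBase (putAll/getAll; batchAll and deleteAll follow the same pattern). It is illustrative only and not part of the patch: the table name "demo", the column family "cf", the qualifier "cq", and the integer row encoding are placeholder assumptions, and error handling is elided for brevity.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchApiExample {
  public static void main(String[] args) throws Exception {
    // Placeholder table: assumed to already exist with column family 'cf'.
    TableName tn = TableName.valueOf("demo");
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    try (AsyncConnection conn =
        ConnectionFactory.createAsyncConnection(HBaseConfiguration.create())) {
      RawAsyncTable table = conn.getRawTable(tn);
      // putAll sends the whole batch through the new batch retry caller and returns a
      // single future; blocking on get() here is only for the sake of a linear example.
      table.putAll(IntStream.range(0, 10)
        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(cf, cq, Bytes.toBytes(i)))
        .collect(Collectors.toList())).get();
      // getAll is the all-or-nothing variant: one future for the whole list of gets,
      // with results in the same order as the requests.
      List<Result> results = table.getAll(IntStream.range(0, 10)
        .mapToObj(i -> new Get(Bytes.toBytes(i))).collect(Collectors.toList())).get();
      results.forEach(r -> System.out.println(Bytes.toInt(r.getValue(cf, cq))));
    }
  }
}

For per-operation error handling, the non-"All" overload (e.g. table.get(gets), as exercised in testPartialSuccess above) returns one CompletableFuture per request instead of a single aggregate future.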