From 1c8e771987a349088f7b6cf853606d7dbdd7e836 Mon Sep 17 00:00:00 2001 From: chenheng Date: Wed, 18 Nov 2015 23:29:03 +0800 Subject: [PATCH] HBASE-14703 not collect stats when call HTable.mutateRow --- .../apache/hadoop/hbase/client/AsyncProcess.java | 79 +++++++++++++++------- .../org/apache/hadoop/hbase/client/HTable.java | 39 ++++++++--- .../hadoop/hbase/client/MultiServerCallable.java | 17 +---- .../client/PayloadCarryingServerCallable.java | 47 +++++++++++++ .../hadoop/hbase/client/RetryingTimeTracker.java | 56 +++++++++++++++ .../hbase/client/RpcRetryingCallerFactory.java | 6 -- .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 28 ++------ .../hadoop/hbase/protobuf/ResponseConverter.java | 4 +- .../hadoop/hbase/client/TestAsyncProcess.java | 11 +-- .../hadoop/hbase/regionserver/RSRpcServices.java | 4 +- .../hadoop/hbase/client/TestClientPushback.java | 29 ++++++++ .../hadoop/hbase/client/TestFromClientSide.java | 8 ++- .../hadoop/hbase/client/TestReplicasClient.java | 5 +- 13 files changed, 246 insertions(+), 87 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f1fa3eb..671b6a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -522,9 +522,13 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results); + return submitAll(null, tableName, rows, callback, results, null, timeout); } + public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, + List rows, Batch.Callback callback, Object[] results) { + return submitAll(pool, tableName, rows, callback, results, null, timeout); + } /** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. @@ -536,7 +540,8 @@ class AsyncProcess { * @param results Optional array to return the results thru; backward compat. */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results) { + List rows, Batch.Callback callback, Object[] results, + PayloadCarryingServerCallable callable, int curTimeout) { List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -555,7 +560,8 @@ class AsyncProcess { actions.add(action); } AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); + tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, + callable, curTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -687,11 +693,11 @@ class AsyncProcess { private final MultiAction multiAction; private final int numAttempt; private final ServerName server; - private final Set> callsInProgress; + private final Set callsInProgress; private SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, - Set> callsInProgress) { + Set callsInProgress) { this.multiAction = multiAction; this.numAttempt = numAttempt; this.server = server; @@ -701,19 +707,28 @@ class AsyncProcess { @Override public void run() { MultiResponse res; - MultiServerCallable callable = null; + PayloadCarryingServerCallable callable = currentCallable; try { - callable = createCallable(server, tableName, multiAction); + // setup the callable based on the actions, if we don't have one already from the request + if (callable == null) { + callable = createCallable(server, tableName, multiAction); + } + RpcRetryingCaller caller = createCaller(callable); try { - RpcRetryingCaller caller = createCaller(callable); - if (callsInProgress != null) callsInProgress.add(callable); - res = caller.callWithoutRetries(callable, timeout); - - if (res == null) { - // Cancelled - return; - } - + // we only track these callables b/c they are cancelable. The other + if (callsInProgress != null) callsInProgress.add(callable); + res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + if (currentCallable != null) { + if (res == null|| res.getResults().size() == 0) { + // As mutateRow, if statistic off, nothing will return. So let's stop. + actionsInProgress.set(0); + return; + } + } else { + if (res == null) { + return; + } + } } catch (IOException e) { // The service itself failed . It may be an error coming from the communication // layer, but, as well, a functional error raised by the server. @@ -747,7 +762,7 @@ class AsyncProcess { private final BatchErrors errors; private final ConnectionImplementation.ServerErrorTracker errorsByServer; private final ExecutorService pool; - private final Set> callsInProgress; + private final Set callsInProgress; private final TableName tableName; @@ -768,10 +783,12 @@ class AsyncProcess { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; + private PayloadCarryingServerCallable currentCallable; + private int currentCallTotalTimeout; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback) { + Batch.Callback callback, PayloadCarryingServerCallable callable, int timeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -833,13 +850,16 @@ class AsyncProcess { this.replicaGetIndices = null; } this.callsInProgress = !hasAnyReplicaGets ? null : - Collections.newSetFromMap(new ConcurrentHashMap, Boolean>()); + Collections.newSetFromMap( + new ConcurrentHashMap()); this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); + this.currentCallable = callable; + this.currentCallTotalTimeout = timeout; } - public Set> getCallsInProgress() { + public Set getCallsInProgress() { return callsInProgress; } @@ -1338,7 +1358,6 @@ class AsyncProcess { } } } - if (toReplay.isEmpty()) { logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); } else { @@ -1570,7 +1589,7 @@ class AsyncProcess { throw new InterruptedIOException(iex.getMessage()); } finally { if (callsInProgress != null) { - for (MultiServerCallable clb : callsInProgress) { + for (PayloadCarryingServerCallable clb : callsInProgress) { clb.cancel(); } } @@ -1627,13 +1646,22 @@ class AsyncProcess { } } + protected AsyncRequestFutureImpl createAsyncRequestFuture( + TableName tableName, List> actions, long nonceGroup, ExecutorService pool, + Batch.Callback callback, Object[] results, boolean needResults, + PayloadCarryingServerCallable callable, int curTimeout) { + return new AsyncRequestFutureImpl( + tableName, actions, nonceGroup, getPool(pool), needResults, + results, callback, callable, curTimeout); + } + @VisibleForTesting /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults) { - return new AsyncRequestFutureImpl( - tableName, actions, nonceGroup, getPool(pool), needResults, results, callback); + return createAsyncRequestFuture( + tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); } /** @@ -1649,7 +1677,8 @@ class AsyncProcess { * Create a caller. Isolated to be easily overridden in the tests. */ @VisibleForTesting - protected RpcRetryingCaller createCaller(MultiServerCallable callable) { + protected RpcRetryingCaller createCaller( + PayloadCarryingServerCallable callable) { return rpcCallerFactory. newCaller(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 51a95e4..5333aa0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; @@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; @@ -262,7 +264,8 @@ public class HTable implements HTableInterface { */ @Override public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout); + HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, + rpcCallerFactory, operationTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -586,13 +589,19 @@ public class HTable implements HTableInterface { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - RegionServerCallable callable = - new RegionServerCallable(connection, getName(), rm.getRow()) { + final RetryingTimeTracker tracker = new RetryingTimeTracker(); + PayloadCarryingServerCallable callable = + new PayloadCarryingServerCallable(connection, getName(), rm.getRow(), + rpcControllerFactory) { @Override - public Void call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + public MultiResponse call(int callTimeout) throws IOException { + tracker.start(); controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); + int remainingTime = tracker.getRemainingTime(callTimeout); + if (remainingTime == 0) { + throw new DoNotRetryIOException("Timeout for mutate row"); + } + controller.setCallTimeout(remainingTime); try { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( getLocation().getRegionInfo().getRegionName(), rm); @@ -608,13 +617,27 @@ public class HTable implements HTableInterface { } throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex); } + + if (response.getRegionActionResultCount() == 1 && + response.getRegionActionResult(0).getResultOrExceptionCount() == 0) { + // Currently If there is no statistic tracker, + // server will return empty RegionActionResult. + // There is nothing in it. So we just return null. + return null; + } + return ResponseConverter.getResults(request, response, controller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } - return null; } }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), + null, null, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 72ae829..8e5db03 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -51,21 +50,19 @@ import com.google.protobuf.ServiceException; * {@link RegionServerCallable} that goes against multiple regions. * @param */ -class MultiServerCallable extends RegionServerCallable implements Cancellable { +class MultiServerCallable extends PayloadCarryingServerCallable { private final MultiAction multiAction; private final boolean cellBlock; - private final PayloadCarryingRpcController controller; MultiServerCallable(final ClusterConnection connection, final TableName tableName, final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi) { - super(connection, tableName, null); + super(connection, tableName, null, rpcFactory); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so // we will store the server here, and throw if someone tries to obtain location/regioninfo. this.location = new HRegionLocation(null, location); this.cellBlock = isCellBlock(); - controller = rpcFactory.newController(); } @Override @@ -134,16 +131,6 @@ class MultiServerCallable extends RegionServerCallable impleme return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); } - @Override - public void cancel() { - controller.startCancel(); - } - - @Override - public boolean isCancelled() { - return controller.isCanceled(); - } - /** * @return True if we should send data in cellblocks. This is an expensive call. Cache the * result if you can rather than call each time. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java new file mode 100644 index 0000000..a5c34ba --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +/** + * + */ +@InterfaceAudience.Private +public abstract class PayloadCarryingServerCallable + extends RegionServerCallable implements Cancellable { + protected PayloadCarryingRpcController controller; + + public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, + RpcControllerFactory rpcControllerFactory) { + super(connection, tableName, row); + this.controller = rpcControllerFactory.newController(); + } + + @Override + public void cancel() { + controller.startCancel(); + } + + @Override + public boolean isCancelled() { + return controller.isCanceled(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java new file mode 100644 index 0000000..c223b3e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Tracks the amount of time remaining for an operation. + */ +class RetryingTimeTracker { + + private long globalStartTime = -1; + + public void start() { + if (this.globalStartTime < 0) { + this.globalStartTime = EnvironmentEdgeManager.currentTime(); + } + } + + public int getRemainingTime(int callTimeout) { + if (callTimeout <= 0) { + return 0; + } else { + if (callTimeout == Integer.MAX_VALUE) + return Integer.MAX_VALUE; + int remainingTime = (int) ( + callTimeout - + (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); + if (remainingTime < 1) { + // If there is no time left, we're trying anyway. It's too late. + // 0 means no timeout, and it's not the intent here. So we secure both cases by + // resetting to the minimum. + remainingTime = 1; + } + return remainingTime; + } + } + + public long getStartTime() { + return this.globalStartTime; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 0af8210..550812f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory { // is cheap as it does not require parsing a complex structure. RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, startLogErrorsCnt); - - // wrap it with stats, if we are tracking them - if (enableBackPressure && this.stats != null) { - caller = new StatsTrackingRpcRetryingCaller(caller, this.stats); - } - return caller; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 12abc6a..69a533d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -64,6 +64,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final AtomicBoolean cancelled = new AtomicBoolean(false); private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; + private final RetryingTimeTracker tracker; public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); @@ -76,23 +77,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; - } - - private int getRemainingTime(int callTimeout) { - if (callTimeout <= 0) { - return 0; - } else { - if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; - int remainingTime = (int) (callTimeout - - (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); - if (remainingTime < 1) { - // If there is no time left, we're trying anyway. It's too late. - // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. - remainingTime = 1; - } - return remainingTime; - } + this.tracker = new RetryingTimeTracker(); } @Override @@ -108,14 +93,14 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { throws IOException, RuntimeException { List exceptions = new ArrayList(); - this.globalStartTime = EnvironmentEdgeManager.currentTime(); + tracker.start(); context.clear(); for (int tries = 0;; tries++) { long expectedSleep; try { callable.prepare(tries != 0); // if called with false, check table status on ZK interceptor.intercept(context.prepare(callable, tries)); - return callable.call(getRemainingTime(callTimeout)); + return callable.call(tracker.getRemainingTime(callTimeout)); } catch (PreemptiveFastFailException e) { throw e; } catch (Throwable t) { @@ -172,14 +157,13 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { * @return Calculate how long a single call took */ private long singleCallDuration(final long expectedSleep) { - return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; + return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep; } @Override public T callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. - this.globalStartTime = EnvironmentEdgeManager.currentTime(); try { callable.prepare(false); return callable.call(callTimeout); @@ -231,7 +215,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { @Override public String toString() { - return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + + return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() + ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}'; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 177b1c7..f1c8797 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -89,7 +89,7 @@ public final class ResponseConverter { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); if (requestRegionActionCount != responseRegionActionResultCount) { - throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount + + throw new IllegalStateException("Request mutation count=" + requestRegionActionCount + " does not match response mutation result count=" + responseRegionActionResultCount); } @@ -123,7 +123,7 @@ public final class ResponseConverter { Object responseValue; if (roe.hasException()) { responseValue = ProtobufUtil.toException(roe.getException()); - } else if (roe.hasResult()) { + } else if (roe.hasResult() || roe.hasLoadStats()) { responseValue = ProtobufUtil.toResult(roe.getResult(), cells); // add the load stats, if we got any if (roe.hasLoadStats()) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 067f2ad..2022c12 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -184,10 +184,11 @@ public class TestAsyncProcess { } @Override - protected RpcRetryingCaller createCaller(MultiServerCallable callable) { + protected RpcRetryingCaller createCaller(PayloadCarryingServerCallable callable) { callsCt.incrementAndGet(); + MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( - callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { + callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @Override public void addResponse(MultiResponse mr, byte[] regionName, Action a) { if (Arrays.equals(FAILS, a.getAction().getRow())) { @@ -237,7 +238,8 @@ public class TestAsyncProcess { } @Override - protected RpcRetryingCaller createCaller(MultiServerCallable callable) { + protected RpcRetryingCaller createCaller( + PayloadCarryingServerCallable callable) { callsCt.incrementAndGet(); return new CallerWithFailure(); } @@ -274,7 +276,8 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - MultiServerCallable callable) { + PayloadCarryingServerCallable payloadCallable) { + MultiServerCallable callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5729334..e3df85b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2147,8 +2147,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner); // add the stats to the request if(stats != null) { - responseBuilder.addRegionActionResult(RegionActionResult.newBuilder() - .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats))); + regionActionResultBuilder + .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)); } processed = Boolean.TRUE; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 1efbe05..db1e490 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -165,4 +166,32 @@ public class TestClientPushback { assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0); assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); } + + @Test + public void testMutateRowStats() throws IOException { + Configuration conf = UTIL.getConfiguration(); + ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); + HTable table = (HTable) conn.getTable(tableName); + HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); + Region region = rs.getOnlineRegions(tableName).get(0); + + RowMutations mutations = new RowMutations(Bytes.toBytes("row")); + Put p = new Put(Bytes.toBytes("row")); + p.addColumn(family, qualifier, Bytes.toBytes("value2")); + mutations.add(p); + table.mutateRow(mutations); + + ServerStatisticTracker stats = conn.getStatisticsTracker(); + assertNotNull( "No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = rs.getServerName(); + byte[] regionName = region.getRegionInfo().getRegionName(); + + // check to see we found some load on the memstore + ServerStatistics serverStats = stats.getServerStatsForTesting(server); + ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); + + assertNotNull(regionStats); + assertTrue(regionStats.getMemstoreLoadPercent() > 0); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 8734aea..188b0ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -4365,7 +4365,13 @@ public class TestFromClientSide { arm.add(p); t.mutateRow(arm); fail("Expected NoSuchColumnFamilyException"); - } catch(NoSuchColumnFamilyException e) { + } catch(RetriesExhaustedWithDetailsException e) { + for(Throwable rootCause: e.getCauses()){ + if(rootCause instanceof NoSuchColumnFamilyException){ + return; + } + } + throw e; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index d2e775d..9690a89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -573,10 +573,11 @@ public class TestReplicasClient { Assert.assertTrue(((Result)r).isStale()); Assert.assertTrue(((Result)r).getExists()); } - Set> set = ((AsyncRequestFutureImpl)reqs).getCallsInProgress(); + Set set = + ((AsyncRequestFutureImpl)reqs).getCallsInProgress(); // verify we did cancel unneeded calls Assert.assertTrue(!set.isEmpty()); - for (MultiServerCallable m : set) { + for (PayloadCarryingServerCallable m : set) { Assert.assertTrue(m.isCancelled()); } } finally { -- 2.3.8 (Apple Git-58)