From 4391fdd7a558d1dbd15cefbdd8b2fb3240871648 Mon Sep 17 00:00:00 2001 From: chenheng Date: Wed, 11 Nov 2015 15:40:01 +0800 Subject: [PATCH] HBASE-14703 not collect stats when call HTable.mutateRow --- .../apache/hadoop/hbase/client/AsyncProcess.java | 56 +++++++++++++++++----- .../org/apache/hadoop/hbase/client/HTable.java | 25 ++++++++-- .../hbase/client/RpcRetryingCallerFactory.java | 6 --- .../hadoop/hbase/protobuf/ResponseConverter.java | 4 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 4 +- .../hadoop/hbase/client/TestClientPushback.java | 29 +++++++++++ .../hadoop/hbase/client/TestFromClientSide.java | 8 +++- 7 files changed, 103 insertions(+), 29 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f1fa3eb..06f6e48 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -522,9 +522,13 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results); + return submitAll(null, tableName, rows, callback, results, null, timeout); } + public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, + List rows, Batch.Callback callback, Object[] results) { + return submitAll(pool, tableName, rows, callback, results, null, timeout); + } /** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. @@ -536,7 +540,8 @@ class AsyncProcess { * @param results Optional array to return the results thru; backward compat. */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results) { + List rows, Batch.Callback callback, Object[] results, + RetryingCallable callable, int curTimeout) { List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -555,7 +560,8 @@ class AsyncProcess { actions.add(action); } AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); + tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, + callable, curTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -703,15 +709,28 @@ class AsyncProcess { MultiResponse res; MultiServerCallable callable = null; try { - callable = createCallable(server, tableName, multiAction); + if (currentCallable == null) { + callable = createCallable(server, tableName, multiAction); + } try { RpcRetryingCaller caller = createCaller(callable); if (callsInProgress != null) callsInProgress.add(callable); - res = caller.callWithoutRetries(callable, timeout); - - if (res == null) { - // Cancelled - return; + if (callable != null) { + res = caller.callWithoutRetries(callable, timeout); + if (res == null) { + return; + } + } else { + // Currently this path is only for mutateRow and CheckMutateRow. + if (numAttempt > 1 && (numAttempt - 1) * timeout >= currentTimeout) { + actionsInProgress.set(0); + return; + } + res = caller.callWithoutRetries(currentCallable, timeout); + if (res == null || res.getResults().size() == 0) { + actionsInProgress.set(0); + return; + } } } catch (IOException e) { @@ -768,10 +787,12 @@ class AsyncProcess { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; + private RetryingCallable currentCallable; + private int currentTimeout; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback) { + Batch.Callback callback, RetryingCallable callable, int timeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -837,6 +858,8 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); + this.currentCallable = callable; + this.currentTimeout = timeout; } public Set> getCallsInProgress() { @@ -1338,7 +1361,6 @@ class AsyncProcess { } } } - if (toReplay.isEmpty()) { logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); } else { @@ -1627,13 +1649,21 @@ class AsyncProcess { } } + protected AsyncRequestFutureImpl createAsyncRequestFuture( + TableName tableName, List> actions, long nonceGroup, ExecutorService pool, + Batch.Callback callback, Object[] results, boolean needResults, + RetryingCallable callable, int curTimeout) { + return new AsyncRequestFutureImpl( + tableName, actions, nonceGroup, getPool(pool), needResults, results, callback, callable, curTimeout); + } + @VisibleForTesting /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults) { - return new AsyncRequestFutureImpl( - tableName, actions, nonceGroup, getPool(pool), needResults, results, callback); + return createAsyncRequestFuture( + tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 51a95e4..a51f2f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; @@ -586,10 +587,11 @@ public class HTable implements HTableInterface { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - RegionServerCallable callable = - new RegionServerCallable(connection, getName(), rm.getRow()) { + + RegionServerCallable callable = + new RegionServerCallable(connection, getName(), rm.getRow()) { @Override - public Void call(int callTimeout) throws IOException { + public MultiResponse call(int callTimeout) throws IOException { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); @@ -608,13 +610,26 @@ public class HTable implements HTableInterface { } throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex); } + + if (response.getRegionActionResultCount() == 1 && + response.getRegionActionResult(0).getResultOrExceptionCount() == 0) { + // Currently If there is no statistic tracker, server will return empty RegionActionResult. + // There is nothing in it. So we just return null. + return null; + } + return ResponseConverter.getResults(request, response, controller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } - return null; } }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), + null, null, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 0af8210..550812f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory { // is cheap as it does not require parsing a complex structure. RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, startLogErrorsCnt); - - // wrap it with stats, if we are tracking them - if (enableBackPressure && this.stats != null) { - caller = new StatsTrackingRpcRetryingCaller(caller, this.stats); - } - return caller; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 177b1c7..f1c8797 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -89,7 +89,7 @@ public final class ResponseConverter { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); if (requestRegionActionCount != responseRegionActionResultCount) { - throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount + + throw new IllegalStateException("Request mutation count=" + requestRegionActionCount + " does not match response mutation result count=" + responseRegionActionResultCount); } @@ -123,7 +123,7 @@ public final class ResponseConverter { Object responseValue; if (roe.hasException()) { responseValue = ProtobufUtil.toException(roe.getException()); - } else if (roe.hasResult()) { + } else if (roe.hasResult() || roe.hasLoadStats()) { responseValue = ProtobufUtil.toResult(roe.getResult(), cells); // add the load stats, if we got any if (roe.hasLoadStats()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5729334..e3df85b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2147,8 +2147,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner); // add the stats to the request if(stats != null) { - responseBuilder.addRegionActionResult(RegionActionResult.newBuilder() - .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats))); + regionActionResultBuilder + .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)); } processed = Boolean.TRUE; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 1efbe05..db1e490 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -165,4 +166,32 @@ public class TestClientPushback { assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0); assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); } + + @Test + public void testMutateRowStats() throws IOException { + Configuration conf = UTIL.getConfiguration(); + ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); + HTable table = (HTable) conn.getTable(tableName); + HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0); + Region region = rs.getOnlineRegions(tableName).get(0); + + RowMutations mutations = new RowMutations(Bytes.toBytes("row")); + Put p = new Put(Bytes.toBytes("row")); + p.addColumn(family, qualifier, Bytes.toBytes("value2")); + mutations.add(p); + table.mutateRow(mutations); + + ServerStatisticTracker stats = conn.getStatisticsTracker(); + assertNotNull( "No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = rs.getServerName(); + byte[] regionName = region.getRegionInfo().getRegionName(); + + // check to see we found some load on the memstore + ServerStatistics serverStats = stats.getServerStatsForTesting(server); + ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); + + assertNotNull(regionStats); + assertTrue(regionStats.getMemstoreLoadPercent() > 0); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 8734aea..188b0ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -4365,7 +4365,13 @@ public class TestFromClientSide { arm.add(p); t.mutateRow(arm); fail("Expected NoSuchColumnFamilyException"); - } catch(NoSuchColumnFamilyException e) { + } catch(RetriesExhaustedWithDetailsException e) { + for(Throwable rootCause: e.getCauses()){ + if(rootCause instanceof NoSuchColumnFamilyException){ + return; + } + } + throw e; } } -- 1.9.3 (Apple Git-50)