From 4f3492faf4b1b80d0fa434ace5bd42c484cc714e Mon Sep 17 00:00:00 2001 From: chenheng Date: Thu, 5 Nov 2015 15:49:02 +0800 Subject: [PATCH] HBASE-14703 not collect stats when call HTable.mutateRow --- .../apache/hadoop/hbase/client/AsyncProcess.java | 66 +++++++++++++++++----- .../org/apache/hadoop/hbase/client/HTable.java | 46 +++++++++++---- .../apache/hadoop/hbase/client/MultiResponse.java | 11 ++++ .../hadoop/hbase/client/MultiServerCallable.java | 1 + .../hbase/client/RpcRetryingCallerFactory.java | 6 -- .../hadoop/hbase/protobuf/ResponseConverter.java | 2 +- .../hadoop/hbase/client/TestAsyncProcess.java | 5 +- .../hadoop/hbase/client/TestCheckAndMutate.java | 8 ++- .../hadoop/hbase/client/TestFromClientSide.java | 8 ++- 9 files changed, 119 insertions(+), 34 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 7c7fc3e..317c754 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -124,6 +124,7 @@ class AsyncProcess { public Object[] getResults() throws InterruptedIOException; /** Wait until all tasks are executed, successfully or not. */ public void waitUntilDone() throws InterruptedIOException; + public boolean getProcessed(); } /** Return value from a submit that didn't contain any requests. */ @@ -139,6 +140,11 @@ class AsyncProcess { public Object[] getResults() { return result; } @Override public void waitUntilDone() throws InterruptedIOException {} + + @Override + public boolean getProcessed() { + return true; + } }; /** Sync point for calls to multiple replicas for the same user request (Get). @@ -413,7 +419,7 @@ class AsyncProcess { List locationErrorRows, Map> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -522,9 +528,13 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results); + return submitAll(null, tableName, rows, callback, results, null, timeout); } + public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, + List rows, Batch.Callback callback, Object[] results) { + return submitAll(pool, tableName, rows, callback, results, null, timeout); + } /** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. @@ -536,7 +546,8 @@ class AsyncProcess { * @param results Optional array to return the results thru; backward compat. */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results) { + List rows, Batch.Callback callback, Object[] results, + RetryingCallable callable, int curTimeout) { List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -555,7 +566,8 @@ class AsyncProcess { actions.add(action); } AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null); + tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, + callable, curTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -703,15 +715,30 @@ class AsyncProcess { MultiResponse res; MultiServerCallable callable = null; try { - callable = createCallable(server, tableName, multiAction); + if (currentCallable == null) { + callable = createCallable(server, tableName, multiAction); + } try { RpcRetryingCaller caller = createCaller(callable); if (callsInProgress != null) callsInProgress.add(callable); - res = caller.callWithoutRetries(callable, timeout); - - if (res == null) { - // Cancelled - return; + if (callable != null) { + res = caller.callWithoutRetries(callable, timeout); + if (res == null) { + return; + } + } else { + // Currently this path is only for mutateRow and CheckMutateRow. + res = caller.callWithRetries(currentCallable, currentTimeout); + // If statistic Off, we no need to receive multi action. + if (connection.getStatisticsTracker() == null && + (res == null || res.getResults().size() == 0)) { + // because we call with retries, so no need to wait any more. + if (res != null) { + processed = res.isProcessed(); + } + actionsInProgress.set(0); + return; + } } } catch (IOException e) { @@ -768,10 +795,13 @@ class AsyncProcess { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; + private RetryingCallable currentCallable; + private int currentTimeout; + private boolean processed; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback) { + Batch.Callback callback, RetryingCallable callable, int timeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -837,6 +867,8 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); + this.currentCallable = callable; + this.currentTimeout = timeout; } public Set> getCallsInProgress() { @@ -1322,6 +1354,8 @@ class AsyncProcess { } } + this.processed = responses.isProcessed(); + if (toReplay.isEmpty()) { logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); } else { @@ -1560,6 +1594,11 @@ class AsyncProcess { } } + @Override + public boolean getProcessed() { + return processed; + } + private boolean waitUntilDone(long cutoff) throws InterruptedException { boolean hasWait = cutoff != Long.MAX_VALUE; long lastLog = EnvironmentEdgeManager.currentTime(); @@ -1614,9 +1653,10 @@ class AsyncProcess { /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults) { + Batch.Callback callback, Object[] results, boolean needResults, + RetryingCallable callable, int curTimeout) { return new AsyncRequestFutureImpl( - tableName, actions, nonceGroup, getPool(pool), needResults, results, callback); + tableName, actions, nonceGroup, getPool(pool), needResults, results, callback, callable, curTimeout); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 51a95e4..80fc2ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; @@ -586,10 +587,10 @@ public class HTable implements HTableInterface { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - RegionServerCallable callable = - new RegionServerCallable(connection, getName(), rm.getRow()) { + RegionServerCallable callable = + new RegionServerCallable(connection, getName(), rm.getRow()) { @Override - public Void call(int callTimeout) throws IOException { + public MultiResponse call(int callTimeout) throws IOException { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); @@ -608,13 +609,22 @@ public class HTable implements HTableInterface { } throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex); } + if (((ClusterConnection)connection).getStatisticsTracker() == null) { + return null; + } + return ResponseConverter.getResults(request, response, controller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } - return null; } }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), + null, null, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } } /** @@ -861,10 +871,10 @@ public class HTable implements HTableInterface { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { + RegionServerCallable callable = + new RegionServerCallable(connection, getName(), row) { @Override - public Boolean call(int callTimeout) throws IOException { + public MultiResponse call(int callTimeout) throws IOException { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); @@ -883,13 +893,29 @@ public class HTable implements HTableInterface { throw new IOException("Failed to checkAndMutate row: "+ Bytes.toStringBinary(rm.getRow()), ex); } - return Boolean.valueOf(response.getProcessed()); + if (((ClusterConnection)connection).getStatisticsTracker() == null) { + // If statisitcs is off, the action number will not equals resultOrExceptoins number. + // Exception will be thrown if we use ResponseConverter.getResults, + // So we do convert here. + org.apache.hadoop.hbase.client.MultiResponse results = + new org.apache.hadoop.hbase.client.MultiResponse(); + results.setProcessed(response.getProcessed()); + return results; + } + return ResponseConverter.getResults(request, response, controller.cellScanner()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + //return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), + null, null, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } + return ars.getProcessed(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java index 089ccff..23dbf90 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java @@ -43,6 +43,8 @@ public class MultiResponse { private Map exceptions = new TreeMap(Bytes.BYTES_COMPARATOR); + private boolean processed; + public MultiResponse() { super(); } @@ -92,4 +94,13 @@ public class MultiResponse { public Map getExceptions() { return exceptions; } + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 72ae829..0f4f44d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -131,6 +131,7 @@ class MultiServerCallable extends RegionServerCallable impleme throw ProtobufUtil.getRemoteException(e); } if (responseProto == null) return null; // Occurs on cancel + return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 0af8210..550812f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -67,12 +67,6 @@ public class RpcRetryingCallerFactory { // is cheap as it does not require parsing a complex structure. RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, startLogErrorsCnt); - - // wrap it with stats, if we are tracking them - if (enableBackPressure && this.stats != null) { - caller = new StatsTrackingRpcRetryingCaller(caller, this.stats); - } - return caller; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 177b1c7..46dfcca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -95,7 +95,7 @@ public final class ResponseConverter { org.apache.hadoop.hbase.client.MultiResponse results = new org.apache.hadoop.hbase.client.MultiResponse(); - + results.setProcessed(response.getProcessed()); for (int i = 0; i < responseRegionActionResultCount; i++) { RegionAction actions = request.getRegionAction(i); RegionActionResult actionResult = response.getRegionActionResult(i); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 067f2ad..b4e9e4d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -138,10 +138,11 @@ public class TestAsyncProcess { @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, - Batch.Callback callback, Object[] results, boolean needResults) { + Batch.Callback callback, Object[] results, boolean needResults, + RetryingCallable callable, int t) { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl r = super.createAsyncRequestFuture( - DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); + DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults, null, t); allReqs.add(r); return r; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java index 082de09..a8a0239 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java @@ -109,7 +109,13 @@ public class TestCheckAndMutate { table.checkAndMutate(rowKey, family, Bytes.toBytes("A"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("a"), rm); fail("Expected NoSuchColumnFamilyException"); - } catch(NoSuchColumnFamilyException e) { + } catch(RetriesExhaustedWithDetailsException e) { + for(Throwable rootCause: e.getCauses()){ + if(rootCause instanceof NoSuchColumnFamilyException){ + return; + } + } + throw e; } } finally { table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 9c9ec87..3b6ff4e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -4366,7 +4366,13 @@ public class TestFromClientSide { arm.add(p); t.mutateRow(arm); fail("Expected NoSuchColumnFamilyException"); - } catch(NoSuchColumnFamilyException e) { + } catch(RetriesExhaustedWithDetailsException e) { + for(Throwable rootCause: e.getCauses()){ + if(rootCause instanceof NoSuchColumnFamilyException){ + return; + } + } + throw e; } } -- 1.9.3 (Apple Git-50)