From 14767f12fb77d23d2cbc451cedaeada0bb96105f Mon Sep 17 00:00:00 2001 From: chenheng Date: Fri, 20 Nov 2015 17:30:56 +0800 Subject: [PATCH] add --- .../apache/hadoop/hbase/client/AsyncProcess.java | 24 ++++--------------- .../hadoop/hbase/protobuf/ResponseConverter.java | 6 +---- .../hadoop/hbase/regionserver/RSRpcServices.java | 27 ++++++++++++++++++---- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 2a97e2a..22ca71f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -720,7 +720,7 @@ class AsyncProcess { callsInProgress.add(callable); } res = caller.callWithoutRetries(callable, currentCallTotalTimeout); - if (res == null|| res.getResults().size() == 0) { + if (res == null) { return; } } catch (IOException e) { @@ -1256,12 +1256,11 @@ class AsyncProcess { // update the stats about the region, if its a user table. We don't want to slow down // updates to meta tables, especially from internal updates (master, etc). - - // Go by original action. int failed = 0, stopped = 0; + // Go by original action. for (Map.Entry>> regionEntry : multiAction.actions.entrySet()) { byte[] regionName = regionEntry.getKey(); - Map regionResults = results.get(regionName).result; + Map regionResults = results.get(regionName) == null ? null : results.get(regionName).result; if (regionResults == null) { if (!responses.getExceptions().containsKey(regionName)) { LOG.error("Server sent us neither results nor exceptions for " @@ -1289,7 +1288,7 @@ class AsyncProcess { } ++failureCount; Retry retry = manageError(sentAction.getOriginalIndex(), row, - canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server); + canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); if (retry == Retry.YES) { toReplay.add(sentAction); } else if (retry == Retry.NO_OTHER_SUCCEEDED) { @@ -1298,24 +1297,11 @@ class AsyncProcess { ++failed; } } else { - - if (AsyncProcess.this.connection.getConnectionMetrics() != null) { - AsyncProcess.this.connection.getConnectionMetrics(). - updateServerStats(server, regionName, result); - } - - // update the stats about the region, if its a user table. We don't want to slow down - // updates to meta tables, especially from internal updates (master, etc). - if (AsyncProcess.this.connection.getStatisticsTracker() != null) { - result = ResultStatsUtil.updateStats(result, - AsyncProcess.this.connection.getStatisticsTracker(), server, regionName); - } - if (callback != null) { try { //noinspection unchecked // TODO: would callback expect a replica region name if it gets one? - this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result); + this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); } catch (Throwable t) { LOG.error("User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring", t); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index ad96e4c..664849f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -123,12 +123,8 @@ public final class ResponseConverter { Object responseValue; if (roe.hasException()) { responseValue = ProtobufUtil.toException(roe.getException()); - } else if (roe.hasResult() || roe.hasLoadStats()) { + } else if (roe.hasResult()) { responseValue = ProtobufUtil.toResult(roe.getResult(), cells); - // add the load stats, if we got any - if (roe.hasLoadStats()) { - ((Result) responseValue).addResults(roe.getLoadStats()); - } } else if (roe.hasServiceResult()) { responseValue = roe.getServiceResult(); } else if (response.getProcessed()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b5150ca..6599c5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -485,11 +485,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private void mutateRows(final Region region, final List actions, - final CellScanner cellScanner) throws IOException { + final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } RowMutations rm = null; + int i = 0; + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); for (ClientProtos.Action action: actions) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + @@ -509,6 +511,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, default: throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } + // To unify the response format with doNonAtomicRegionMutation, + // so we add an empty instance + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i++); + builder.addResultOrException( + resultOrExceptionOrBuilder.build()); } region.mutateRow(rm); } @@ -527,11 +535,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private boolean checkAndRowMutate(final Region region, final List actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, ByteArrayComparable comparator) throws IOException { + CompareOp compareOp, ByteArrayComparable comparator, + RegionActionResult.Builder builder) throws IOException { if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } RowMutations rm = null; + int i = 0; + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); for (ClientProtos.Action action: actions) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + @@ -551,6 +562,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, default: throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } + // To unify the response format with doNonAtomicRegionMutation, + // so we add an empty instance + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i++); + builder.addResultOrException( + resultOrExceptionOrBuilder.build()); } return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE); } @@ -2142,10 +2159,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); processed = checkAndRowMutate(region, regionAction.getActionList(), - cellScanner, row, family, qualifier, compareOp, comparator); + cellScanner, row, family, qualifier, compareOp, + comparator, regionActionResultBuilder); } else { - mutateRows(region, regionAction.getActionList(), cellScanner); - regionActionResultBuilder.addResultOrException(ResultOrException.getDefaultInstance()); + mutateRows(region, regionAction.getActionList(), cellScanner, regionActionResultBuilder); processed = Boolean.TRUE; } } catch (IOException e) { -- 1.9.3 (Apple Git-50)