From f8fa6f79b0538c7a6c727bf10712f8ed3878c07f Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 7 Dec 2015 18:33:35 -0800 Subject: [PATCH] HBASE-14946 Don't allow multi's to over run the max result size. --- .../org/apache/hadoop/hbase/client/Result.java | 3 + .../apache/hadoop/hbase/ipc/RpcCallContext.java | 14 +++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 10 ++ .../hadoop/hbase/regionserver/RSRpcServices.java | 114 ++++++++++++--------- 4 files changed, 95 insertions(+), 46 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 702983b..d2a49c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -839,6 +839,9 @@ public class Result implements CellScannable, CellScanner { */ public static long getTotalSizeOfCells(Result result) { long size = 0; + if (result.isEmpty()) { + return size; + } for (Cell c : result.rawCells()) { size += CellUtil.estimatedHeapSizeOf(c); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 60e5f5d..ad7eb1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -71,4 +71,18 @@ public interface RpcCallContext extends Delayable { * @param callback */ void setCallBack(RpcCallback callback); + + /** + * The size of response cells that have been accumulated so far. + * This along with the corresponding increment call is used to ensure that multi's or + * scans dont get too excessively large + */ + long getResponseCellSize(); + + /** + * Add on the given amount to the retained cell size. + * + * This is not thread safe and not synchronized at all. + */ + void incrementCellSize(long cellSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 402bca0..6ecf943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -317,6 +317,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private InetAddress remoteAddress; private RpcCallback callback; + private long responseCellSize = 0; + Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, long size, TraceInfo tinfo, final InetAddress remoteAddress) { @@ -538,6 +540,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + public long getResponseCellSize() { + return responseCellSize; + } + + public void incrementCellSize(long cellSize) { + responseCellSize += cellSize; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d94e11c..c636601 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -500,13 +500,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rm = new RowMutations(action.getMutation().getRow().toByteArray()); } switch (type) { - case PUT: - rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); - break; - case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); - break; - default: + case PUT: + rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); + break; + case DELETE: + rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); + break; + default: throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } @@ -543,14 +543,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rm = new RowMutations(action.getMutation().getRow().toByteArray()); } switch (type) { - case PUT: - rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); - break; - case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + case PUT: + rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner)); + break; + case DELETE: + rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); + break; + default: + throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE); @@ -655,10 +655,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched List mutations = null; + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; + + if (context != null && context.getResponseCellSize() > maxQuotaResultSize) { + // Yes this sucks. + // I'm sorry. However creating a specialized exception for this would + // break backwards compat since the client might not have the exception type. + // That would then mean that the message is lossed. + throw new HBaseIOException("Response size would be too large"); + } if (action.hasGet()) { Get get = ProtobufUtil.toGet(action.getGet()); if (context != null) { @@ -690,22 +699,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, mutations.clear(); } switch (type) { - case APPEND: - r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); - break; - case INCREMENT: - r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); - break; - case PUT: - case DELETE: - // Collect the individual mutations and apply in a batch - if (mutations == null) { - mutations = new ArrayList(actions.getActionCount()); - } - mutations.add(action); - break; - default: - throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); + case APPEND: + r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); + break; + case INCREMENT: + r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); + break; + case PUT: + case DELETE: + // Collect the individual mutations and apply in a batch + if (mutations == null) { + mutations = new ArrayList(actions.getActionCount()); + } + mutations.add(action); + break; + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } } else { throw new HBaseIOException("Unexpected Action type"); @@ -715,11 +724,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (isClientCellBlockSupport(context)) { pbResult = ProtobufUtil.toResultNoData(r); // Hard to guess the size here. Just make a rough guess. - if (cellsToReturn == null) cellsToReturn = new ArrayList(); + if (cellsToReturn == null) { + cellsToReturn = new ArrayList(); + } cellsToReturn.add(r); } else { pbResult = ProtobufUtil.toResult(r); } + if (context != null) { + context.incrementCellSize(Result.getTotalSizeOfCells(r)); + } resultOrExceptionBuilder = ClientProtos.ResultOrException.newBuilder().setResult(pbResult); } @@ -801,8 +815,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, case SUCCESS: builder.addResultOrException(getResultOrException( - ClientProtos.Result.getDefaultInstance(), index, - ((HRegion)region).getRegionStats())); + ClientProtos.Result.getDefaultInstance(), index, + ((HRegion) region).getRegionStats())); break; } } @@ -2104,7 +2118,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // It is also the conduit via which we pass back data. PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; CellScanner cellScanner = controller != null ? controller.cellScanner(): null; - if (controller != null) controller.setCellScanner(null); + if (controller != null) { + controller.setCellScanner(null); + } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; @@ -2178,7 +2194,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); } - if (processed != null) responseBuilder.setProcessed(processed); + if (processed != null) { + responseBuilder.setProcessed(processed); + } return responseBuilder.build(); } @@ -2195,10 +2213,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; - CellScanner cellScanner = controller != null? controller.cellScanner(): null; + CellScanner cellScanner = controller != null ? controller.cellScanner() : null; OperationQuota quota = null; // Clear scanner so we are not holding on to reference across call. - if (controller != null) controller.setCellScanner(null); + if (controller != null) { + controller.setCellScanner(null); + } try { checkOpen(); requestCount.increment(); @@ -2446,8 +2466,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // where processing of request takes > lease expiration time. lease = regionServer.leases.removeLease(scannerName); List results = new ArrayList(); - long totalCellSize = 0; - long currentScanResultSize = 0; boolean done = false; // Call coprocessor. Get region info from scanner. @@ -2457,8 +2475,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!results.isEmpty()) { for (Result r : results) { for (Cell cell : r.rawCells()) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); - currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell); + if (context != null) { + context.incrementCellSize(CellUtil.estimatedSerializedSizeOf(cell)); + } } } } @@ -2491,7 +2510,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // If the coprocessor host is adding to the result list, we cannot guarantee the // correct ordering of partial results and so we prevent partial results from being // formed. - boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; + boolean serverGuaranteesOrderOfPartials = results.isEmpty(); boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; boolean moreRows = false; @@ -2557,7 +2576,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!values.isEmpty()) { for (Cell cell : values) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); + if (context != null) { + context.incrementCellSize(CellUtil.estimatedSerializedSizeOf(cell)); + } } final boolean partial = scannerContext.partialResultFormed(); results.add(Result.create(values, null, stale, partial)); @@ -2612,9 +2633,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } region.updateReadRequestsCount(i); - region.getMetrics().updateScanNext(totalCellSize); + long responseCellSize = context != null ? context.getResponseCellSize() : 0; + region.getMetrics().updateScanNext(responseCellSize); if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScannerNext(totalCellSize); + regionServer.metricsRegionServer.updateScannerNext(responseCellSize); } } finally { region.closeRegionOperation(); -- 2.6.3