From 3b17e71c028f81dfa39b4e35cb617b9502a7b3a8 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 7 Dec 2015 18:33:35 -0800 Subject: [PATCH] HBASE-14946 Don't allow multi's to over run the max result size. --- .../apache/hadoop/hbase/ipc/RpcCallContext.java | 14 +++++++++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 10 ++++++++ .../hadoop/hbase/regionserver/RSRpcServices.java | 29 ++++++++++++++++------ 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 60e5f5d..ad7eb1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -71,4 +71,18 @@ public interface RpcCallContext extends Delayable { * @param callback */ void setCallBack(RpcCallback callback); + + /** + * The size of response cells that have been accumulated so far. + * This along with the corresponding increment call is used to ensure that multi's or + * scans dont get too excessively large + */ + long getResponseCellSize(); + + /** + * Add on the given amount to the retained cell size. + * + * This is not thread safe and not synchronized at all. + */ + void incrementCellSize(long cellSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 402bca0..6ecf943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -317,6 +317,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private InetAddress remoteAddress; private RpcCallback callback; + private long responseCellSize = 0; + Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, long size, TraceInfo tinfo, final InetAddress remoteAddress) { @@ -538,6 +540,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + public long getResponseCellSize() { + return responseCellSize; + } + + public void incrementCellSize(long cellSize) { + responseCellSize += cellSize; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d94e11c..c019128 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -655,10 +655,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched List mutations = null; + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; + + if (context != null && context.getResponseCellSize() > maxQuotaResultSize) { + // Yes this sucks. + // I'm sorry. However creating a specialized exception for this would + // break backwards compat since the client might not have the exception type. + // That would then mean that the message is lossed. + throw new HBaseIOException("Response size would be too large"); + } if (action.hasGet()) { Get get = ProtobufUtil.toGet(action.getGet()); if (context != null) { @@ -720,6 +729,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else { pbResult = ProtobufUtil.toResult(r); } + if (context != null) { + context.incrementCellSize(Result.getTotalSizeOfCells(r)); + } resultOrExceptionBuilder = ClientProtos.ResultOrException.newBuilder().setResult(pbResult); } @@ -2446,8 +2458,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // where processing of request takes > lease expiration time. lease = regionServer.leases.removeLease(scannerName); List results = new ArrayList(); - long totalCellSize = 0; - long currentScanResultSize = 0; boolean done = false; // Call coprocessor. Get region info from scanner. @@ -2457,8 +2467,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!results.isEmpty()) { for (Result r : results) { for (Cell cell : r.rawCells()) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); - currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell); + if (context != null) { + context.incrementCellSize(CellUtil.estimatedSerializedSizeOf(cell)); + } } } } @@ -2491,7 +2502,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // If the coprocessor host is adding to the result list, we cannot guarantee the // correct ordering of partial results and so we prevent partial results from being // formed. - boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; + boolean serverGuaranteesOrderOfPartials = results.isEmpty(); boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; boolean moreRows = false; @@ -2557,7 +2568,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!values.isEmpty()) { for (Cell cell : values) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); + if (context != null) { + context.incrementCellSize(CellUtil.estimatedSerializedSizeOf(cell)); + } } final boolean partial = scannerContext.partialResultFormed(); results.add(Result.create(values, null, stale, partial)); @@ -2612,9 +2625,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } region.updateReadRequestsCount(i); - region.getMetrics().updateScanNext(totalCellSize); + region.getMetrics().updateScanNext(context!= null?context.getResponseCellSize():0); if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScannerNext(totalCellSize); + regionServer.metricsRegionServer.updateScannerNext(context!=null?context.getResponseCellSize():0); } } finally { region.closeRegionOperation(); -- 2.6.3