From cd72ae70569be79e2a3f0c4a432087e7ea633eea Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 7 Dec 2015 18:33:35 -0800 Subject: [PATCH] HBASE-14946 Allow batching of Table.get(List) into manageable chunks --- .../apache/hadoop/hbase/client/AsyncProcess.java | 42 ++++++++++++++++++++-- .../java/org/apache/hadoop/hbase/HConstants.java | 12 +++++++ .../apache/hadoop/hbase/ipc/RpcCallContext.java | 14 ++++++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 10 ++++++ .../hadoop/hbase/regionserver/RSRpcServices.java | 21 ++++++----- 5 files changed, 89 insertions(+), 10 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f1fa3eb..268b8b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -203,6 +203,7 @@ class AsyncProcess { protected int serverTrackerTimeout; protected int timeout; protected long primaryCallTimeoutMicroseconds; + protected final int maxActionsPerRequest; // End configuration settings. protected static class BatchErrors { @@ -272,6 +273,9 @@ class AsyncProcess { this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); + this.maxActionsPerRequest = conf.getInt(HConstants.HBASE_CLIENT_MAX_ACTIONS_PER_MULTI, + HConstants.DEFAULT_HBASE_CLIENT_MAX_ACTIONS_PER_MULTI); + this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -1012,8 +1016,42 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + // Now in order to keep from asking for too much data in one multi + // we group maxActionsPerRequest into a mutliAction + if (multiAction.size() < maxActionsPerRequest) { + + // There were less than maxActionsPerRequest so don't create new objects. + // This is the default case and hopefully the normal case. + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + } + // Oh no. Someone put in a list of actions that's larger than one multi should take. + // So lets group them. + // The sizing is just a guess But it's the best we can do. + List toReturn = new ArrayList<>(multiAction.size() / maxActionsPerRequest); + // Create the new multi. One per region. + MultiAction nMulti = new MultiAction<>(); + for (Map.Entry>> actionEntry : multiAction.actions.entrySet()) { + + + // Now since any given list could be really large iterate through all of them. + for (Action action : actionEntry.getValue()) { + + if (nMulti.size() >= maxActionsPerRequest) { + // add the large multi to the list since it's reached the max size. + toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(nMulti, numAttempt, server, callsInProgress))); + + // Now create a new one. + nMulti = new MultiAction<>(); + } + nMulti.add(actionEntry.getKey(), action); + } + } + // Add the last multication to the return value. + toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(nMulti, numAttempt, server, callsInProgress))); + return toReturn; } // group the actions by the amount of delay diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ac57514..66cd073 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -79,6 +79,7 @@ public final class HConstants { /** Just an array of bytes of the right size. */ public static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HFILEBLOCK_HEADER_SIZE]; + //End HFileBlockConstants. /** @@ -715,6 +716,17 @@ public final class HConstants { public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1; /** + * Maximum number of actions (Puts or Gets) that will be grouped together into a single + * multiaction. + */ + public static final String HBASE_CLIENT_MAX_ACTIONS_PER_MULTI = "hbase.client.max.actions.per.multi"; + + /** + * Default value of {@Link #HBASE_CLIENT_MAX_ACTIONS_PER_MULTI}. + */ + public static final int DEFAULT_HBASE_CLIENT_MAX_ACTIONS_PER_MULTI = 1000 ; + + /** * Parameter name for server pause value, used mostly as value to wait before * running a retry of a failed operation. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 60e5f5d..ad7eb1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -71,4 +71,18 @@ public interface RpcCallContext extends Delayable { * @param callback */ void setCallBack(RpcCallback callback); + + /** + * The size of response cells that have been accumulated so far. + * This along with the corresponding increment call is used to ensure that multi's or + * scans dont get too excessively large + */ + long getResponseCellSize(); + + /** + * Add on the given amount to the retained cell size. + * + * This is not thread safe and not synchronized at all. + */ + void incrementCellSize(long cellSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 402bca0..6ecf943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -317,6 +317,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private InetAddress remoteAddress; private RpcCallback callback; + private long responseCellSize = 0; + Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, long size, TraceInfo tinfo, final InetAddress remoteAddress) { @@ -538,6 +540,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + public long getResponseCellSize() { + return responseCellSize; + } + + public void incrementCellSize(long cellSize) { + responseCellSize += cellSize; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d94e11c..08155a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -655,10 +655,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched List mutations = null; + long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; + + if (context != null && context.getResponseCellSize() > maxQuotaResultSize) { + throw new HBaseIOException("Response size would be too large"); + } if (action.hasGet()) { Get get = ProtobufUtil.toGet(action.getGet()); if (context != null) { @@ -720,6 +725,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else { pbResult = ProtobufUtil.toResult(r); } + if (context != null) { + context.incrementCellSize(Result.getTotalSizeOfCells(r)); + } resultOrExceptionBuilder = ClientProtos.ResultOrException.newBuilder().setResult(pbResult); } @@ -2446,8 +2454,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // where processing of request takes > lease expiration time. lease = regionServer.leases.removeLease(scannerName); List results = new ArrayList(); - long totalCellSize = 0; - long currentScanResultSize = 0; boolean done = false; // Call coprocessor. Get region info from scanner. @@ -2457,8 +2463,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!results.isEmpty()) { for (Result r : results) { for (Cell cell : r.rawCells()) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); - currentScanResultSize += CellUtil.estimatedHeapSizeOf(cell); + context.incrementCellSize(CellUtil.estimatedSerializedSizeOf(cell)); } } } @@ -2491,7 +2496,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // If the coprocessor host is adding to the result list, we cannot guarantee the // correct ordering of partial results and so we prevent partial results from being // formed. - boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; + boolean serverGuaranteesOrderOfPartials = context.getResponseCellSize() == 0; boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; boolean moreRows = false; @@ -2557,7 +2562,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!values.isEmpty()) { for (Cell cell : values) { - totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); + context.incrementCellSize(CellUtil.estimatedSerializedSizeOf(cell)); } final boolean partial = scannerContext.partialResultFormed(); results.add(Result.create(values, null, stale, partial)); @@ -2612,9 +2617,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } region.updateReadRequestsCount(i); - region.getMetrics().updateScanNext(totalCellSize); + region.getMetrics().updateScanNext(context.getResponseCellSize()); if (regionServer.metricsRegionServer != null) { - regionServer.metricsRegionServer.updateScannerNext(totalCellSize); + regionServer.metricsRegionServer.updateScannerNext(context.getResponseCellSize()); } } finally { region.closeRegionOperation(); -- 2.6.3