From 12eadf221e20d79de9da65dc258a1178df14a23c Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Mon, 7 Dec 2015 18:33:35 -0800 Subject: [PATCH] HBASE-14946 Allow batching of Table.get(List) into manageable chunks --- .../apache/hadoop/hbase/client/AsyncProcess.java | 43 +++++++++++++++++++++- .../java/org/apache/hadoop/hbase/HConstants.java | 12 ++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f1fa3eb..16f6974 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -203,6 +203,7 @@ class AsyncProcess { protected int serverTrackerTimeout; protected int timeout; protected long primaryCallTimeoutMicroseconds; + protected final int maxActionsPerRequest; // End configuration settings. protected static class BatchErrors { @@ -272,6 +273,9 @@ class AsyncProcess { this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); + this.maxActionsPerRequest = conf.getInt(HConstants.HBASE_CLIENT_MAX_ACTIONS_PER_MULTI, + HConstants.DEFAULT_HBASE_CLIENT_MAX_ACTIONS_PER_MULTI); + this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -1012,8 +1016,43 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + // Now in order to keep from asking for too much data in one multi + // we group maxActionsPerRequest into a mutliAction + if (multiAction.size() < maxActionsPerRequest) { + + // There were less than maxActionsPerRequest so don't create new objects. + // This is the default case and hopefully the normal case. + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + } + // Oh no. Someone put in a list of actions that's larger than one multi should take. + // So lets group them. + // The sizing is just a guess But it's the best we can do. + List toReturn = new ArrayList<>(multiAction.size() / maxActionsPerRequest); + for (Map.Entry>> actionEntry : multiAction.actions.entrySet()) { + + // Create the new multi. One per region. + MultiAction nMulti = new MultiAction<>(); + + // Now since any given list could be really large iterate through all of them. + for (Action action : actionEntry.getValue()) { + + if (nMulti.size() >= maxActionsPerRequest) { + // add the large multi to the list since it's reached the max size. + toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(nMulti, numAttempt, server, callsInProgress))); + + // Now create a new one. + nMulti = new MultiAction<>(); + } + nMulti.add(actionEntry.getKey(), action); + } + + // Add the last multication to the return value. + toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(nMulti, numAttempt, server, callsInProgress))); + } + return toReturn; } // group the actions by the amount of delay diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ac57514..a7dbbc1b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -79,6 +79,7 @@ public final class HConstants { /** Just an array of bytes of the right size. */ public static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HFILEBLOCK_HEADER_SIZE]; + //End HFileBlockConstants. /** @@ -715,6 +716,17 @@ public final class HConstants { public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1; /** + * Maximum number of actions (Puts or Gets) that will be grouped together into a single + * multiaction. + */ + public static final String HBASE_CLIENT_MAX_ACTIONS_PER_MULTI = "hbase.client.max.actions.per.multi"; + + /** + * Default value of {@Link #HBASE_CLIENT_MAX_ACTIONS_PER_MULTI}. + */ + public static final int DEFAULT_HBASE_CLIENT_MAX_ACTIONS_PER_MULTI = 100; + + /** * Parameter name for server pause value, used mostly as value to wait before * running a retry of a failed operation. */ -- 2.6.3