diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 4514560..59d2878 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +37,9 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; +import org.apache.hadoop.hbase.classification.InterfaceStability; import com.google.common.annotations.VisibleForTesting; @@ -218,6 +221,10 @@ class AsyncProcess { protected final int maxTotalConcurrentTasks; /** + * The max heap size of all tasks simultaneously executed on a server. + */ + protected final long maxHeapSizePerServer; + /** * The number of tasks we run in parallel on a single region. * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start * a set of operations on a region before the previous one is done. As well, this limits @@ -307,7 +314,8 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); - + this.maxHeapSizePerServer = conf.getLong(HConstants.HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -357,16 +365,26 @@ class AsyncProcess { } throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); } - + public AsyncRequestFuture submit(TableName tableName, final List rows, + boolean atLeastOne, Batch.Callback callback, boolean needResults) + throws InterruptedIOException { + return submit(null, tableName, rows, atLeastOne, callback, needResults); + } /** * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. * Uses default ExecutorService for this AP (must have been created with one). */ - public AsyncRequestFuture submit(TableName tableName, List rows, - boolean atLeastOne, Batch.Callback callback, boolean needResults) - throws InterruptedIOException { + public AsyncRequestFuture submit(TableName tableName, + final RowAccess rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } + public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, + List rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + return submit(pool, tableName, new ListRowAccess(rows), atLeastOne, + callback, needResults); + } /** * Extract from the rows list what we can submit. The rows we can not submit are kept in the @@ -381,7 +399,7 @@ class AsyncProcess { * @param atLeastOne true if we should submit at least a subset. */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - List rows, boolean atLeastOne, Batch.Callback callback, + RowAccess rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { if (rows.isEmpty()) { return NO_REQS_RESULT; @@ -390,7 +408,6 @@ class AsyncProcess { Map> actionsByServer = new HashMap>(); List> retainedActions = new ArrayList>(rows.size()); - NonceGenerator ng = this.connection.getNonceGenerator(); long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. @@ -400,21 +417,12 @@ class AsyncProcess { do { // Wait until there is at least one slot for a new task. waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); - - // Remember the previous decisions about regions or region servers we put in the - // final multi. - Map regionIncluded = new HashMap(); - Map serverIncluded = new HashMap(); - int posInList = -1; - Iterator it = rows.iterator(); - while (it.hasNext()) { - Row r = it.next(); + Row r; + TaskCheckerHost checker = createTaskCheckerHost(tableName); + while ((r = rows.take()) != null) { HRegionLocation loc; try { - if (r == null) { - throw new IllegalArgumentException("#" + id + ", row cannot be null"); - } // Make sure we get 0-s replica. RegionLocations locs = connection.locateRegion( tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); @@ -432,28 +440,46 @@ class AsyncProcess { retainedActions.add(new Action(r, ++posInList)); locationErrors.add(ex); locationErrorRows.add(posInList); - it.remove(); break; // Backward compat: we stop considering actions on location error. } - - if (canTakeOperation(loc, regionIncluded, serverIncluded)) { + long requestSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0; + TaskChecker.ReturnCode code = checker.canTakeOperation(loc, requestSize); + if (code == TaskChecker.ReturnCode.END) { + rows.restoreLast(); + break; + } else if (code == TaskChecker.ReturnCode.INCLUDE) { Action action = new Action(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path byte[] regionName = loc.getRegionInfo().getRegionName(); addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); - it.remove(); + } else { + rows.restoreLast(); } } + rows.recollect(); } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); - if (retainedActions.isEmpty()) return NO_REQS_RESULT; - return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, locationErrors, locationErrorRows, actionsByServer, pool); } + private TaskCheckerHost createTaskCheckerHost(final TableName tableName) { + return new TaskCheckerHost(Arrays.asList( + new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, + taskCounterPerServer, + taskCounterPerRegion + ), new TaskSizeChecker( + connection, + tableName, + maxHeapSizePerServer) + )); + } AsyncRequestFuture submitMultiActions(TableName tableName, List> retainedActions, long nonceGroup, Batch.Callback callback, Object[] results, boolean needResults, List locationErrors, @@ -495,74 +521,6 @@ class AsyncProcess { multiAction.add(regionName, action); } - - /** - * Check if we should send new operations to this region or region server. - * We're taking into account the past decision; if we have already accepted - * operation on a given region, we accept all operations for this region. - * - * @param loc; the region and the server name we want to use. - * @return true if this region is considered as busy. - */ - protected boolean canTakeOperation(HRegionLocation loc, - Map regionsIncluded, - Map serversIncluded) { - HRegionInfo regionInfo = loc.getRegionInfo(); - Boolean regionPrevious = regionsIncluded.get(regionInfo); - - if (regionPrevious != null) { - // We already know what to do with this region. - return regionPrevious; - } - - Boolean serverPrevious = serversIncluded.get(loc.getServerName()); - if (Boolean.FALSE.equals(serverPrevious)) { - // It's a new region, on a region server that we have already excluded. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); - if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { - // Too many tasks on this region already. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - if (serverPrevious == null) { - // The region is ok, but we need to decide for this region server. - int newServers = 0; // number of servers we're going to contact so far - for (Map.Entry kv : serversIncluded.entrySet()) { - if (kv.getValue()) { - newServers++; - } - } - - // Do we have too many total tasks already? - boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; - - if (ok) { - // If the total is fine, is it ok for this individual server? - AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); - ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); - } - - if (!ok) { - regionsIncluded.put(regionInfo, Boolean.FALSE); - serversIncluded.put(loc.getServerName(), Boolean.FALSE); - return false; - } - - serversIncluded.put(loc.getServerName(), Boolean.TRUE); - } else { - assert serverPrevious.equals(Boolean.TRUE); - } - - regionsIncluded.put(regionInfo, Boolean.TRUE); - - return true; - } - /** * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. * Uses default ExecutorService for this AP (must have been created with one). @@ -751,6 +709,21 @@ class AsyncProcess { this.callsInProgress = callsInProgress; } + @VisibleForTesting + long heapSize() { + long heapSize = 0; + for (Map.Entry>> e: this.multiAction.actions.entrySet()) { + List> actions = e.getValue(); + for (Action action: actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + heapSize += ((Mutation) row).heapSize(); + } + } + } + return heapSize; + } + @Override public void run() { MultiResponse res; @@ -832,7 +805,7 @@ class AsyncProcess { private final long nonceGroup; private PayloadCarryingServerCallable currentCallable; private int currentCallTotalTimeout; - + private final Map> heapSizesByServer = new HashMap<>(); public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, PayloadCarryingServerCallable callable, int timeout) { @@ -911,7 +884,21 @@ class AsyncProcess { public Set getCallsInProgress() { return callsInProgress; } + @VisibleForTesting + Map> getRequestHeapSize() { + return heapSizesByServer; + } + private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, + SingleServerRequestRunnable runnable) { + List heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new LinkedList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(runnable.heapSize()); + return runnable; + } /** * Group a list of actions per region servers, and send them. * @@ -1081,8 +1068,9 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } // group the actions by the amount of delay @@ -1103,9 +1091,8 @@ class AsyncProcess { List toReturn = new ArrayList(actions.size()); for (DelayingRunner runner : actions.values()) { String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, - callsInProgress); + Runnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); @@ -1942,4 +1929,324 @@ class AsyncProcess { NO_RETRIES_EXHAUSTED, NO_OTHER_SUCCEEDED } + + @VisibleForTesting + static class TaskCheckerHost { + private final List checkers; + private boolean isEnd = false; + TaskCheckerHost(final List checkers) { + this.checkers = checkers; + } + TaskChecker.ReturnCode canTakeOperation(HRegionLocation loc, long requestSize) { + if (isEnd) { + return TaskChecker.ReturnCode.END; + } + TaskChecker.ReturnCode code = TaskChecker.ReturnCode.INCLUDE; + for (TaskChecker checker : checkers) { + switch (checker.canTakeOperation(loc, requestSize)) { + case END: + isEnd = true; + code = TaskChecker.ReturnCode.END; + break; + case SKIP: + code = TaskChecker.ReturnCode.SKIP; + break; + case INCLUDE: + default: + break; + } + if (code != TaskChecker.ReturnCode.INCLUDE) { + break; + } + } + + for (TaskChecker checker : checkers) { + checker.visit(code, loc, requestSize); + } + return code; + } + } + + @VisibleForTesting + interface TaskChecker { + enum ReturnCode { + INCLUDE, SKIP, END + }; + + ReturnCode canTakeOperation(HRegionLocation loc, long requestSize); + void visit(ReturnCode code, HRegionLocation loc, long requestSize); + } + + @VisibleForTesting + static class TaskCountChecker implements TaskChecker { +// private final Map regionsIncluded = new HashMap<>(); +// private final Map serversIncluded = new HashMap<>(); + private final Set regionsIncluded = new HashSet<>(); + private final Set serversIncluded = new HashSet<>(); + private final int maxConcurrentTasksPerRegion; + private final int maxTotalConcurrentTasks; + private final int maxConcurrentTasksPerServer; + private final Map taskCounterPerRegion; + private final Map taskCounterPerServer; + private final AtomicLong tasksInProgress; + private boolean newLocLast = false; + TaskCountChecker(final int maxTotalConcurrentTasks, + final int maxConcurrentTasksPerServer, + final int maxConcurrentTasksPerRegion, + final AtomicLong tasksInProgress, + final Map taskCounterPerServer, + final Map taskCounterPerRegion) { + this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; + this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; + this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; + this.taskCounterPerRegion = taskCounterPerRegion; + this.taskCounterPerServer = taskCounterPerServer; + this.tasksInProgress = tasksInProgress; + } + + /** + * 1) check the regions is allowed. + * 2) check the concurrent tasks for regions. + * 3) check the total concurrent tasks. + * 4) check the concurrent tasks for server. + * @param loc + * @param requestSize + * @return + */ + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long requestSize) { + newLocLast = false; + HRegionInfo regionInfo = loc.getRegionInfo(); + if (regionsIncluded.contains(regionInfo)) { + // We already know what to do with this region. + return ReturnCode.INCLUDE; + } + + AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); + if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { + // Too many tasks on this region already. + return ReturnCode.SKIP; + } + int newServers = serversIncluded.size() + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); + if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { + // Too many tasks. + return ReturnCode.SKIP; + } + AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); + if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { + // Too many tasks for this individual server + return ReturnCode.SKIP; + } + newLocLast = true; + return ReturnCode.INCLUDE; + } + + @Override + public void visit(ReturnCode code, HRegionLocation loc, long requestSize) { + // only save the result. + if (code == ReturnCode.INCLUDE && newLocLast) { + regionsIncluded.add(loc.getRegionInfo()); + serversIncluded.add(loc.getServerName()); + newLocLast = false; + } + } + } + @VisibleForTesting + static class TaskSizeChecker implements TaskChecker { + private enum LOCATED {DONE, NONE, FAIL}; + private final long maxHeapSizePerServer; + private final Map serverRequestSizes = new HashMap<>(); + private final Map busyServers = new HashMap<>(); + private final ClusterConnection con; + private final TableName tableName; + private LOCATED hasLocated = LOCATED.NONE; + private boolean addNewOne = false; + private boolean isFull = false; + TaskSizeChecker(final ClusterConnection con, final TableName tableName, final long maxHeapSizePerServer) { + this.maxHeapSizePerServer = maxHeapSizePerServer; + this.con = con; + this.tableName = tableName; + } + boolean isUlimited() { + return maxHeapSizePerServer <= 0; + } + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long requestSize) { + addNewOne = false; + isFull = false; + if (isUlimited()) { + return ReturnCode.INCLUDE; + } + if (isFull(requestSize)) { + return ReturnCode.END; + } + // Is it ok for limit of request size? + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? serverRequestSizes.get(loc.getServerName()) : 0L; + // accept at least one request + if (currentRequestSize == 0 || currentRequestSize + requestSize <= maxHeapSizePerServer) { + addNewOne = true; + return ReturnCode.INCLUDE; + } + isFull = true; + return ReturnCode.SKIP; + } + boolean isAnyBusyServer() { + for (Map.Entry entry : busyServers.entrySet()) { + if (entry.getValue()) { + return true; + } + } + return false; + } + boolean isFull(final long requestSize) { + // Locates the servers if any server is busy. + if (hasLocated == LOCATED.NONE && isAnyBusyServer()) { + hasLocated = LOCATED.DONE; + try { + List locs = con.locateRegions(tableName, true, false); + if (locs == null || locs.isEmpty()) { + hasLocated = LOCATED.FAIL; + } else { + for (HRegionLocation loc : locs) { + if (!busyServers.containsKey(loc.getServerName())) { + busyServers.put(loc.getServerName(), false); + } + } + } + } catch (IOException ex) { + LOG.error("Failed to locate the regions for table:" + tableName); + hasLocated = LOCATED.FAIL; + } + } + if (hasLocated == LOCATED.FAIL || busyServers.isEmpty()) { + return false; + } + for (Map.Entry entry : busyServers.entrySet()) { + if (!entry.getValue()) { + return false; + } + } + return true; + } + + @Override + public void visit(ReturnCode code, HRegionLocation loc, long requestSize) { + if (code == ReturnCode.INCLUDE && addNewOne) { + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? serverRequestSizes.get(loc.getServerName()) : 0L; + serverRequestSizes.put(loc.getServerName(), currentRequestSize + requestSize); + } + if (code != ReturnCode.INCLUDE && isFull) { + busyServers.put(loc.getServerName(), true); + } + addNewOne = false; + isFull = false; + } + } + /** + * Provide a way to access the internal buffer. Example: + *
+   * BufferAccess access = get();
+   * T v;
+   * while ((v = access.take()) != null) {
+   *   // Use the element
+   *   if (unused(v)) {
+   *     access.restoreLast();
+   *   }
+   * }
+   * 
+ * + * @param + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public interface RowAccess { + /** + * The behavior is equivalent to, for this access: + *
 {@code
+     * access.size() == 0;
+     * }
+ * @return true if there are no elements. + */ + boolean isEmpty(); + /** + * @return the number of elements which are untaken or restored. + */ + int size(); + + /** + * @return the number of elements which can be taken + */ + int getUntakedSize(); + + /** + * Retrieves and removes a element. The order of element is unspecified. The + * null value present no more data. + * + * @return a element + */ + T take(); + + /** + * Restore the last element returned by this BufferCollector. The element + * can't be took again until the {@link BufferAccess#recollect()} is called. + */ + void restoreLast(); + + /** + * Recollect the elements from the internal buffer. + */ + void recollect(); + } + @VisibleForTesting + static class ListRowAccess implements RowAccess { + private final List data; + private int countDown = 0; + private T last = null; + ListRowAccess(final List data) { + this.data = data; + countDown = data.size(); + } + + @Override + public int size() { + return data.size(); + } + + @Override + public T take() { + if (countDown <= 0) { + return null; + } + if (data.isEmpty()) { + countDown = 0; + return null; + } + last = data.remove(0); + --countDown; + return last; + } + + @Override + public void restoreLast() { + if (last != null) { + data.add(last); + last = null; + } + } + + @Override + public void recollect() { + countDown = data.size(); + } + + @Override + public int getUntakedSize() { + return countDown; + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e98ad4e..7dfc068 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -27,11 +27,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -68,6 +68,12 @@ public class BufferedMutatorImpl implements BufferedMutator { @VisibleForTesting AtomicLong currentWriteBufferSize = new AtomicLong(0); + /** + * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. + * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. + */ + @VisibleForTesting + AtomicInteger undealtMutationCount = new AtomicInteger(0); private long writeBufferSize; private final int maxKeyValueSize; private boolean closed = false; @@ -123,11 +129,13 @@ public class BufferedMutatorImpl implements BufferedMutator { } long toAddSize = 0; + int toAddCount = 0; for (Mutation m : ms) { if (m instanceof Put) { validatePut((Put) m); } toAddSize += m.heapSize(); + ++toAddCount; } // This behavior is highly non-intuitive... it does not protect us against @@ -136,10 +144,12 @@ public class BufferedMutatorImpl implements BufferedMutator { if (ap.hasError()) { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); backgroundFlushCommits(true); } else { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); } // Now try and queue what needs to be queued. @@ -202,41 +212,27 @@ public class BufferedMutatorImpl implements BufferedMutator { private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - - LinkedList buffer = new LinkedList<>(); - // Keep track of the size so that this thread doesn't spin forever - long dequeuedSize = 0; - + if (!synchronous && writeAsyncBuffer.isEmpty()) { + return; + } + QueueRowAccess taker = null; try { - // Grab all of the available mutations. - Mutation m; - - // If there's no buffer size drain everything. If there is a buffersize drain up to twice - // that amount. This should keep the loop from continually spinning if there are threads - // that keep adding more data to the buffer. - while ( - (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous) - && (m = writeAsyncBuffer.poll()) != null) { - buffer.add(m); - long size = m.heapSize(); - dequeuedSize += size; - currentWriteBufferSize.addAndGet(-size); - } - - if (!synchronous && dequeuedSize == 0) { - return; - } - if (!synchronous) { - ap.submit(tableName, buffer, true, null, false); + taker = new QueueRowAccess(); + ap.submit(tableName, taker, true, null, false); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); } + taker.restoreRemainder(); + taker = null; } if (synchronous || ap.hasError()) { - while (!buffer.isEmpty()) { - ap.submit(tableName, buffer, true, null, false); + while (undealtMutationCount.get() != 0) { + taker = new QueueRowAccess(); + ap.submit(tableName, taker, true, null, false); + taker.restoreRemainder(); + taker = null; } RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); @@ -249,11 +245,8 @@ public class BufferedMutatorImpl implements BufferedMutator { } } } finally { - for (Mutation mut : buffer) { - long size = mut.heapSize(); - currentWriteBufferSize.addAndGet(size); - dequeuedSize -= size; - writeAsyncBuffer.add(mut); + if (taker != null) { + taker.restoreRemainder(); } } } @@ -279,4 +272,61 @@ public class BufferedMutatorImpl implements BufferedMutator { public long getWriteBufferSize() { return this.writeBufferSize; } + + private class QueueRowAccess implements AsyncProcess.RowAccess { + private int remainder = undealtMutationCount.getAndSet(0); + private Mutation last = null; + private int countDown = remainder; + @Override + public int size() { + return remainder; + } + void restoreRemainder() { + if (remainder > 0) { + undealtMutationCount.addAndGet(remainder); + remainder = 0; + countDown = 0; + } + } + @Override + public Row take() { + if (countDown <= 0) { + return null; + } + last = writeAsyncBuffer.poll(); + if (last == null) { + countDown = 0; + return null; + } + currentWriteBufferSize.addAndGet(-last.heapSize()); + --remainder; + --countDown; + return last; + } + + @Override + public void restoreLast() { + if (last != null) { + writeAsyncBuffer.add(last); + currentWriteBufferSize.addAndGet(last.heapSize()); + ++remainder; + last = null; + } + } + + @Override + public void recollect() { + countDown = remainder; + } + + @Override + public int getUntakedSize() { + return countDown; + } + + @Override + public boolean isEmpty() { + return remainder <= 0; + } + } } diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d943316..c03e5d4 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -25,8 +25,10 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; @@ -57,6 +59,13 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl; +import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskChecker.ReturnCode; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskCheckerHost; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskSizeChecker; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -65,13 +74,16 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; -import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; @Category({ClientTests.class, MediumTests.class}) public class TestAsyncProcess { @@ -180,6 +192,13 @@ public class TestAsyncProcess { } @Override + public AsyncRequestFuture submit(TableName tableName, RowAccess rows, + boolean atLeastOne, Callback callback, boolean needResults) + throws InterruptedIOException { + // We use results in tests to check things, so override to always save them. + return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); + } + @Override public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Callback callback, boolean needResults) throws InterruptedIOException { @@ -434,8 +453,180 @@ public class TestAsyncProcess { return null; } } + @Test + public void testListRowAccess() { + int count = 10; + List values = new LinkedList<>(); + for (int i = 0; i != count; ++i) { + values.add(String.valueOf(i)); + } + ListRowAccess taker = new ListRowAccess(values); + + int restoreCount = 0; + int takeCount = 0; + String v; + while ((v = taker.take()) != null) { + assertEquals(String.valueOf(takeCount), v); + ++takeCount; + boolean restore = Math.random() > 0.5; + if (restore) { + ++restoreCount; + taker.restoreLast(); + } + } + assertEquals(count, takeCount); + assertEquals(values.size(), restoreCount); + + // Take the remainder. + taker.recollect(); + for (int i = 0; i != restoreCount; ++i) { + assertNotNull(taker.take()); + } + } + private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerServer) { + if (putSizePerServer <= maxHeapSizePerServer) { + return 1; + } else if (putSizePerServer % maxHeapSizePerServer == 0) { + return putSizePerServer / maxHeapSizePerServer; + } else { + return putSizePerServer / maxHeapSizePerServer + 1; + } + } + + @Test + public void testSubmitSameSizeOfRequest() throws Exception { + long writeBuffer = 2 * 1024 * 1024; + long putsHeapSize = writeBuffer; + doSubmitRequest(writeBuffer, putsHeapSize); + } @Test + public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { + long maxHeapSizePerServer = -1; + long putsHeapSize = 2 * 1024 * 1024; + doSubmitRequest(maxHeapSizePerServer, putsHeapSize); + } + + @Test(timeout=300000) + public void testSubmitRandomSizeRequest() throws Exception { + Random rn = new Random(); + final long limit = 10 * 1024 * 1024; + for (int count = 0; count != 2; ++count) { + long maxHeapSizePerServer = Math.min(limit, (Math.abs(rn.nextLong()) % limit) + 1); + long putsHeapSize = Math.min(limit, (Math.abs(rn.nextLong()) % limit) + 1); + LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerServer=" + maxHeapSizePerServer + ", putsHeapSize=" + putsHeapSize); + doSubmitRequest(maxHeapSizePerServer, putsHeapSize); + } + } + + @Test + public void testSubmitSmallRequest() throws Exception { + long maxHeapSizePerServer = 2 * 1024 * 1024; + long putsHeapSize = 100; + doSubmitRequest(maxHeapSizePerServer, putsHeapSize); + } + + @Test(timeout=120000) + public void testSubmitLargeRequest() throws Exception { + long maxHeapSizePerServer = 2 * 1024 * 1024; + long putsHeapSize = maxHeapSizePerServer * 2; + doSubmitRequest(maxHeapSizePerServer, putsHeapSize); + } + + private void doSubmitRequest(long maxHeapSizePerServer, long putsHeapSize) throws Exception { + ClusterConnection conn = createHConnection(); + final long defaultPerServerHeapSize = conn.getConfiguration().getLong(HConstants.HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE); + conn.getConfiguration().setLong(HConstants.HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE, maxHeapSizePerServer); + BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + + // sn has two regions + long putSizeSN = 0; + long putSizeSN2 = 0; + List puts = new ArrayList<>(); + while ((putSizeSN + putSizeSN2) <= putsHeapSize) { + Put put1 = new Put(DUMMY_BYTES_1); + put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); + Put put2 = new Put(DUMMY_BYTES_2); + put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); + Put put3 = new Put(DUMMY_BYTES_3); + put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); + putSizeSN += (put1.heapSize() + put2.heapSize()); + putSizeSN2 += put3.heapSize(); + puts.add(put1); + puts.add(put2); + puts.add(put3); + } + LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN + ", putSizeSN2:" + putSizeSN2 + + ", maxHeapSizePerServer:" + maxHeapSizePerServer + + ", putsHeapSize:" + putsHeapSize); + + int expectedSnReqCount = maxHeapSizePerServer <= 0 ? 1 : (int) calculateRequestCount(putSizeSN, maxHeapSizePerServer); + int expectedSn2ReqCount = maxHeapSizePerServer <= 0 ? 1 : (int) calculateRequestCount(putSizeSN2, maxHeapSizePerServer); + + try (HTable ht = new HTable(conn, bufferParam)) { + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + ht.mutator.ap = ap; + + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + ht.put(puts); + List reqs = ap.allReqs; + + int actualSnReqCount = 0; + int actualSn2ReqCount = 0; + for (AsyncRequestFuture req : reqs) { + if (!(req instanceof AsyncRequestFutureImpl)) { + continue; + } + AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + if (ars.getRequestHeapSize().containsKey(sn)) { + ++actualSnReqCount; + } + if (ars.getRequestHeapSize().containsKey(sn2)) { + ++actualSn2ReqCount; + } + } + assertEquals(expectedSnReqCount, actualSnReqCount); + assertEquals(expectedSn2ReqCount, actualSn2ReqCount); + Map sizePerServers = new HashMap<>(); + for (AsyncRequestFuture req : reqs) { + if (!(req instanceof AsyncRequestFutureImpl)) { + continue; + } + AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + Map> requestHeapSize = ars.getRequestHeapSize(); + for (Map.Entry> entry : requestHeapSize.entrySet()) { + long sum = 0; + for (long size : entry.getValue()) { + if (maxHeapSizePerServer > 0) { + assertEquals(true, size <= maxHeapSizePerServer); + } + sum += size; + } + if (maxHeapSizePerServer <= 0) { + if (entry.getKey().equals(sn)) { + assertEquals(putSizeSN, sum); + } + if (entry.getKey().equals(sn2)) { + assertEquals(putSizeSN2, sum); + } + } else { + assertEquals(true, sum <= maxHeapSizePerServer); + } + long value = sizePerServers.containsKey(entry.getKey()) ? sizePerServers.get(entry.getKey()) : 0L; + sizePerServers.put(entry.getKey(), value + sum); + } + } + assertEquals(true, sizePerServers.containsKey(sn)); + assertEquals(true, sizePerServers.containsKey(sn2)); + assertEquals(false, sizePerServers.containsKey(sn3)); + assertEquals(putSizeSN, (long) sizePerServers.get(sn)); + assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); + } + // restore heapsize to default settings. + conn.getConfiguration().setLong(HConstants.HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE, defaultPerServerHeapSize); + } + @Test public void testSubmit() throws Exception { ClusterConnection hc = createHConnection(); AsyncProcess ap = new MyAsyncProcess(hc, conf); @@ -476,7 +667,9 @@ public class TestAsyncProcess { List puts = new ArrayList(); puts.add(createPut(1, true)); - ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); + for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) { + ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); + } ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(puts.size(), 1); @@ -537,7 +730,7 @@ public class TestAsyncProcess { public void testSubmitTrue() throws IOException { final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); ap.tasksInProgress.incrementAndGet(); - final AtomicInteger ai = new AtomicInteger(1); + final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion); ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); final AtomicBoolean checkPoint = new AtomicBoolean(false); @@ -671,6 +864,8 @@ public class TestAsyncProcess { setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); + Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(Arrays.asList(loc1, loc2, loc3)); setMockLocation(hc, FAILS, new RegionLocations(loc2)); return hc; } @@ -680,6 +875,18 @@ public class TestAsyncProcess { setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_3, hrls3); + List locations = new ArrayList(); + for (HRegionLocation loc : hrls1.getRegionLocations()) { + locations.add(loc); + } + for (HRegionLocation loc : hrls2.getRegionLocations()) { + locations.add(loc); + } + for (HRegionLocation loc : hrls3.getRegionLocations()) { + locations.add(loc); + } + Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(locations); return hc; } @@ -687,6 +894,8 @@ public class TestAsyncProcess { RegionLocations result) throws IOException { Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), + Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); } private static ClusterConnection createHConnectionCommon() { @@ -788,6 +997,146 @@ public class TestAsyncProcess { } @Test + public void testTaskCheckerHost() throws IOException { + final int maxTotalConcurrentTasks = 100; + final int maxConcurrentTasksPerServer = 2; + final int maxConcurrentTasksPerRegion = 1; + final AtomicLong tasksInProgress = new AtomicLong(0); + final Map taskCounterPerServer = new HashMap<>(); + final Map taskCounterPerRegion = new HashMap<>(); + TaskCountChecker countChecker = new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + final long maxHeapSizePerServer = 2 * 1024 * 1024; + // unlimiited + final ClusterConnection conn = createHConnection(); + TaskSizeChecker sizeChecker = new TaskSizeChecker(conn, DUMMY_TABLE, maxHeapSizePerServer); + TaskCheckerHost checkerHost = new TaskCheckerHost(Arrays.asList(countChecker, sizeChecker)); + + ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerServer); + assertEquals(TaskChecker.ReturnCode.INCLUDE, loc1Code); + + ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerServer); + // rejected for size + assertNotEquals(TaskChecker.ReturnCode.INCLUDE, loc1Code_2); + + ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerServer); + // rejected for size + assertNotEquals(TaskChecker.ReturnCode.INCLUDE, loc2Code); + + // fill the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); + + ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L); + // rejected for count + assertNotEquals(TaskChecker.ReturnCode.INCLUDE, loc3Code); + + // release the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); + + ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L); + assertEquals(TaskChecker.ReturnCode.INCLUDE, loc3Code_2); + } + + @Test + public void testTaskSizeChecker() throws IOException { + final long maxHeapSizePerServer = 2 * 1024 * 1024; + final ClusterConnection conn = createHConnection(); + TaskSizeChecker checker = new TaskSizeChecker(conn, DUMMY_TABLE, maxHeapSizePerServer); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerServer); + assertEquals(TaskChecker.ReturnCode.INCLUDE, code); + } + + // add task to loc1 region. + ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerServer); + assertEquals(TaskChecker.ReturnCode.INCLUDE, acceptCode); + checker.visit(acceptCode, loc1, maxHeapSizePerServer); + + // the sn server reachs the limit. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc2, maxHeapSizePerServer); + assertNotEquals(TaskChecker.ReturnCode.INCLUDE, code); + } + + // the request to sn2 server should be accepted. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerServer); + assertEquals(TaskChecker.ReturnCode.INCLUDE, code); + } + } + + @Test + public void testTaskCountChecker() { + long requestSize = 12345; + int maxTotalConcurrentTasks = 100; + int maxConcurrentTasksPerServer = 2; + int maxConcurrentTasksPerRegion = 1; + AtomicLong tasksInProgress = new AtomicLong(0); + Map taskCounterPerServer = new HashMap<>(); + Map taskCounterPerRegion = new HashMap<>(); + TaskCountChecker checker = new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, requestSize); + assertEquals(TaskChecker.ReturnCode.INCLUDE, code); + } + // add loc1 region. + ReturnCode code = checker.canTakeOperation(loc1, requestSize); + assertEquals(TaskChecker.ReturnCode.INCLUDE, code); + checker.visit(code, loc1, requestSize); + + // fill the task slots for loc1. + taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100)); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc1, requestSize); + assertEquals(TaskChecker.ReturnCode.INCLUDE, includeCode); + checker.visit(includeCode, loc1, requestSize); + } + + // fill the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); + + // no task slots. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode excludeCode = checker.canTakeOperation(loc3, requestSize); + assertNotEquals(TaskChecker.ReturnCode.INCLUDE, excludeCode); + checker.visit(excludeCode, loc3, requestSize); + } + + // release the tasks for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); + + // add loc3 region. + ReturnCode code3 = checker.canTakeOperation(loc3, requestSize); + assertEquals(TaskChecker.ReturnCode.INCLUDE, code3); + checker.visit(code3, loc3, requestSize); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc3, requestSize); + assertEquals(TaskChecker.ReturnCode.INCLUDE, includeCode); + checker.visit(includeCode, loc3, requestSize); + } + } + + @Test public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(conf); HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); @@ -817,7 +1166,6 @@ public class TestAsyncProcess { Assert.assertEquals(res[5], success); Assert.assertEquals(res[6], failure); } - @Test public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(conf); diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 256c374..2c1b52f 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -715,7 +715,17 @@ public final class HConstants { /** * Default value of {@link #HBASE_CLIENT_MAX_PERREGION_TASKS}. */ - public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1; + public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 2; + + /** + * The maximum size of task the client will maintain to a single RegionServer. + */ + public static final String HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE = "hbase.client.max.perserver.heapsize"; + + /** + * Default value of {@link #HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE}. + */ + public static final long DEFAULT_HBASE_CLIENT_MAX_PERSERVER_HEAPSIZE = 4194304; /** * Parameter name for server pause value, used mostly as value to wait before diff --git hbase-common/src/main/resources/hbase-default.xml hbase-common/src/main/resources/hbase-default.xml index 116c7d9..891c836 100644 --- hbase-common/src/main/resources/hbase-default.xml +++ hbase-common/src/main/resources/hbase-default.xml @@ -498,7 +498,7 @@ possible configurations would overwhelm and obscure the important. hbase.client.max.perregion.tasks - 1 + 2 The maximum number of concurrent connections the client will maintain to a single Region. That is, if there is already hbase.client.max.perregion.tasks writes in progress for this region, new puts diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index e771a92..e84b9e4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -69,7 +69,7 @@ import com.google.protobuf.Service; * mrmBuilder.addMutationRequest(m1); * mrmBuilder.addMutationRequest(m2); * CoprocessorRpcChannel channel = t.coprocessorService(ROW); - * MultiRowMutationService.BlockingInterface service = + * MultiRowMutationService.BlockingInterface service = * MultiRowMutationService.newBlockingStub(channel); * MutateRowsRequest mrm = mrmBuilder.build(); * service.mutateRows(null, mrm);