diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 780de18..f9037cd 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +37,9 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -59,8 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; - import com.google.common.annotations.VisibleForTesting; +import java.util.TreeSet; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; /** * This class allows a continuous flow of requests. It's written to be compatible with a @@ -126,6 +129,25 @@ class AsyncProcess { private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; /** + * The maximum size of single RegionServer. + */ + public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize"; + + /** + * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}. + */ + public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; + + /** + * The maximum size of submit. + */ + public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; + /** + * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}. + */ + public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; + + /** * The context used to wait for results from one submit call. * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), * then errors and failed operations in this object will reflect global errors. @@ -208,7 +230,6 @@ class AsyncProcess { new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); protected final ConcurrentMap taskCounterPerServer = new ConcurrentHashMap(); - // Start configuration settings. private final int startLogErrorsCnt; @@ -218,6 +239,11 @@ class AsyncProcess { protected final int maxTotalConcurrentTasks; /** + * The max heap size of all tasks simultaneously executed on a server. + */ + protected final long maxHeapSizePerRequest; + protected final long maxHeapSizeSubmit; + /** * The number of tasks we run in parallel on a single region. * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start * a set of operations on a region before the previous one is done. As well, this limits @@ -278,7 +304,6 @@ class AsyncProcess { addresses.addAll(other.addresses); } } - public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory, int rpcTimeout) { @@ -305,7 +330,9 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); - + this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -320,7 +347,15 @@ class AsyncProcess { throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + maxConcurrentTasksPerRegion); } + if (this.maxHeapSizePerRequest <= 0) { + throw new IllegalArgumentException("maxHeapSizePerServer=" + + maxHeapSizePerRequest); + } + if (this.maxHeapSizeSubmit <= 0) { + throw new IllegalArgumentException("maxHeapSizeSubmit=" + + maxHeapSizeSubmit); + } // Server tracker allows us to do faster, and yet useful (hopefully), retries. // However, if we are too useful, we might fail very quickly due to retry count limit. // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum @@ -365,7 +400,17 @@ class AsyncProcess { throws InterruptedIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } - + public AsyncRequestFuture submit(TableName tableName, + final RowAccess rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + return submit(null, tableName, rows, atLeastOne, callback, needResults); + } + public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, + List rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + return submit(pool, tableName, new ListRowAccess(rows), atLeastOne, + callback, needResults); + } /** * Extract from the rows list what we can submit. The rows we can not submit are kept in the * list. Does not send requests to replicas (not currently used for anything other @@ -379,7 +424,7 @@ class AsyncProcess { * @param atLeastOne true if we should submit at least a subset. */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - List rows, boolean atLeastOne, Batch.Callback callback, + RowAccess rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { if (rows.isEmpty()) { return NO_REQS_RESULT; @@ -388,31 +433,26 @@ class AsyncProcess { Map> actionsByServer = new HashMap>(); List> retainedActions = new ArrayList>(rows.size()); - NonceGenerator ng = this.connection.getNonceGenerator(); long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. // Location errors that happen before we decide what requests to take. List locationErrors = null; List locationErrorRows = null; + RowCheckerHost checker = createRowCheckerHost(); + boolean firstItar = true; do { // Wait until there is at least one slot for a new task. waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); - - // Remember the previous decisions about regions or region servers we put in the - // final multi. - Map regionIncluded = new HashMap(); - Map serverIncluded = new HashMap(); - int posInList = -1; - Iterator it = rows.iterator(); - while (it.hasNext()) { - Row r = it.next(); + Row r; + if (!firstItar) { + rows.recollect(); + checker.reset(); + } + while ((r = rows.take()) != null) { HRegionLocation loc; try { - if (r == null) { - throw new IllegalArgumentException("#" + id + ", row cannot be null"); - } // Make sure we get 0-s replica. RegionLocations locs = connection.locateRegion( tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); @@ -430,28 +470,43 @@ class AsyncProcess { retainedActions.add(new Action(r, ++posInList)); locationErrors.add(ex); locationErrorRows.add(posInList); - it.remove(); break; // Backward compat: we stop considering actions on location error. } - - if (canTakeOperation(loc, regionIncluded, serverIncluded)) { + long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0; + ReturnCode code = checker.canTakeOperation(loc, rowSize); + if (code == ReturnCode.END) { + rows.restoreLast(); + break; + } else if (code == ReturnCode.INCLUDE) { Action action = new Action(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path byte[] regionName = loc.getRegionInfo().getRegionName(); addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); - it.remove(); + } else { + rows.restoreLast(); } } + firstItar = false; } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); - if (retainedActions.isEmpty()) return NO_REQS_RESULT; - return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, locationErrors, locationErrorRows, actionsByServer, pool); } + private RowCheckerHost createRowCheckerHost() { + return new RowCheckerHost(Arrays.asList( + new TaskCountChecker(maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, + taskCounterPerServer, + taskCounterPerRegion) + , new RequestSizeChecker(maxHeapSizePerRequest) + , new SubmittedSizeChecker(maxHeapSizeSubmit) + )); + } AsyncRequestFuture submitMultiActions(TableName tableName, List> retainedActions, long nonceGroup, Batch.Callback callback, Object[] results, boolean needResults, List locationErrors, @@ -493,74 +548,6 @@ class AsyncProcess { multiAction.add(regionName, action); } - - /** - * Check if we should send new operations to this region or region server. - * We're taking into account the past decision; if we have already accepted - * operation on a given region, we accept all operations for this region. - * - * @param loc; the region and the server name we want to use. - * @return true if this region is considered as busy. - */ - protected boolean canTakeOperation(HRegionLocation loc, - Map regionsIncluded, - Map serversIncluded) { - HRegionInfo regionInfo = loc.getRegionInfo(); - Boolean regionPrevious = regionsIncluded.get(regionInfo); - - if (regionPrevious != null) { - // We already know what to do with this region. - return regionPrevious; - } - - Boolean serverPrevious = serversIncluded.get(loc.getServerName()); - if (Boolean.FALSE.equals(serverPrevious)) { - // It's a new region, on a region server that we have already excluded. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); - if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { - // Too many tasks on this region already. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - if (serverPrevious == null) { - // The region is ok, but we need to decide for this region server. - int newServers = 0; // number of servers we're going to contact so far - for (Map.Entry kv : serversIncluded.entrySet()) { - if (kv.getValue()) { - newServers++; - } - } - - // Do we have too many total tasks already? - boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; - - if (ok) { - // If the total is fine, is it ok for this individual server? - AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); - ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); - } - - if (!ok) { - regionsIncluded.put(regionInfo, Boolean.FALSE); - serversIncluded.put(loc.getServerName(), Boolean.FALSE); - return false; - } - - serversIncluded.put(loc.getServerName(), Boolean.TRUE); - } else { - assert serverPrevious.equals(Boolean.TRUE); - } - - regionsIncluded.put(regionInfo, Boolean.TRUE); - - return true; - } - /** * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}. * Uses default ExecutorService for this AP (must have been created with one). @@ -739,7 +726,7 @@ class AsyncProcess { private final int numAttempt; private final ServerName server; private final Set callsInProgress; - + private Long heapSize = null; private SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, Set callsInProgress) { @@ -749,6 +736,24 @@ class AsyncProcess { this.callsInProgress = callsInProgress; } + @VisibleForTesting + long heapSize() { + if (heapSize != null) { + return heapSize; + } + heapSize = 0L; + for (Map.Entry>> e: this.multiAction.actions.entrySet()) { + List> actions = e.getValue(); + for (Action action: actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + heapSize += ((Mutation) row).heapSize(); + } + } + } + return heapSize; + } + @Override public void run() { MultiResponse res; @@ -830,7 +835,7 @@ class AsyncProcess { private final long nonceGroup; private PayloadCarryingServerCallable currentCallable; private int currentCallTotalTimeout; - + private final Map> heapSizesByServer = new HashMap<>(); public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, PayloadCarryingServerCallable callable, int timeout) { @@ -909,7 +914,21 @@ class AsyncProcess { public Set getCallsInProgress() { return callsInProgress; } + @VisibleForTesting + Map> getRequestHeapSize() { + return heapSizesByServer; + } + private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, + SingleServerRequestRunnable runnable) { + List heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new LinkedList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(runnable.heapSize()); + return runnable; + } /** * Group a list of actions per region servers, and send them. * @@ -1079,8 +1098,9 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } // group the actions by the amount of delay @@ -1101,9 +1121,8 @@ class AsyncProcess { List toReturn = new ArrayList(actions.size()); for (DelayingRunner runner : actions.values()) { String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, - callsInProgress); + Runnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); @@ -1941,4 +1960,314 @@ class AsyncProcess { NO_RETRIES_EXHAUSTED, NO_OTHER_SUCCEEDED } + + /** + * Collect all advices from checkers and make the final decision. + */ + @VisibleForTesting + static class RowCheckerHost { + private final List checkers; + private boolean isEnd = false; + RowCheckerHost(final List checkers) { + this.checkers = checkers; + } + void reset() throws InterruptedIOException { + isEnd = false; + InterruptedIOException e = null; + for (RowChecker checker : checkers) { + try { + checker.reset(); + } catch (InterruptedIOException ex) { + e = ex; + } + } + if (e != null) { + throw e; + } + } + ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + if (isEnd) { + return ReturnCode.END; + } + ReturnCode code = ReturnCode.INCLUDE; + for (RowChecker checker : checkers) { + switch (checker.canTakeOperation(loc, rowSize)) { + case END: + isEnd = true; + code = ReturnCode.END; + break; + case SKIP: + code = ReturnCode.SKIP; + break; + case INCLUDE: + default: + break; + } + if (code == ReturnCode.END) { + break; + } + } + for (RowChecker checker : checkers) { + checker.visit(code, loc, rowSize); + } + return code; + } + } + + /** + * Provide a way to control the flow of rows iteration. + */ + @VisibleForTesting + interface RowChecker { + enum ReturnCode { + /** + * Accept current row. + */ + INCLUDE, + /** + * Skip current row. + */ + SKIP, + /** + * No more row can be included. + */ + END + }; + ReturnCode canTakeOperation(HRegionLocation loc, long rowSize); + /** + * Add the final ReturnCode to the checker. + * The ReturnCode may be reversed, so the checker need the final decision to update + * the inner state. + */ + void visit(ReturnCode code, HRegionLocation loc, long rowSize); + /** + * Reset the inner state. + */ + void reset() throws InterruptedIOException ; + } + + /** + * limit the heapsize of total submitted data. + * Reduce the limit of heapsize for submitting quickly + * if there is no running task. + */ + @VisibleForTesting + static class SubmittedSizeChecker implements RowChecker { + private final long maxHeapSizeSubmit; + private long heapSize = 0; + SubmittedSizeChecker(final long maxHeapSizeSubmit) { + this.maxHeapSizeSubmit = maxHeapSizeSubmit; + } + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + if (heapSize >= maxHeapSizeSubmit) { + return ReturnCode.END; + } + return ReturnCode.INCLUDE; + } + + @Override + public void visit(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + heapSize += rowSize; + } + } + + @Override + public void reset() { + heapSize = 0; + } + } + /** + * limit the max number of task in a AsyncProcess. + */ + @VisibleForTesting + static class TaskCountChecker implements RowChecker { + private static final long MAX_WAITING_TIME = 1000; //ms + private final Set regionsIncluded = new HashSet<>(); + private final Set serversIncluded = new HashSet<>(); + private final int maxConcurrentTasksPerRegion; + private final int maxTotalConcurrentTasks; + private final int maxConcurrentTasksPerServer; + private final Map taskCounterPerRegion; + private final Map taskCounterPerServer; + private final Set busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + private final AtomicLong tasksInProgress; + TaskCountChecker(final int maxTotalConcurrentTasks, + final int maxConcurrentTasksPerServer, + final int maxConcurrentTasksPerRegion, + final AtomicLong tasksInProgress, + final Map taskCounterPerServer, + final Map taskCounterPerRegion) { + this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; + this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; + this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; + this.taskCounterPerRegion = taskCounterPerRegion; + this.taskCounterPerServer = taskCounterPerServer; + this.tasksInProgress = tasksInProgress; + } + @Override + public void reset() throws InterruptedIOException { + // prevent the busy-waiting + waitForRegion(); + regionsIncluded.clear(); + serversIncluded.clear(); + busyRegions.clear(); + } + private void waitForRegion() throws InterruptedIOException { + if (busyRegions.isEmpty()) { + return; + } + final long start = System.currentTimeMillis(); + while ((System.currentTimeMillis() - start) <= MAX_WAITING_TIME) { + for (byte[] region : busyRegions) { + AtomicInteger count = taskCounterPerRegion.get(region); + if (count == null || count.get() < maxConcurrentTasksPerRegion) { + return; + } + } + try { + synchronized (tasksInProgress) { + tasksInProgress.wait(10); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted." + + " tasksInProgress=" + tasksInProgress); + } + } + } + /** + * 1) check the regions is allowed. + * 2) check the concurrent tasks for regions. + * 3) check the total concurrent tasks. + * 4) check the concurrent tasks for server. + * @param loc + * @param rowSize + * @return + */ + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + + HRegionInfo regionInfo = loc.getRegionInfo(); + if (regionsIncluded.contains(regionInfo)) { + // We already know what to do with this region. + return ReturnCode.INCLUDE; + } + AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); + if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { + // Too many tasks on this region already. + return ReturnCode.SKIP; + } + int newServers = serversIncluded.size() + + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); + if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { + // Too many tasks. + return ReturnCode.SKIP; + } + AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); + if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { + // Too many tasks for this individual server + return ReturnCode.SKIP; + } + return ReturnCode.INCLUDE; + } + + @Override + public void visit(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + regionsIncluded.add(loc.getRegionInfo()); + serversIncluded.add(loc.getServerName()); + } + busyRegions.add(loc.getRegionInfo().getRegionName()); + } + } + + /** + * limit the request size for each regionserver. + */ + @VisibleForTesting + static class RequestSizeChecker implements RowChecker { + private final long maxHeapSizePerRequest; + private final Map serverRequestSizes = new HashMap<>(); + RequestSizeChecker(final long maxHeapSizePerRequest) { + this.maxHeapSizePerRequest = maxHeapSizePerRequest; + } + @Override + public void reset() { + serverRequestSizes.clear(); + } + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + // Is it ok for limit of request size? + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? + serverRequestSizes.get(loc.getServerName()) : 0L; + // accept at least one request + if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) { + return ReturnCode.INCLUDE; + } + return ReturnCode.SKIP; + } + + @Override + public void visit(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? + serverRequestSizes.get(loc.getServerName()) : 0L; + serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize); + } + } + } + + @VisibleForTesting + static class ListRowAccess implements RowAccess { + private final List data; + private int countDown = 0; + private T last = null; + ListRowAccess(final List data) { + this.data = data; + countDown = data.size(); + } + + @Override + public int size() { + return data.size(); + } + + @Override + public T take() { + if (countDown <= 0) { + return null; + } + if (data.isEmpty()) { + countDown = 0; + return null; + } + last = data.remove(0); + --countDown; + return last; + } + + @Override + public void restoreLast() { + if (last != null) { + data.add(last); + last = null; + } + } + + @Override + public void recollect() { + countDown = data.size(); + } + + @Override + public int getUntakedSize() { + return countDown; + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 37a38be..07408c7 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -68,6 +68,12 @@ public class BufferedMutatorImpl implements BufferedMutator { @VisibleForTesting AtomicLong currentWriteBufferSize = new AtomicLong(0); + /** + * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. + * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. + */ + @VisibleForTesting + AtomicInteger undealtMutationCount = new AtomicInteger(0); private long writeBufferSize; private final int maxKeyValueSize; private boolean closed = false; @@ -128,11 +134,13 @@ public class BufferedMutatorImpl implements BufferedMutator { } long toAddSize = 0; + int toAddCount = 0; for (Mutation m : ms) { if (m instanceof Put) { validatePut((Put) m); } toAddSize += m.heapSize(); + ++toAddCount; } // This behavior is highly non-intuitive... it does not protect us against @@ -141,14 +149,17 @@ public class BufferedMutatorImpl implements BufferedMutator { if (ap.hasError()) { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); backgroundFlushCommits(true); } else { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); } // Now try and queue what needs to be queued. - while (currentWriteBufferSize.get() > writeBufferSize) { + while (undealtMutationCount.get() != 0 + && currentWriteBufferSize.get() > writeBufferSize) { backgroundFlushCommits(false); } } @@ -207,41 +218,27 @@ public class BufferedMutatorImpl implements BufferedMutator { private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - - LinkedList buffer = new LinkedList<>(); - // Keep track of the size so that this thread doesn't spin forever - long dequeuedSize = 0; - + if (!synchronous && writeAsyncBuffer.isEmpty()) { + return; + } + QueueRowAccess taker = null; try { - // Grab all of the available mutations. - Mutation m; - - // If there's no buffer size drain everything. If there is a buffersize drain up to twice - // that amount. This should keep the loop from continually spinning if there are threads - // that keep adding more data to the buffer. - while ( - (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous) - && (m = writeAsyncBuffer.poll()) != null) { - buffer.add(m); - long size = m.heapSize(); - dequeuedSize += size; - currentWriteBufferSize.addAndGet(-size); - } - - if (!synchronous && dequeuedSize == 0) { - return; - } - if (!synchronous) { - ap.submit(tableName, buffer, true, null, false); + taker = new QueueRowAccess(); + ap.submit(tableName, taker, true, null, false); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); } + taker.restoreRemainder(); + taker = null; } if (synchronous || ap.hasError()) { - while (!buffer.isEmpty()) { - ap.submit(tableName, buffer, true, null, false); + while (undealtMutationCount.get() != 0) { + taker = new QueueRowAccess(); + ap.submit(tableName, taker, true, null, false); + taker.restoreRemainder(); + taker = null; } RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); @@ -254,11 +251,8 @@ public class BufferedMutatorImpl implements BufferedMutator { } } } finally { - for (Mutation mut : buffer) { - long size = mut.heapSize(); - currentWriteBufferSize.addAndGet(size); - dequeuedSize -= size; - writeAsyncBuffer.add(mut); + if (taker != null) { + taker.restoreRemainder(); } } } @@ -294,4 +288,61 @@ public class BufferedMutatorImpl implements BufferedMutator { public List getWriteBuffer() { return Arrays.asList(writeAsyncBuffer.toArray(new Row[0])); } + + private class QueueRowAccess implements RowAccess { + private int remainder = undealtMutationCount.getAndSet(0); + private Mutation last = null; + private int countDown = remainder; + @Override + public int size() { + return remainder; + } + void restoreRemainder() { + if (remainder > 0) { + undealtMutationCount.addAndGet(remainder); + remainder = 0; + countDown = 0; + } + } + @Override + public Row take() { + if (countDown <= 0) { + return null; + } + last = writeAsyncBuffer.poll(); + if (last == null) { + countDown = 0; + return null; + } + currentWriteBufferSize.addAndGet(-last.heapSize()); + --remainder; + --countDown; + return last; + } + + @Override + public void restoreLast() { + if (last != null) { + writeAsyncBuffer.add(last); + currentWriteBufferSize.addAndGet(last.heapSize()); + ++remainder; + last = null; + } + } + + @Override + public void recollect() { + countDown = remainder; + } + + @Override + public int getUntakedSize() { + return countDown; + } + + @Override + public boolean isEmpty() { + return remainder <= 0; + } + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java new file mode 100644 index 0000000..0fb7b0f --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Provide a way to access the inner buffer. + * The purpose is to reduce the elapsed time to move a large number + * of elements between collections. + * Use example: + *
+ * BufferAccess access = get();
+ * T v;
+ * while ((v = access.take()) != null) {
+ *   // Use the element
+ *   if (unused(v)) {
+ *     access.restoreLast();
+ *   }
+ * }
+ * 
+ * @param + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +@VisibleForTesting +interface RowAccess { + + /** + * The behavior is equivalent to, for this access: + *
 {@code
+   * access.size() == 0;
+   * }
+ * + * @return true if there are no elements. + */ + boolean isEmpty(); + + /** + * @return the number of elements which are untaken or restored. + */ + int size(); + + /** + * @return the number of elements which can be taken + */ + int getUntakedSize(); + + /** + * Retrieves and removes a element. The order of element is unspecified. The null value present no + * more data. + * + * @return a element + */ + T take(); + + /** + * Restore the last element returned by this BufferCollector. The element can't be took again + * until the {@link BufferAccess#recollect()} is called. + */ + void restoreLast(); + + /** + * Recollect the elements from the internal buffer. + * It can be viewed as to renew a iterator from a collection. + */ + void recollect(); +} diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 8fb2e8b..43470c1 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -58,6 +58,13 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; +import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; +import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -65,6 +72,8 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -179,6 +188,13 @@ public class TestAsyncProcess { } @Override + public AsyncRequestFuture submit(TableName tableName, RowAccess rows, + boolean atLeastOne, Callback callback, boolean needResults) + throws InterruptedIOException { + // We use results in tests to check things, so override to always save them. + return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); + } + @Override public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Callback callback, boolean needResults) throws InterruptedIOException { @@ -483,7 +499,9 @@ public class TestAsyncProcess { List puts = new ArrayList(); puts.add(createPut(1, true)); - ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); + for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) { + ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); + } ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(puts.size(), 1); @@ -544,7 +562,7 @@ public class TestAsyncProcess { public void testSubmitTrue() throws IOException { final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); ap.tasksInProgress.incrementAndGet(); - final AtomicInteger ai = new AtomicInteger(1); + final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion); ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); final AtomicBoolean checkPoint = new AtomicBoolean(false); @@ -678,6 +696,8 @@ public class TestAsyncProcess { setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); + Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(Arrays.asList(loc1, loc2, loc3)); setMockLocation(hc, FAILS, new RegionLocations(loc2)); return hc; } @@ -687,6 +707,18 @@ public class TestAsyncProcess { setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_3, hrls3); + List locations = new ArrayList(); + for (HRegionLocation loc : hrls1.getRegionLocations()) { + locations.add(loc); + } + for (HRegionLocation loc : hrls2.getRegionLocations()) { + locations.add(loc); + } + for (HRegionLocation loc : hrls3.getRegionLocations()) { + locations.add(loc); + } + Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(locations); return hc; } @@ -694,6 +726,8 @@ public class TestAsyncProcess { RegionLocations result) throws IOException { Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), + Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); } private static ClusterConnection createHConnectionCommon() { @@ -793,6 +827,195 @@ public class TestAsyncProcess { } @Test + public void testTaskCheckerHost() throws IOException { + final int maxTotalConcurrentTasks = 100; + final int maxConcurrentTasksPerServer = 2; + final int maxConcurrentTasksPerRegion = 1; + final AtomicLong tasksInProgress = new AtomicLong(0); + final Map taskCounterPerServer = new HashMap<>(); + final Map taskCounterPerRegion = new HashMap<>(); + TaskCountChecker countChecker = new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + final long maxHeapSizePerRequest = 2 * 1024 * 1024; + // unlimiited + RequestSizeChecker sizeChecker = new RequestSizeChecker(maxHeapSizePerRequest); + RowCheckerHost checkerHost = new RowCheckerHost(Arrays.asList(countChecker, sizeChecker)); + + ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code); + + ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest); + // rejected for size + assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2); + + ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerRequest); + // rejected for size + assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code); + + // fill the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); + + ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L); + // rejected for count + assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code); + + // release the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); + + ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L); + assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2); + } + + @Test + public void testRequestSizeCheckerr() throws IOException { + final long maxHeapSizePerRequest = 2 * 1024 * 1024; + final ClusterConnection conn = createHConnection(); + RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + + // accept the data located on loc1 region. + ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode); + checker.visit(acceptCode, loc1, maxHeapSizePerRequest); + + // the sn server reachs the limit. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, code); + } + + // the request to sn2 server should be accepted. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + } + + @Test + public void testSubmittedSizeChecker() { + final long maxHeapSizeSubmit = 2 * 1024 * 1024; + SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit); + + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc1, 100000); + assertEquals(ReturnCode.INCLUDE, include); + } + + for (int i = 0; i != 10; ++i) { + checker.visit(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit); + } + + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc1, 100000); + assertEquals(ReturnCode.END, include); + } + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc2, 100000); + assertEquals(ReturnCode.END, include); + } + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc1, 100000); + assertEquals(ReturnCode.INCLUDE, include); + } + } + @Test + public void testTaskCountChecker() throws InterruptedIOException { + long rowSize = 12345; + int maxTotalConcurrentTasks = 100; + int maxConcurrentTasksPerServer = 2; + int maxConcurrentTasksPerRegion = 1; + AtomicLong tasksInProgress = new AtomicLong(0); + Map taskCounterPerServer = new HashMap<>(); + Map taskCounterPerRegion = new HashMap<>(); + TaskCountChecker checker = new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + // add loc1 region. + ReturnCode code = checker.canTakeOperation(loc1, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + checker.visit(code, loc1, rowSize); + + // fill the task slots for loc1. + taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100)); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode); + checker.visit(includeCode, loc1, rowSize); + } + + // fill the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); + + // no task slots. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode); + checker.visit(excludeCode, loc3, rowSize); + } + + // release the tasks for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); + + // add loc3 region. + ReturnCode code3 = checker.canTakeOperation(loc3, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, code3); + checker.visit(code3, loc3, rowSize); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode); + checker.visit(includeCode, loc3, rowSize); + } + + checker.reset(); + // the region was previously accepted, + // but checker have reseted and task slots for loc1 is full. + // So it must be rejected now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode); + checker.visit(includeCode, loc1, rowSize); + } + } + + @Test public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(conf); HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); @@ -822,7 +1045,6 @@ public class TestAsyncProcess { Assert.assertEquals(res[5], success); Assert.assertEquals(res[6], failure); } - @Test public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(conf); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index e771a92..e84b9e4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -69,7 +69,7 @@ import com.google.protobuf.Service; * mrmBuilder.addMutationRequest(m1); * mrmBuilder.addMutationRequest(m2); * CoprocessorRpcChannel channel = t.coprocessorService(ROW); - * MultiRowMutationService.BlockingInterface service = + * MultiRowMutationService.BlockingInterface service = * MultiRowMutationService.newBlockingStub(channel); * MutateRowsRequest mrm = mrmBuilder.build(); * service.mutateRows(null, mrm);