From 0c5cdf2e9bb009f3c0a8bf7cea160da7872a7de6 Mon Sep 17 00:00:00 2001 From: chenheng Date: Wed, 14 Sep 2016 15:52:35 +0800 Subject: [PATCH] HBASE-16631 Extract AsyncRequestFuture relates code from AsyncProcess --- .../apache/hadoop/hbase/client/AsyncProcess.java | 1281 +------------------ .../hadoop/hbase/client/AsyncRequestFuture.java | 40 + .../hbase/client/AsyncRequestFutureImpl.java | 1289 ++++++++++++++++++++ .../org/apache/hadoop/hbase/client/HTable.java | 1 - .../hadoop/hbase/client/HTableMultiplexer.java | 1 - .../hadoop/hbase/client/TestAsyncProcess.java | 42 +- 6 files changed, 1389 insertions(+), 1265 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 93b17bc..e431b8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -25,13 +25,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.Date; @@ -124,7 +121,7 @@ class AsyncProcess { */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; - private final int thresholdToLogUndoneTaskDetails; + protected final int thresholdToLogUndoneTaskDetails; private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = "hbase.client.threshold.log.details"; private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; @@ -150,21 +147,6 @@ class AsyncProcess { public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; /** - * The context used to wait for results from one submit call. - * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), - * then errors and failed operations in this object will reflect global errors. - * 2) If submit call is made with needResults false, results will not be saved. - * */ - public static interface AsyncRequestFuture { - public boolean hasError(); - public RetriesExhaustedWithDetailsException getErrors(); - public List getFailedOperations(); - public Object[] getResults() throws InterruptedIOException; - /** Wait until all tasks are executed, successfully or not. */ - public void waitUntilDone() throws InterruptedIOException; - } - - /** * Return value from a submit that didn't contain any requests. */ private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { @@ -195,28 +177,6 @@ class AsyncProcess { } }; - /** Sync point for calls to multiple replicas for the same user request (Get). - * Created and put in the results array (we assume replica calls require results) when - * the replica calls are launched. See results for details of this process. - * POJO, all fields are public. 
To modify them, the object itself is locked. */ - private static class ReplicaResultState { - public ReplicaResultState(int callCount) { - this.callCount = callCount; - } - - /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ - int callCount; - /** Errors for which it is not decided whether we will report them to user. If one of the - * calls succeeds, we will discard the errors that may have happened in the other calls. */ - BatchErrors replicaErrors = null; - - @Override - public String toString() { - return "[call count " + callCount + "; errors " + replicaErrors + "]"; - } - } - - // TODO: many of the fields should be made private protected final long id; @@ -232,7 +192,7 @@ class AsyncProcess { protected final ConcurrentMap taskCounterPerServer = new ConcurrentHashMap(); // Start configuration settings. - private final int startLogErrorsCnt; + protected final int startLogErrorsCnt; /** * The number of tasks simultaneously executed on the cluster. @@ -262,13 +222,13 @@ class AsyncProcess { protected int timeout; protected long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ - private final boolean logBatchErrorDetails; + protected final boolean logBatchErrorDetails; // End configuration settings. - protected static class BatchErrors { - private final List throwables = new ArrayList(); - private final List actions = new ArrayList(); - private final List addresses = new ArrayList(); + static class BatchErrors { + final List throwables = new ArrayList(); + final List actions = new ArrayList(); + final List addresses = new ArrayList(); public synchronized void add(Throwable ex, Row row, ServerName serverName) { if (row == null){ @@ -284,7 +244,7 @@ class AsyncProcess { return !throwables.isEmpty(); } - private synchronized RetriesExhaustedWithDetailsException makeException(boolean logDetails) { + synchronized RetriesExhaustedWithDetailsException makeException(boolean logDetails) { if (logDetails) { LOG.error("Exception occurred! Exception details: " + throwables + ";\nActions: " + actions); @@ -305,6 +265,7 @@ class AsyncProcess { addresses.addAll(other.addresses); } } + public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory, int rpcTimeout) { @@ -383,7 +344,7 @@ class AsyncProcess { * @return pool if non null, otherwise returns this.pool if non null, otherwise throws * RuntimeException */ - private ExecutorService getPool(ExecutorService pool) { + protected ExecutorService getPool(ExecutorService pool) { if (pool != null) { return pool; } @@ -528,7 +489,7 @@ class AsyncProcess { int originalIndex = locationErrorRows.get(i); Row row = retainedActions.get(originalIndex).getAction(); ars.manageError(originalIndex, row, - Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); + AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); } } ars.sendMultiAction(actionsByServer, 1, null, false); @@ -538,12 +499,13 @@ class AsyncProcess { /** * Helper that is used when grouping the actions per region server. * - * @param loc - the destination. Must not be null. + * @param server - server + * @param regionName - regionName * @param action - the action to add to the multiaction * @param actionsByServer the multiaction per server * @param nonceGroup Nonce group. 
*/ - private static void addAction(ServerName server, byte[] regionName, Action action, + public static void addAction(ServerName server, byte[] regionName, Action action, Map> actionsByServer, long nonceGroup) { MultiAction multiAction = actionsByServer.get(server); if (multiAction == null) { @@ -603,1195 +565,13 @@ class AsyncProcess { action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. } - /** - * The context, and return value, for a single submit/submitAll call. - * Note on how this class (one AP submit) works. Initially, all requests are split into groups - * by server; request is sent to each server in parallel; the RPC calls are not async so a - * thread per server is used. Every time some actions fail, regions/locations might have - * changed, so we re-group them by server and region again and send these groups in parallel - * too. The result, in case of retries, is a "tree" of threads, with parent exiting after - * scheduling children. This is why lots of code doesn't require any synchronization. - */ - protected class AsyncRequestFutureImpl implements AsyncRequestFuture { - - /** - * Runnable (that can be submitted to thread pool) that waits for when it's time - * to issue replica calls, finds region replicas, groups the requests by replica and - * issues the calls (on separate threads, via sendMultiAction). - * This is done on a separate thread because we don't want to wait on user thread for - * our asynchronous call, and usually we have to wait before making replica calls. - */ - private final class ReplicaCallIssuingRunnable implements Runnable { - private final long startTime; - private final List> initialActions; - - public ReplicaCallIssuingRunnable(List> initialActions, long startTime) { - this.initialActions = initialActions; - this.startTime = startTime; - } - - @Override - public void run() { - boolean done = false; - if (primaryCallTimeoutMicroseconds > 0) { - try { - done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds); - } catch (InterruptedException ex) { - LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); - return; - } - } - if (done) return; // Done within primary timeout - Map> actionsByServer = - new HashMap>(); - List> unknownLocActions = new ArrayList>(); - if (replicaGetIndices == null) { - for (int i = 0; i < results.length; ++i) { - addReplicaActions(i, actionsByServer, unknownLocActions); - } - } else { - for (int replicaGetIndice : replicaGetIndices) { - addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); - } - } - if (!actionsByServer.isEmpty()) { - sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); - } - if (!unknownLocActions.isEmpty()) { - actionsByServer = new HashMap>(); - for (Action action : unknownLocActions) { - addReplicaActionsAgain(action, actionsByServer); - } - // Some actions may have completely failed, they are handled inside addAgain. - if (!actionsByServer.isEmpty()) { - sendMultiAction(actionsByServer, 1, null, true); - } - } - } - - /** - * Add replica actions to action map by server. - * @param index Index of the original action. - * @param actionsByServer The map by server to add it to. - */ - private void addReplicaActions(int index, Map> actionsByServer, - List> unknownReplicaActions) { - if (results[index] != null) return; // opportunistic. Never goes from non-null to null. 
- Action action = initialActions.get(index); - RegionLocations loc = findAllLocationsOrFail(action, true); - if (loc == null) return; - HRegionLocation[] locs = loc.getRegionLocations(); - if (locs.length == 1) { - LOG.warn("No replicas found for " + action.getAction()); - return; - } - synchronized (replicaResultLock) { - // Don't run replica calls if the original has finished. We could do it e.g. if - // original has already failed before first replica call (unlikely given retries), - // but that would require additional synchronization w.r.t. returning to caller. - if (results[index] != null) return; - // We set the number of calls here. After that any path must call setResult/setError. - // True even for replicas that are not found - if we refuse to send we MUST set error. - results[index] = new ReplicaResultState(locs.length); - } - for (int i = 1; i < locs.length; ++i) { - Action replicaAction = new Action(action, i); - if (locs[i] != null) { - addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), - replicaAction, actionsByServer, nonceGroup); - } else { - unknownReplicaActions.add(replicaAction); - } - } - } - - private void addReplicaActionsAgain( - Action action, Map> actionsByServer) { - if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { - throw new AssertionError("Cannot have default replica here"); - } - HRegionLocation loc = getReplicaLocationOrFail(action); - if (loc == null) return; - addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), - action, actionsByServer, nonceGroup); - } - } - - /** - * Runnable (that can be submitted to thread pool) that submits MultiAction to a - * single server. The server call is synchronous, therefore we do it on a thread pool. - */ - private final class SingleServerRequestRunnable implements Runnable { - private final MultiAction multiAction; - private final int numAttempt; - private final ServerName server; - private final Set callsInProgress; - private Long heapSize = null; - private SingleServerRequestRunnable( - MultiAction multiAction, int numAttempt, ServerName server, - Set callsInProgress) { - this.multiAction = multiAction; - this.numAttempt = numAttempt; - this.server = server; - this.callsInProgress = callsInProgress; - } - - @VisibleForTesting - long heapSize() { - if (heapSize != null) { - return heapSize; - } - heapSize = 0L; - for (Map.Entry>> e: this.multiAction.actions.entrySet()) { - List> actions = e.getValue(); - for (Action action: actions) { - Row row = action.getAction(); - if (row instanceof Mutation) { - heapSize += ((Mutation) row).heapSize(); - } - } - } - return heapSize; - } - - @Override - public void run() { - AbstractResponse res = null; - CancellableRegionServerCallable callable = currentCallable; - try { - // setup the callable based on the actions, if we don't have one already from the request - if (callable == null) { - callable = createCallable(server, tableName, multiAction); - } - RpcRetryingCaller caller = createCaller(callable); - try { - if (callsInProgress != null) { - callsInProgress.add(callable); - } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); - if (res == null) { - // Cancelled - return; - } - } catch (IOException e) { - // The service itself failed . It may be an error coming from the communication - // layer, but, as well, a functional error raised by the server. - receiveGlobalFailure(multiAction, server, numAttempt, e); - return; - } catch (Throwable t) { - // This should not happen. Let's log & retry anyway. 
- LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + - " Retrying. Server is " + server + ", tableName=" + tableName, t); - receiveGlobalFailure(multiAction, server, numAttempt, t); - return; - } - if (res.type() == AbstractResponse.ResponseType.MULTI) { - // Normal case: we received an answer from the server, and it's not an exception. - receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt); - } else { - if (results != null) { - SingleResponse singleResponse = (SingleResponse) res; - results[0] = singleResponse.getEntry(); - } - decActionCounter(1); - } - } catch (Throwable t) { - // Something really bad happened. We are on the send thread that will now die. - LOG.error("Internal AsyncProcess #" + id + " error for " - + tableName + " processing for " + server, t); - throw new RuntimeException(t); - } finally { - decTaskCounters(multiAction.getRegions(), server); - if (callsInProgress != null && callable != null && res != null) { - callsInProgress.remove(callable); - } - } - } - } - - private final Batch.Callback callback; - private final BatchErrors errors; - private final ConnectionImplementation.ServerErrorTracker errorsByServer; - private final ExecutorService pool; - private final Set callsInProgress; - - - private final TableName tableName; - private final AtomicLong actionsInProgress = new AtomicLong(-1); - /** - * The lock controls access to results. It is only held when populating results where - * there might be several callers (eventual consistency gets). For other requests, - * there's one unique call going on per result index. - */ - private final Object replicaResultLock = new Object(); - /** - * Result array. Null if results are not needed. Otherwise, each index corresponds to - * the action index in initial actions submitted. For most request types, has null-s for - * requests that are not done, and result/exception for those that are done. - * For eventual-consistency gets, initially the same applies; at some point, replica calls - * might be started, and ReplicaResultState is put at the corresponding indices. The - * returning calls check the type to detect when this is the case. After all calls are done, - * ReplicaResultState-s are replaced with results for the user. - */ - private final Object[] results; - /** - * Indices of replica gets in results. If null, all or no actions are replica-gets. - */ - private final int[] replicaGetIndices; - private final boolean hasAnyReplicaGets; - private final long nonceGroup; - private CancellableRegionServerCallable currentCallable; - private int currentCallTotalTimeout; - private final Map> heapSizesByServer = new HashMap<>(); - public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, CancellableRegionServerCallable callable, int timeout) { - this.pool = pool; - this.callback = callback; - this.nonceGroup = nonceGroup; - this.tableName = tableName; - this.actionsInProgress.set(actions.size()); - if (results != null) { - assert needResults; - if (results.length != actions.size()) { - throw new AssertionError("results.length"); - } - this.results = results; - for (int i = 0; i != this.results.length; ++i) { - results[i] = null; - } - } else { - this.results = needResults ? new Object[actions.size()] : null; - } - List replicaGetIndices = null; - boolean hasAnyReplicaGets = false; - if (needResults) { - // Check to see if any requests might require replica calls. 
- // We expect that many requests will consist of all or no multi-replica gets; in such - // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will - // store the list of action indexes for which replica gets are possible, and set - // hasAnyReplicaGets to true. - boolean hasAnyNonReplicaReqs = false; - int posInList = 0; - for (Action action : actions) { - boolean isReplicaGet = isReplicaGet(action.getAction()); - if (isReplicaGet) { - hasAnyReplicaGets = true; - if (hasAnyNonReplicaReqs) { // Mixed case - if (replicaGetIndices == null) { - replicaGetIndices = new ArrayList(actions.size() - 1); - } - replicaGetIndices.add(posInList); - } - } else if (!hasAnyNonReplicaReqs) { - // The first non-multi-replica request in the action list. - hasAnyNonReplicaReqs = true; - if (posInList > 0) { - // Add all the previous requests to the index lists. We know they are all - // replica-gets because this is the first non-multi-replica request in the list. - replicaGetIndices = new ArrayList(actions.size() - 1); - for (int i = 0; i < posInList; ++i) { - replicaGetIndices.add(i); - } - } - } - ++posInList; - } - } - this.hasAnyReplicaGets = hasAnyReplicaGets; - if (replicaGetIndices != null) { - this.replicaGetIndices = new int[replicaGetIndices.size()]; - int i = 0; - for (Integer el : replicaGetIndices) { - this.replicaGetIndices[i++] = el; - } - } else { - this.replicaGetIndices = null; - } - this.callsInProgress = !hasAnyReplicaGets ? null : - Collections.newSetFromMap( - new ConcurrentHashMap()); - - this.errorsByServer = createServerErrorTracker(); - this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); - this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; - } - - public Set getCallsInProgress() { - return callsInProgress; - } - @VisibleForTesting - Map> getRequestHeapSize() { - return heapSizesByServer; - } - - private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, - SingleServerRequestRunnable runnable) { - List heapCount = heapSizesByServer.get(server); - if (heapCount == null) { - heapCount = new LinkedList<>(); - heapSizesByServer.put(server, heapCount); - } - heapCount.add(runnable.heapSize()); - return runnable; - } - /** - * Group a list of actions per region servers, and send them. - * - * @param currentActions - the list of row to submit - * @param numAttempt - the current numAttempt (first attempt is 1) - */ - private void groupAndSendMultiAction(List> currentActions, int numAttempt) { - Map> actionsByServer = - new HashMap>(); - - boolean isReplica = false; - List> unknownReplicaActions = null; - for (Action action : currentActions) { - RegionLocations locs = findAllLocationsOrFail(action, true); - if (locs == null) continue; - boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); - if (isReplica && !isReplicaAction) { - // This is the property of the current implementation, not a requirement. 
- throw new AssertionError("Replica and non-replica actions in the same retry"); - } - isReplica = isReplicaAction; - HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); - if (loc == null || loc.getServerName() == null) { - if (isReplica) { - if (unknownReplicaActions == null) { - unknownReplicaActions = new ArrayList>(); - } - unknownReplicaActions.add(action); - } else { - // TODO: relies on primary location always being fetched - manageLocationError(action, null); - } - } else { - byte[] regionName = loc.getRegionInfo().getRegionName(); - addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); - } - } - boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); - boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty(); - - if (!actionsByServer.isEmpty()) { - // If this is a first attempt to group and send, no replicas, we need replica thread. - sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown) - ? currentActions : null, numAttempt > 1 && !hasUnknown); - } - - if (hasUnknown) { - actionsByServer = new HashMap>(); - for (Action action : unknownReplicaActions) { - HRegionLocation loc = getReplicaLocationOrFail(action); - if (loc == null) continue; - byte[] regionName = loc.getRegionInfo().getRegionName(); - addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); - } - if (!actionsByServer.isEmpty()) { - sendMultiAction( - actionsByServer, numAttempt, doStartReplica ? currentActions : null, true); - } - } - } - - private HRegionLocation getReplicaLocationOrFail(Action action) { - // We are going to try get location once again. For each action, we'll do it once - // from cache, because the previous calls in the loop might populate it. - int replicaId = action.getReplicaId(); - RegionLocations locs = findAllLocationsOrFail(action, true); - if (locs == null) return null; // manageError already called - HRegionLocation loc = locs.getRegionLocation(replicaId); - if (loc == null || loc.getServerName() == null) { - locs = findAllLocationsOrFail(action, false); - if (locs == null) return null; // manageError already called - loc = locs.getRegionLocation(replicaId); - } - if (loc == null || loc.getServerName() == null) { - manageLocationError(action, null); - return null; - } - return loc; - } - - private void manageLocationError(Action action, Exception ex) { - String msg = "Cannot get replica " + action.getReplicaId() - + " location for " + action.getAction(); - LOG.error(msg); - if (ex == null) { - ex = new IOException(msg); - } - manageError(action.getOriginalIndex(), action.getAction(), - Retry.NO_LOCATION_PROBLEM, ex, null); - } - - private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { - if (action.getAction() == null) throw new IllegalArgumentException("#" + id + - ", row cannot be null"); - RegionLocations loc = null; - try { - loc = connection.locateRegion( - tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); - } catch (IOException ex) { - manageLocationError(action, ex); - } - return loc; - } - - /** - * Send a multi action structure to the servers, after a delay depending on the attempt - * number. Asynchronous. - * - * @param actionsByServer the actions structured by regions - * @param numAttempt the attempt number. - * @param actionsForReplicaThread original actions for replica thread; null on non-first call. 
- */ - private void sendMultiAction(Map> actionsByServer, - int numAttempt, List> actionsForReplicaThread, boolean reuseThread) { - // Run the last item on the same thread if we are already on a send thread. - // We hope most of the time it will be the only item, so we can cut down on threads. - int actionsRemaining = actionsByServer.size(); - // This iteration is by server (the HRegionLocation comparator is by server portion only). - for (Map.Entry> e : actionsByServer.entrySet()) { - ServerName server = e.getKey(); - MultiAction multiAction = e.getValue(); - Collection runnables = getNewMultiActionRunnable(server, multiAction, - numAttempt); - // make sure we correctly count the number of runnables before we try to reuse the send - // thread, in case we had to split the request into different runnables because of backoff - if (runnables.size() > actionsRemaining) { - actionsRemaining = runnables.size(); - } - - // run all the runnables - for (Runnable runnable : runnables) { - if ((--actionsRemaining == 0) && reuseThread) { - runnable.run(); - } else { - try { - pool.submit(runnable); - } catch (Throwable t) { - if (t instanceof RejectedExecutionException) { - // This should never happen. But as the pool is provided by the end user, - // let's secure this a little. - LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + server.getServerName(), t); - } else { - // see #HBASE-14359 for more details - LOG.warn("Caught unexpected exception/error: ", t); - } - decTaskCounters(multiAction.getRegions(), server); - // We're likely to fail again, but this will increment the attempt counter, - // so it will finish. - receiveGlobalFailure(multiAction, server, numAttempt, t); - } - } - } - } - - if (actionsForReplicaThread != null) { - startWaitingForReplicaCalls(actionsForReplicaThread); - } - } - - private Collection getNewMultiActionRunnable(ServerName server, - MultiAction multiAction, - int numAttempt) { - // no stats to manage, just do the standard action - if (AsyncProcess.this.connection.getStatisticsTracker() == null) { - if (connection.getConnectionMetrics() != null) { - connection.getConnectionMetrics().incrNormalRunners(); - } - incTaskCounters(multiAction.getRegions(), server); - SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); - } - - // group the actions by the amount of delay - Map actions = new HashMap(multiAction - .size()); - - // split up the actions - for (Map.Entry>> e : multiAction.actions.entrySet()) { - Long backoff = getBackoff(server, e.getKey()); - DelayingRunner runner = actions.get(backoff); - if (runner == null) { - actions.put(backoff, new DelayingRunner(backoff, e)); - } else { - runner.add(e); - } - } - - List toReturn = new ArrayList(actions.size()); - for (DelayingRunner runner : actions.values()) { - incTaskCounters(runner.getActions().getRegions(), server); - String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); - // use a delay runner only if we need to sleep for some time - if (runner.getSleepTime() > 0) { - runner.setRunner(runnable); - traceText = "AsyncProcess.clientBackoff.sendMultiAction"; - runnable = runner; - if (connection.getConnectionMetrics() != null) { 
- connection.getConnectionMetrics().incrDelayRunners(); - connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime()); - } - } else { - if (connection.getConnectionMetrics() != null) { - connection.getConnectionMetrics().incrNormalRunners(); - } - } - runnable = Trace.wrap(traceText, runnable); - toReturn.add(runnable); - - } - return toReturn; - } - - /** - * @param server server location where the target region is hosted - * @param regionName name of the region which we are going to write some data - * @return the amount of time the client should wait until it submit a request to the - * specified server and region - */ - private Long getBackoff(ServerName server, byte[] regionName) { - ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker(); - ServerStatistics stats = tracker.getStats(server); - return AsyncProcess.this.connection.getBackoffPolicy() - .getBackoffTime(server, regionName, stats); - } - - /** - * Starts waiting to issue replica calls on a different thread; or issues them immediately. - */ - private void startWaitingForReplicaCalls(List> actionsForReplicaThread) { - long startTime = EnvironmentEdgeManager.currentTime(); - ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( - actionsForReplicaThread, startTime); - if (primaryCallTimeoutMicroseconds == 0) { - // Start replica calls immediately. - replicaRunnable.run(); - } else { - // Start the thread that may kick off replica gets. - // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea. - try { - pool.submit(replicaRunnable); - } catch (RejectedExecutionException ree) { - LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree); - } - } - } - - /** - * Check that we can retry acts accordingly: logs, set the error status. - * - * @param originalIndex the position in the list sent - * @param row the row - * @param canRetry if false, we won't retry whatever the settings. - * @param throwable the throwable, if any (can be null) - * @param server the location, if any (can be null) - * @return true if the action can be retried, false otherwise. - */ - public Retry manageError(int originalIndex, Row row, Retry canRetry, - Throwable throwable, ServerName server) { - if (canRetry == Retry.YES - && throwable != null && throwable instanceof DoNotRetryIOException) { - canRetry = Retry.NO_NOT_RETRIABLE; - } - - if (canRetry != Retry.YES) { - // Batch.Callback was not called on failure in 0.94. We keep this. - setError(originalIndex, row, throwable, server); - } else if (isActionComplete(originalIndex, row)) { - canRetry = Retry.NO_OTHER_SUCCEEDED; - } - return canRetry; - } - - /** - * Resubmit all the actions from this multiaction after a failure. - * - * @param rsActions the actions still to do from the initial list - * @param server the destination - * @param numAttempt the number of attempts so far - * @param t the throwable (if any) that caused the resubmit - */ - private void receiveGlobalFailure( - MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { - errorsByServer.reportServerError(server); - Retry canRetry = errorsByServer.canTryMore(numAttempt) - ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; - - if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) { - // tableName is null when we made a cross-table RPC call. 
- connection.clearCaches(server); - } - int failed = 0, stopped = 0; - List> toReplay = new ArrayList>(); - for (Map.Entry>> e : rsActions.actions.entrySet()) { - byte[] regionName = e.getKey(); - byte[] row = e.getValue().iterator().next().getAction().getRow(); - // Do not use the exception for updating cache because it might be coming from - // any of the regions in the MultiAction. - try { - if (tableName != null) { - connection.updateCachedLocations(tableName, regionName, row, - ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server); - } - } catch (Throwable ex) { - // That should never happen, but if it did, we want to make sure - // we still process errors - LOG.error("Couldn't update cached region locations: " + ex); - } - for (Action action : e.getValue()) { - Retry retry = manageError( - action.getOriginalIndex(), action.getAction(), canRetry, t, server); - if (retry == Retry.YES) { - toReplay.add(action); - } else if (retry == Retry.NO_OTHER_SUCCEEDED) { - ++stopped; - } else { - ++failed; - } - } - } - - if (toReplay.isEmpty()) { - logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); - } else { - resubmit(server, toReplay, numAttempt, rsActions.size(), t); - } - } - - /** - * Log as much info as possible, and, if there is something to replay, - * submit it again after a back off sleep. - */ - private void resubmit(ServerName oldServer, List> toReplay, - int numAttempt, int failureCount, Throwable throwable) { - // We have something to replay. We're going to sleep a little before. - - // We have two contradicting needs here: - // 1) We want to get the new location after having slept, as it may change. - // 2) We want to take into account the location when calculating the sleep time. - // 3) If all this is just because the response needed to be chunked try again FAST. - // It should be possible to have some heuristics to take the right decision. Short term, - // we go for one. - boolean retryImmediately = throwable instanceof RetryImmediatelyException; - int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; - long backOffTime = retryImmediately ? 0 : - errorsByServer.calculateBackoffTime(oldServer, pause); - if (numAttempt > startLogErrorsCnt) { - // We use this value to have some logs when we have multiple failures, but not too many - // logs, as errors are to be expected when a region moves, splits and so on - LOG.info(createLog(numAttempt, failureCount, toReplay.size(), - oldServer, throwable, backOffTime, true, null, -1, -1)); - } - - try { - if (backOffTime > 0) { - Thread.sleep(backOffTime); - } - } catch (InterruptedException e) { - LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); - Thread.currentThread().interrupt(); - return; - } - - groupAndSendMultiAction(toReplay, nextAttemptNumber); - } - - private void logNoResubmit(ServerName oldServer, int numAttempt, - int failureCount, Throwable throwable, int failed, int stopped) { - if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) { - String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); - String logMessage = createLog(numAttempt, failureCount, 0, oldServer, - throwable, -1, false, timeStr, failed, stopped); - if (failed != 0) { - // Only log final failures as warning - LOG.warn(logMessage); - } else { - LOG.info(logMessage); - } - } - } - - /** - * Called when we receive the result of a server query. - * - * @param multiAction - the multiAction we sent - * @param server - the location. 
It's used as a server name. - * @param responses - the response, if any - * @param numAttempt - the attempt - */ - private void receiveMultiAction(MultiAction multiAction, - ServerName server, MultiResponse responses, int numAttempt) { - assert responses != null; - - // Success or partial success - // Analyze detailed results. We can still have individual failures to be redo. - // two specific throwables are managed: - // - DoNotRetryIOException: we continue to retry for other actions - // - RegionMovedException: we update the cache with the new region location - - List> toReplay = new ArrayList>(); - Throwable throwable = null; - int failureCount = 0; - boolean canRetry = true; - - Map results = responses.getResults(); - updateStats(server, results); - - int failed = 0, stopped = 0; - // Go by original action. - for (Map.Entry>> regionEntry : multiAction.actions.entrySet()) { - byte[] regionName = regionEntry.getKey(); - Map regionResults = results.get(regionName) == null - ? null : results.get(regionName).result; - if (regionResults == null) { - if (!responses.getExceptions().containsKey(regionName)) { - LOG.error("Server sent us neither results nor exceptions for " - + Bytes.toStringBinary(regionName)); - responses.getExceptions().put(regionName, new RuntimeException("Invalid response")); - } - continue; - } - boolean regionFailureRegistered = false; - for (Action sentAction : regionEntry.getValue()) { - Object result = regionResults.get(sentAction.getOriginalIndex()); - // Failure: retry if it's make sense else update the errors lists - if (result == null || result instanceof Throwable) { - Row row = sentAction.getAction(); - throwable = ClientExceptionsUtil.findException(result); - // Register corresponding failures once per server/once per region. - if (!regionFailureRegistered) { - regionFailureRegistered = true; - try { - connection.updateCachedLocations( - tableName, regionName, row.getRow(), result, server); - } catch (Throwable ex) { - // That should never happen, but if it did, we want to make sure - // we still process errors - LOG.error("Couldn't update cached region locations: " + ex); - } - } - if (failureCount == 0) { - errorsByServer.reportServerError(server); - // We determine canRetry only once for all calls, after reporting server failure. - canRetry = errorsByServer.canTryMore(numAttempt); - } - ++failureCount; - Retry retry = manageError(sentAction.getOriginalIndex(), row, - canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); - if (retry == Retry.YES) { - toReplay.add(sentAction); - } else if (retry == Retry.NO_OTHER_SUCCEEDED) { - ++stopped; - } else { - ++failed; - } - } else { - if (callback != null) { - try { - //noinspection unchecked - // TODO: would callback expect a replica region name if it gets one? - this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); - } catch (Throwable t) { - LOG.error("User callback threw an exception for " - + Bytes.toStringBinary(regionName) + ", ignoring", t); - } - } - setResult(sentAction, result); - } - } - } - - // The failures global to a region. We will use for multiAction we sent previously to find the - // actions to replay. 
- for (Map.Entry throwableEntry : responses.getExceptions().entrySet()) { - throwable = throwableEntry.getValue(); - byte[] region = throwableEntry.getKey(); - List> actions = multiAction.actions.get(region); - if (actions == null || actions.isEmpty()) { - throw new IllegalStateException("Wrong response for the region: " + - HRegionInfo.encodeRegionName(region)); - } - - if (failureCount == 0) { - errorsByServer.reportServerError(server); - canRetry = errorsByServer.canTryMore(numAttempt); - } - if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { - // For multi-actions, we don't have a table name, but we want to make sure to clear the - // cache in case there were location-related exceptions. We don't to clear the cache - // for every possible exception that comes through, however. - connection.clearCaches(server); - } else { - try { - connection.updateCachedLocations( - tableName, region, actions.get(0).getAction().getRow(), throwable, server); - } catch (Throwable ex) { - // That should never happen, but if it did, we want to make sure - // we still process errors - LOG.error("Couldn't update cached region locations: " + ex); - } - } - failureCount += actions.size(); - - for (Action action : actions) { - Row row = action.getAction(); - Retry retry = manageError(action.getOriginalIndex(), row, - canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); - if (retry == Retry.YES) { - toReplay.add(action); - } else if (retry == Retry.NO_OTHER_SUCCEEDED) { - ++stopped; - } else { - ++failed; - } - } - } - if (toReplay.isEmpty()) { - logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); - } else { - resubmit(server, toReplay, numAttempt, failureCount, throwable); - } - } - - private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, - Throwable error, long backOffTime, boolean willRetry, String startTime, - int failed, int stopped) { - StringBuilder sb = new StringBuilder(); - sb.append("#").append(id).append(", table=").append(tableName).append(", ") - .append("attempt=").append(numAttempt) - .append("/").append(numTries).append(" "); - - if (failureCount > 0 || error != null){ - sb.append("failed=").append(failureCount).append("ops").append(", last exception: "). - append(error == null ? "null" : error); - } else { - sb.append("succeeded"); - } - - sb.append(" on ").append(sn).append(", tracking started ").append(startTime); - - if (willRetry) { - sb.append(", retrying after=").append(backOffTime).append("ms"). - append(", replay=").append(replaySize).append("ops"); - } else if (failureCount > 0) { - if (stopped > 0) { - sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); - } - if (failed > 0) { - sb.append("; not retrying ").append(failed).append(" - final failure"); - } - - } - - return sb.toString(); - } - - /** - * Sets the non-error result from a particular action. - * @param action Action (request) that the server responded to. - * @param result The result. - */ - private void setResult(Action action, Object result) { - if (result == null) { - throw new RuntimeException("Result cannot be null"); - } - ReplicaResultState state = null; - boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); - int index = action.getOriginalIndex(); - if (results == null) { - decActionCounter(index); - return; // Simple case, no replica requests. 
- } - state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); - if (state == null) { - return; // Simple case, no replica requests. - } - // At this point we know that state is set to replica tracking class. - // It could be that someone else is also looking at it; however, we know there can - // only be one state object, and only one thread can set callCount to 0. Other threads - // will either see state with callCount 0 after locking it; or will not see state at all - // we will replace it with the result. - synchronized (state) { - if (state.callCount == 0) { - return; // someone already set the result - } - state.callCount = 0; - } - synchronized (replicaResultLock) { - if (results[index] != state) { - throw new AssertionError("We set the callCount but someone else replaced the result"); - } - results[index] = result; - } - - decActionCounter(index); - } - - /** - * Sets the error from a particular action. - * @param index Original action index. - * @param row Original request. - * @param throwable The resulting error. - * @param server The source server. - */ - private void setError(int index, Row row, Throwable throwable, ServerName server) { - ReplicaResultState state = null; - if (results == null) { - // Note that we currently cannot have replica requests with null results. So it shouldn't - // happen that multiple replica calls will call dAC for same actions with results == null. - // Only one call per action should be present in this case. - errors.add(throwable, row, server); - decActionCounter(index); - return; // Simple case, no replica requests. - } - state = trySetResultSimple(index, row, true, throwable, server, false); - if (state == null) { - return; // Simple case, no replica requests. - } - BatchErrors target = null; // Error will be added to final errors, or temp replica errors. - boolean isActionDone = false; - synchronized (state) { - switch (state.callCount) { - case 0: return; // someone already set the result - case 1: { // All calls failed, we are the last error. - target = errors; - isActionDone = true; - break; - } - default: { - assert state.callCount > 1; - if (state.replicaErrors == null) { - state.replicaErrors = new BatchErrors(); - } - target = state.replicaErrors; - break; - } - } - --state.callCount; - } - target.add(throwable, row, server); - if (isActionDone) { - if (state.replicaErrors != null) { // last call, no need to lock - errors.merge(state.replicaErrors); - } - // See setResult for explanations. - synchronized (replicaResultLock) { - if (results[index] != state) { - throw new AssertionError("We set the callCount but someone else replaced the result"); - } - results[index] = throwable; - } - decActionCounter(index); - } - } - - /** - * Checks if the action is complete; used on error to prevent needless retries. - * Does not synchronize, assuming element index/field accesses are atomic. - * This is an opportunistic optimization check, doesn't have to be strict. - * @param index Original action index. - * @param row Original request. - */ - private boolean isActionComplete(int index, Row row) { - if (!isReplicaGet(row)) return false; - Object resObj = results[index]; - return (resObj != null) && (!(resObj instanceof ReplicaResultState) - || ((ReplicaResultState)resObj).callCount == 0); - } - - /** - * Tries to set the result or error for a particular action as if there were no replica calls. - * @return null if successful; replica state if there were in fact replica calls. 
- */ - private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, - Object result, ServerName server, boolean isFromReplica) { - Object resObj = null; - if (!isReplicaGet(row)) { - if (isFromReplica) { - throw new AssertionError("Unexpected stale result for " + row); - } - results[index] = result; - } else { - synchronized (replicaResultLock) { - resObj = results[index]; - if (resObj == null) { - if (isFromReplica) { - throw new AssertionError("Unexpected stale result for " + row); - } - results[index] = result; - } - } - } - - ReplicaResultState rrs = - (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; - if (rrs == null && isError) { - // The resObj is not replica state (null or already set). - errors.add((Throwable)result, row, server); - } - - if (resObj == null) { - // resObj is null - no replica calls were made. - decActionCounter(index); - return null; - } - return rrs; - } - - private void decActionCounter(int index) { - long actionsRemaining = actionsInProgress.decrementAndGet(); - if (actionsRemaining < 0) { - String error = buildDetailedErrorMsg("Incorrect actions in progress", index); - throw new AssertionError(error); - } else if (actionsRemaining == 0) { - synchronized (actionsInProgress) { - actionsInProgress.notifyAll(); - } - } - } - - private String buildDetailedErrorMsg(String string, int index) { - StringBuilder error = new StringBuilder(128); - error.append(string).append("; called for ").append(index).append(", actionsInProgress ") - .append(actionsInProgress.get()).append("; replica gets: "); - if (replicaGetIndices != null) { - for (int i = 0; i < replicaGetIndices.length; ++i) { - error.append(replicaGetIndices[i]).append(", "); - } - } else { - error.append(hasAnyReplicaGets ? "all" : "none"); - } - error.append("; results "); - if (results != null) { - for (int i = 0; i < results.length; ++i) { - Object o = results[i]; - error.append(((o == null) ? "null" : o.toString())).append(", "); - } - } - return error.toString(); - } - - @Override - public void waitUntilDone() throws InterruptedIOException { - try { - waitUntilDone(Long.MAX_VALUE); - } catch (InterruptedException iex) { - throw new InterruptedIOException(iex.getMessage()); - } finally { - if (callsInProgress != null) { - for (CancellableRegionServerCallable clb : callsInProgress) { - clb.cancel(); - } - } - } - } - - private boolean waitUntilDone(long cutoff) throws InterruptedException { - boolean hasWait = cutoff != Long.MAX_VALUE; - long lastLog = EnvironmentEdgeManager.currentTime(); - long currentInProgress; - while (0 != (currentInProgress = actionsInProgress.get())) { - long now = EnvironmentEdgeManager.currentTime(); - if (hasWait && (now * 1000L) > cutoff) { - return false; - } - if (!hasWait) { // Only log if wait is infinite. 
- if (now > lastLog + 10000) { - lastLog = now; - LOG.info("#" + id + ", waiting for " + currentInProgress - + " actions to finish on table: " + tableName); - if (currentInProgress <= thresholdToLogUndoneTaskDetails) { - logDetailsOfUndoneTasks(currentInProgress); - } - } - } - synchronized (actionsInProgress) { - if (actionsInProgress.get() == 0) break; - if (!hasWait) { - actionsInProgress.wait(10); - } else { - long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); - TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); - } - } - } - return true; - } - - @Override - public boolean hasError() { - return errors.hasErrors(); - } - - @Override - public List getFailedOperations() { - return errors.actions; - } - - @Override - public RetriesExhaustedWithDetailsException getErrors() { - return errors.makeException(logBatchErrorDetails); - } - - @Override - public Object[] getResults() throws InterruptedIOException { - waitUntilDone(); - return results; - } - } - - @VisibleForTesting - protected void updateStats(ServerName server, Map results) { - boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null; - boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null; - if (!stats && !metrics) { - return; - } - for (Map.Entry regionStats : results.entrySet()) { - byte[] regionName = regionStats.getKey(); - ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat(); - RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat); - ResultStatsUtil.updateStats(AsyncProcess.this.connection.getStatisticsTracker(), server, - regionName, regionLoadstats); - ResultStatsUtil.updateStats(AsyncProcess.this.connection.getConnectionMetrics(), - server, regionName, regionLoadstats); - } - } - protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, CancellableRegionServerCallable callable, int curTimeout) { return new AsyncRequestFutureImpl( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout); - } - - /** - * Create a callable. Isolated to be easily overridden in the tests. - */ - @VisibleForTesting - protected MultiServerCallable createCallable(final ServerName server, - TableName tableName, final MultiAction multi) { - return new MultiServerCallable(connection, tableName, server, this.rpcFactory, multi); - } - - /** - * Create a caller. Isolated to be easily overridden in the tests. - */ - @VisibleForTesting - protected RpcRetryingCaller createCaller( - CancellableRegionServerCallable callable) { - return rpcCallerFactory. newCaller(); + results, callback, callable, curTimeout, this); } @VisibleForTesting @@ -1839,7 +619,7 @@ class AsyncProcess { } } - private void logDetailsOfUndoneTasks(long taskInProgress) { + void logDetailsOfUndoneTasks(long taskInProgress) { ArrayList servers = new ArrayList(); for (Map.Entry entry : taskCounterPerServer.entrySet()) { if (entry.getValue().get() > 0) { @@ -1861,7 +641,7 @@ class AsyncProcess { /** * Only used w/useGlobalErrors ctor argument, for HTable backward compat. * @return Whether there were any errors in any request since the last time - * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created. + * {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created. 
*/ public boolean hasError() { return globalErrors.hasErrors(); @@ -1872,9 +652,9 @@ class AsyncProcess { * Waits for all previous operations to finish, and returns errors and (optionally) * failed operations themselves. * @param failedRows an optional list into which the rows that failed since the last time - * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. + * {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created, are saved. * @param tableName name of the table - * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} + * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, String)} * was called, or AP was created. */ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( @@ -1934,6 +714,16 @@ class AsyncProcess { } /** + * Create a caller. Isolated to be easily overridden in the tests. + */ + @VisibleForTesting + protected RpcRetryingCaller createCaller( + CancellableRegionServerCallable callable) { + return rpcCallerFactory. newCaller(); + } + + + /** * Creates the server error tracker to use inside process. * Currently, to preserve the main assumption about current retries, and to work well with * the retry-limit-based calculation, the calculation is local per Process object. @@ -1945,21 +735,10 @@ class AsyncProcess { this.serverTrackerTimeout, this.numTries); } - private static boolean isReplicaGet(Row row) { + public static boolean isReplicaGet(Row row) { return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE); } - /** - * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only - * used to make logging more clear, we don't actually care why we don't retry. - */ - private enum Retry { - YES, - NO_LOCATION_PROBLEM, - NO_NOT_RETRIABLE, - NO_RETRIES_EXHAUSTED, - NO_OTHER_SUCCEEDED - } /** * Collect all advices from checkers and make the final decision. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java new file mode 100644 index 0000000..539e234 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.io.InterruptedIOException; +import java.util.List; + +/** + * The context used to wait for results from one submit call. 
+ * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), + * then errors and failed operations in this object will reflect global errors. + * 2) If submit call is made with needResults false, results will not be saved. + * */ +@InterfaceAudience.Private +public interface AsyncRequestFuture { + public boolean hasError(); + public RetriesExhaustedWithDetailsException getErrors(); + public List getFailedOperations(); + public Object[] getResults() throws InterruptedIOException; + /** Wait until all tasks are executed, successfully or not. */ + public void waitUntilDone() throws InterruptedIOException; +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java new file mode 100644 index 0000000..7dfd19b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -0,0 +1,1289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.RetryImmediatelyException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.htrace.Trace; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * The context, and return value, for a single submit/submitAll call. 
+ * Note on how this class (one AP submit) works. Initially, all requests are split into groups + * by server; request is sent to each server in parallel; the RPC calls are not async so a + * thread per server is used. Every time some actions fail, regions/locations might have + * changed, so we re-group them by server and region again and send these groups in parallel + * too. The result, in case of retries, is a "tree" of threads, with parent exiting after + * scheduling children. This is why lots of code doesn't require any synchronization. + */ +@InterfaceAudience.Private +class AsyncRequestFutureImpl implements AsyncRequestFuture { + + private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class); + + /** + * Runnable (that can be submitted to thread pool) that waits for when it's time + * to issue replica calls, finds region replicas, groups the requests by replica and + * issues the calls (on separate threads, via sendMultiAction). + * This is done on a separate thread because we don't want to wait on user thread for + * our asynchronous call, and usually we have to wait before making replica calls. + */ + private final class ReplicaCallIssuingRunnable implements Runnable { + private final long startTime; + private final List> initialActions; + + public ReplicaCallIssuingRunnable(List> initialActions, long startTime) { + this.initialActions = initialActions; + this.startTime = startTime; + } + + @Override + public void run() { + boolean done = false; + if (asyncProcess.primaryCallTimeoutMicroseconds > 0) { + try { + done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds); + } catch (InterruptedException ex) { + LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); + return; + } + } + if (done) return; // Done within primary timeout + Map> actionsByServer = + new HashMap>(); + List> unknownLocActions = new ArrayList>(); + if (replicaGetIndices == null) { + for (int i = 0; i < results.length; ++i) { + addReplicaActions(i, actionsByServer, unknownLocActions); + } + } else { + for (int replicaGetIndice : replicaGetIndices) { + addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); + } + } + if (!actionsByServer.isEmpty()) { + sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); + } + if (!unknownLocActions.isEmpty()) { + actionsByServer = new HashMap>(); + for (Action action : unknownLocActions) { + addReplicaActionsAgain(action, actionsByServer); + } + // Some actions may have completely failed, they are handled inside addAgain. + if (!actionsByServer.isEmpty()) { + sendMultiAction(actionsByServer, 1, null, true); + } + } + } + + /** + * Add replica actions to action map by server. + * @param index Index of the original action. + * @param actionsByServer The map by server to add it to. + */ + private void addReplicaActions(int index, Map> actionsByServer, + List> unknownReplicaActions) { + if (results[index] != null) return; // opportunistic. Never goes from non-null to null. + Action action = initialActions.get(index); + RegionLocations loc = findAllLocationsOrFail(action, true); + if (loc == null) return; + HRegionLocation[] locs = loc.getRegionLocations(); + if (locs.length == 1) { + LOG.warn("No replicas found for " + action.getAction()); + return; + } + synchronized (replicaResultLock) { + // Don't run replica calls if the original has finished. We could do it e.g. 
if + // original has already failed before first replica call (unlikely given retries), + // but that would require additional synchronization w.r.t. returning to caller. + if (results[index] != null) return; + // We set the number of calls here. After that any path must call setResult/setError. + // True even for replicas that are not found - if we refuse to send we MUST set error. + results[index] = new ReplicaResultState(locs.length); + } + for (int i = 1; i < locs.length; ++i) { + Action replicaAction = new Action(action, i); + if (locs[i] != null) { + asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), + replicaAction, actionsByServer, nonceGroup); + } else { + unknownReplicaActions.add(replicaAction); + } + } + } + + private void addReplicaActionsAgain( + Action action, Map> actionsByServer) { + if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { + throw new AssertionError("Cannot have default replica here"); + } + HRegionLocation loc = getReplicaLocationOrFail(action); + if (loc == null) return; + asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), + action, actionsByServer, nonceGroup); + } + } + + /** + * Runnable (that can be submitted to thread pool) that submits MultiAction to a + * single server. The server call is synchronous, therefore we do it on a thread pool. + */ + private final class SingleServerRequestRunnable implements Runnable { + private final MultiAction multiAction; + private final int numAttempt; + private final ServerName server; + private final Set callsInProgress; + private Long heapSize = null; + private SingleServerRequestRunnable( + MultiAction multiAction, int numAttempt, ServerName server, + Set callsInProgress) { + this.multiAction = multiAction; + this.numAttempt = numAttempt; + this.server = server; + this.callsInProgress = callsInProgress; + } + + @VisibleForTesting + long heapSize() { + if (heapSize != null) { + return heapSize; + } + heapSize = 0L; + for (Map.Entry>> e: this.multiAction.actions.entrySet()) { + List> actions = e.getValue(); + for (Action action: actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + heapSize += ((Mutation) row).heapSize(); + } + } + } + return heapSize; + } + + @Override + public void run() { + AbstractResponse res = null; + CancellableRegionServerCallable callable = currentCallable; + try { + // setup the callable based on the actions, if we don't have one already from the request + if (callable == null) { + callable = createCallable(server, tableName, multiAction); + } + RpcRetryingCaller caller = asyncProcess.createCaller(callable); + try { + if (callsInProgress != null) { + callsInProgress.add(callable); + } + res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + if (res == null) { + // Cancelled + return; + } + } catch (IOException e) { + // The service itself failed . It may be an error coming from the communication + // layer, but, as well, a functional error raised by the server. + receiveGlobalFailure(multiAction, server, numAttempt, e); + return; + } catch (Throwable t) { + // This should not happen. Let's log & retry anyway. + LOG.error("#" + asyncProcess.id + ", Caught throwable while calling. This is unexpected." + + " Retrying. 
Server is " + server + ", tableName=" + tableName, t); + receiveGlobalFailure(multiAction, server, numAttempt, t); + return; + } + if (res.type() == AbstractResponse.ResponseType.MULTI) { + // Normal case: we received an answer from the server, and it's not an exception. + receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt); + } else { + if (results != null) { + SingleResponse singleResponse = (SingleResponse) res; + results[0] = singleResponse.getEntry(); + } + decActionCounter(1); + } + } catch (Throwable t) { + // Something really bad happened. We are on the send thread that will now die. + LOG.error("Internal AsyncProcess #" + asyncProcess.id + " error for " + + tableName + " processing for " + server, t); + throw new RuntimeException(t); + } finally { + asyncProcess.decTaskCounters(multiAction.getRegions(), server); + if (callsInProgress != null && callable != null && res != null) { + callsInProgress.remove(callable); + } + } + } + } + + private final Batch.Callback callback; + private final AsyncProcess.BatchErrors errors; + private final ConnectionImplementation.ServerErrorTracker errorsByServer; + private final ExecutorService pool; + private final Set callsInProgress; + + + private final TableName tableName; + private final AtomicLong actionsInProgress = new AtomicLong(-1); + /** + * The lock controls access to results. It is only held when populating results where + * there might be several callers (eventual consistency gets). For other requests, + * there's one unique call going on per result index. + */ + private final Object replicaResultLock = new Object(); + /** + * Result array. Null if results are not needed. Otherwise, each index corresponds to + * the action index in initial actions submitted. For most request types, has null-s for + * requests that are not done, and result/exception for those that are done. + * For eventual-consistency gets, initially the same applies; at some point, replica calls + * might be started, and ReplicaResultState is put at the corresponding indices. The + * returning calls check the type to detect when this is the case. After all calls are done, + * ReplicaResultState-s are replaced with results for the user. + */ + private final Object[] results; + /** + * Indices of replica gets in results. If null, all or no actions are replica-gets. + */ + private final int[] replicaGetIndices; + private final boolean hasAnyReplicaGets; + private final long nonceGroup; + private CancellableRegionServerCallable currentCallable; + private int currentCallTotalTimeout; + private final Map> heapSizesByServer = new HashMap<>(); + protected AsyncProcess asyncProcess; + + /** + * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only + * used to make logging more clear, we don't actually care why we don't retry. + */ + public enum Retry { + YES, + NO_LOCATION_PROBLEM, + NO_NOT_RETRIABLE, + NO_RETRIES_EXHAUSTED, + NO_OTHER_SUCCEEDED + } + + /** Sync point for calls to multiple replicas for the same user request (Get). + * Created and put in the results array (we assume replica calls require results) when + * the replica calls are launched. See results for details of this process. + * POJO, all fields are public. To modify them, the object itself is locked. */ + private static class ReplicaResultState { + public ReplicaResultState(int callCount) { + this.callCount = callCount; + } + + /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). 
*/ + int callCount; + /** Errors for which it is not decided whether we will report them to user. If one of the + * calls succeeds, we will discard the errors that may have happened in the other calls. */ + AsyncProcess.BatchErrors replicaErrors = null; + + @Override + public String toString() { + return "[call count " + callCount + "; errors " + replicaErrors + "]"; + } + } + + + + public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback callback, + CancellableRegionServerCallable callable, int timeout, + AsyncProcess asyncProcess) { + this.pool = pool; + this.callback = callback; + this.nonceGroup = nonceGroup; + this.tableName = tableName; + this.actionsInProgress.set(actions.size()); + if (results != null) { + assert needResults; + if (results.length != actions.size()) { + throw new AssertionError("results.length"); + } + this.results = results; + for (int i = 0; i != this.results.length; ++i) { + results[i] = null; + } + } else { + this.results = needResults ? new Object[actions.size()] : null; + } + List replicaGetIndices = null; + boolean hasAnyReplicaGets = false; + if (needResults) { + // Check to see if any requests might require replica calls. + // We expect that many requests will consist of all or no multi-replica gets; in such + // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will + // store the list of action indexes for which replica gets are possible, and set + // hasAnyReplicaGets to true. + boolean hasAnyNonReplicaReqs = false; + int posInList = 0; + for (Action action : actions) { + boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction()); + if (isReplicaGet) { + hasAnyReplicaGets = true; + if (hasAnyNonReplicaReqs) { // Mixed case + if (replicaGetIndices == null) { + replicaGetIndices = new ArrayList(actions.size() - 1); + } + replicaGetIndices.add(posInList); + } + } else if (!hasAnyNonReplicaReqs) { + // The first non-multi-replica request in the action list. + hasAnyNonReplicaReqs = true; + if (posInList > 0) { + // Add all the previous requests to the index lists. We know they are all + // replica-gets because this is the first non-multi-replica request in the list. + replicaGetIndices = new ArrayList(actions.size() - 1); + for (int i = 0; i < posInList; ++i) { + replicaGetIndices.add(i); + } + } + } + ++posInList; + } + } + this.hasAnyReplicaGets = hasAnyReplicaGets; + if (replicaGetIndices != null) { + this.replicaGetIndices = new int[replicaGetIndices.size()]; + int i = 0; + for (Integer el : replicaGetIndices) { + this.replicaGetIndices[i++] = el; + } + } else { + this.replicaGetIndices = null; + } + this.callsInProgress = !hasAnyReplicaGets ? null : + Collections.newSetFromMap( + new ConcurrentHashMap()); + this.asyncProcess = asyncProcess; + this.errorsByServer = createServerErrorTracker(); + this.errors = (asyncProcess.globalErrors != null) + ? 
asyncProcess.globalErrors : new AsyncProcess.BatchErrors(); + this.currentCallable = callable; + this.currentCallTotalTimeout = timeout; + + } + + public Set getCallsInProgress() { + return callsInProgress; + } + @VisibleForTesting + Map> getRequestHeapSize() { + return heapSizesByServer; + } + + private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, + SingleServerRequestRunnable runnable) { + List heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new LinkedList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(runnable.heapSize()); + return runnable; + } + /** + * Group a list of actions per region servers, and send them. + * + * @param currentActions - the list of row to submit + * @param numAttempt - the current numAttempt (first attempt is 1) + */ + void groupAndSendMultiAction(List> currentActions, int numAttempt) { + Map> actionsByServer = + new HashMap>(); + + boolean isReplica = false; + List> unknownReplicaActions = null; + for (Action action : currentActions) { + RegionLocations locs = findAllLocationsOrFail(action, true); + if (locs == null) continue; + boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); + if (isReplica && !isReplicaAction) { + // This is the property of the current implementation, not a requirement. + throw new AssertionError("Replica and non-replica actions in the same retry"); + } + isReplica = isReplicaAction; + HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); + if (loc == null || loc.getServerName() == null) { + if (isReplica) { + if (unknownReplicaActions == null) { + unknownReplicaActions = new ArrayList>(); + } + unknownReplicaActions.add(action); + } else { + // TODO: relies on primary location always being fetched + manageLocationError(action, null); + } + } else { + byte[] regionName = loc.getRegionInfo().getRegionName(); + AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); + } + } + boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); + boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty(); + + if (!actionsByServer.isEmpty()) { + // If this is a first attempt to group and send, no replicas, we need replica thread. + sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown) + ? currentActions : null, numAttempt > 1 && !hasUnknown); + } + + if (hasUnknown) { + actionsByServer = new HashMap>(); + for (Action action : unknownReplicaActions) { + HRegionLocation loc = getReplicaLocationOrFail(action); + if (loc == null) continue; + byte[] regionName = loc.getRegionInfo().getRegionName(); + AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); + } + if (!actionsByServer.isEmpty()) { + sendMultiAction( + actionsByServer, numAttempt, doStartReplica ? currentActions : null, true); + } + } + } + + private HRegionLocation getReplicaLocationOrFail(Action action) { + // We are going to try get location once again. For each action, we'll do it once + // from cache, because the previous calls in the loop might populate it. 
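
groupAndSendMultiAction above re-resolves a location for every action on every attempt, because regions may have moved between retries, and then buckets actions by hosting server. A simplified, hedged sketch of that bucketing step; the real code builds MultiAction objects keyed by region name and routes them through AsyncProcess.addAction:

    // Group rows by the server currently hosting their region (cached lookup).
    static Map<ServerName, List<Row>> groupByServer(RegionLocator locator, List<Row> rows)
        throws IOException {
      Map<ServerName, List<Row>> actionsByServer = new HashMap<ServerName, List<Row>>();
      for (Row row : rows) {
        HRegionLocation loc = locator.getRegionLocation(row.getRow());
        List<Row> perServer = actionsByServer.get(loc.getServerName());
        if (perServer == null) {
          perServer = new ArrayList<Row>();
          actionsByServer.put(loc.getServerName(), perServer);
        }
        perServer.add(row);
      }
      return actionsByServer;
    }
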
+ int replicaId = action.getReplicaId(); + RegionLocations locs = findAllLocationsOrFail(action, true); + if (locs == null) return null; // manageError already called + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null || loc.getServerName() == null) { + locs = findAllLocationsOrFail(action, false); + if (locs == null) return null; // manageError already called + loc = locs.getRegionLocation(replicaId); + } + if (loc == null || loc.getServerName() == null) { + manageLocationError(action, null); + return null; + } + return loc; + } + + private void manageLocationError(Action action, Exception ex) { + String msg = "Cannot get replica " + action.getReplicaId() + + " location for " + action.getAction(); + LOG.error(msg); + if (ex == null) { + ex = new IOException(msg); + } + manageError(action.getOriginalIndex(), action.getAction(), + Retry.NO_LOCATION_PROBLEM, ex, null); + } + + private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { + if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id + + ", row cannot be null"); + RegionLocations loc = null; + try { + loc = asyncProcess.connection.locateRegion( + tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); + } catch (IOException ex) { + manageLocationError(action, ex); + } + return loc; + } + + /** + * Send a multi action structure to the servers, after a delay depending on the attempt + * number. Asynchronous. + * + * @param actionsByServer the actions structured by regions + * @param numAttempt the attempt number. + * @param actionsForReplicaThread original actions for replica thread; null on non-first call. + */ + void sendMultiAction(Map> actionsByServer, + int numAttempt, List> actionsForReplicaThread, boolean reuseThread) { + // Run the last item on the same thread if we are already on a send thread. + // We hope most of the time it will be the only item, so we can cut down on threads. + int actionsRemaining = actionsByServer.size(); + // This iteration is by server (the HRegionLocation comparator is by server portion only). + for (Map.Entry> e : actionsByServer.entrySet()) { + ServerName server = e.getKey(); + MultiAction multiAction = e.getValue(); + Collection runnables = getNewMultiActionRunnable(server, multiAction, + numAttempt); + // make sure we correctly count the number of runnables before we try to reuse the send + // thread, in case we had to split the request into different runnables because of backoff + if (runnables.size() > actionsRemaining) { + actionsRemaining = runnables.size(); + } + + // run all the runnables + for (Runnable runnable : runnables) { + if ((--actionsRemaining == 0) && reuseThread) { + runnable.run(); + } else { + try { + pool.submit(runnable); + } catch (Throwable t) { + if (t instanceof RejectedExecutionException) { + // This should never happen. But as the pool is provided by the end user, + // let's secure this a little. + LOG.warn("#" + asyncProcess.id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + server.getServerName(), t); + } else { + // see #HBASE-14359 for more details + LOG.warn("Caught unexpected exception/error: ", t); + } + asyncProcess.decTaskCounters(multiAction.getRegions(), server); + // We're likely to fail again, but this will increment the attempt counter, + // so it will finish. 
+ receiveGlobalFailure(multiAction, server, numAttempt, t); + } + } + } + } + + if (actionsForReplicaThread != null) { + startWaitingForReplicaCalls(actionsForReplicaThread); + } + } + + private Collection getNewMultiActionRunnable(ServerName server, + MultiAction multiAction, + int numAttempt) { + // no stats to manage, just do the standard action + if (asyncProcess.connection.getStatisticsTracker() == null) { + if (asyncProcess.connection.getConnectionMetrics() != null) { + asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); + } + asyncProcess.incTaskCounters(multiAction.getRegions(), server); + SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); + } + + // group the actions by the amount of delay + Map actions = new HashMap(multiAction + .size()); + + // split up the actions + for (Map.Entry>> e : multiAction.actions.entrySet()) { + Long backoff = getBackoff(server, e.getKey()); + DelayingRunner runner = actions.get(backoff); + if (runner == null) { + actions.put(backoff, new DelayingRunner(backoff, e)); + } else { + runner.add(e); + } + } + + List toReturn = new ArrayList(actions.size()); + for (DelayingRunner runner : actions.values()) { + asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); + String traceText = "AsyncProcess.sendMultiAction"; + Runnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); + // use a delay runner only if we need to sleep for some time + if (runner.getSleepTime() > 0) { + runner.setRunner(runnable); + traceText = "AsyncProcess.clientBackoff.sendMultiAction"; + runnable = runner; + if (asyncProcess.connection.getConnectionMetrics() != null) { + asyncProcess.connection.getConnectionMetrics().incrDelayRunners(); + asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime()); + } + } else { + if (asyncProcess.connection.getConnectionMetrics() != null) { + asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); + } + } + runnable = Trace.wrap(traceText, runnable); + toReturn.add(runnable); + + } + return toReturn; + } + + /** + * @param server server location where the target region is hosted + * @param regionName name of the region which we are going to write some data + * @return the amount of time the client should wait until it submit a request to the + * specified server and region + */ + private Long getBackoff(ServerName server, byte[] regionName) { + ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker(); + ServerStatistics stats = tracker.getStats(server); + return asyncProcess.connection.getBackoffPolicy() + .getBackoffTime(server, regionName, stats); + } + + /** + * Starts waiting to issue replica calls on a different thread; or issues them immediately. + */ + private void startWaitingForReplicaCalls(List> actionsForReplicaThread) { + long startTime = EnvironmentEdgeManager.currentTime(); + ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( + actionsForReplicaThread, startTime); + if (asyncProcess.primaryCallTimeoutMicroseconds == 0) { + // Start replica calls immediately. + replicaRunnable.run(); + } else { + // Start the thread that may kick off replica gets. 
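
The dispatch loop in sendMultiAction above saves one pool submission per round by running the last runnable inline when the caller is already on a send thread. The bare idiom, detached from the MultiAction and tracing plumbing (method name invented):

    // Submit all but the last task; run the last one on the calling thread when allowed.
    static void dispatch(ExecutorService pool, List<Runnable> tasks, boolean reuseCurrentThread) {
      int remaining = tasks.size();
      for (Runnable task : tasks) {
        if (--remaining == 0 && reuseCurrentThread) {
          task.run();               // the "tree of threads" grows one level less deep
        } else {
          pool.submit(task);
        }
      }
    }
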
+ // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea. + try { + pool.submit(replicaRunnable); + } catch (RejectedExecutionException ree) { + LOG.warn("#" + asyncProcess.id + ", replica task was rejected by the pool - no replica calls", ree); + } + } + } + + /** + * Check that we can retry acts accordingly: logs, set the error status. + * + * @param originalIndex the position in the list sent + * @param row the row + * @param canRetry if false, we won't retry whatever the settings. + * @param throwable the throwable, if any (can be null) + * @param server the location, if any (can be null) + * @return true if the action can be retried, false otherwise. + */ + public Retry manageError(int originalIndex, Row row, Retry canRetry, + Throwable throwable, ServerName server) { + if (canRetry == Retry.YES + && throwable != null && throwable instanceof DoNotRetryIOException) { + canRetry = Retry.NO_NOT_RETRIABLE; + } + + if (canRetry != Retry.YES) { + // Batch.Callback was not called on failure in 0.94. We keep this. + setError(originalIndex, row, throwable, server); + } else if (isActionComplete(originalIndex, row)) { + canRetry = Retry.NO_OTHER_SUCCEEDED; + } + return canRetry; + } + + /** + * Resubmit all the actions from this multiaction after a failure. + * + * @param rsActions the actions still to do from the initial list + * @param server the destination + * @param numAttempt the number of attempts so far + * @param t the throwable (if any) that caused the resubmit + */ + private void receiveGlobalFailure( + MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { + errorsByServer.reportServerError(server); + Retry canRetry = errorsByServer.canTryMore(numAttempt) + ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; + + if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) { + // tableName is null when we made a cross-table RPC call. + asyncProcess.connection.clearCaches(server); + } + int failed = 0, stopped = 0; + List> toReplay = new ArrayList>(); + for (Map.Entry>> e : rsActions.actions.entrySet()) { + byte[] regionName = e.getKey(); + byte[] row = e.getValue().iterator().next().getAction().getRow(); + // Do not use the exception for updating cache because it might be coming from + // any of the regions in the MultiAction. + try { + if (tableName != null) { + asyncProcess.connection.updateCachedLocations(tableName, regionName, row, + ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server); + } + } catch (Throwable ex) { + // That should never happen, but if it did, we want to make sure + // we still process errors + LOG.error("Couldn't update cached region locations: " + ex); + } + for (Action action : e.getValue()) { + Retry retry = manageError( + action.getOriginalIndex(), action.getAction(), canRetry, t, server); + if (retry == Retry.YES) { + toReplay.add(action); + } else if (retry == Retry.NO_OTHER_SUCCEEDED) { + ++stopped; + } else { + ++failed; + } + } + } + + if (toReplay.isEmpty()) { + logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); + } else { + resubmit(server, toReplay, numAttempt, rsActions.size(), t); + } + } + + /** + * Log as much info as possible, and, if there is something to replay, + * submit it again after a back off sleep. + */ + private void resubmit(ServerName oldServer, List> toReplay, + int numAttempt, int failureCount, Throwable throwable) { + // We have something to replay. We're going to sleep a little before. 
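
manageError above reduces every failure to a Retry verdict before anything is resubmitted. A hedged, stand-alone restatement of that decision (helper name invented; it assumes the same package, since AsyncRequestFutureImpl is package-private, and the real method also records terminal errors via setError):

    static AsyncRequestFutureImpl.Retry classify(Throwable t, boolean canTryMore,
        boolean actionAlreadyComplete) {
      // Exhausted retries are terminal from the start.
      AsyncRequestFutureImpl.Retry verdict = canTryMore
          ? AsyncRequestFutureImpl.Retry.YES : AsyncRequestFutureImpl.Retry.NO_RETRIES_EXHAUSTED;
      // DoNotRetryIOException always stops retries, whatever the attempt count.
      if (verdict == AsyncRequestFutureImpl.Retry.YES && t instanceof DoNotRetryIOException) {
        verdict = AsyncRequestFutureImpl.Retry.NO_NOT_RETRIABLE;
      }
      // An action that already has a result (replica race) is not worth retrying.
      if (verdict == AsyncRequestFutureImpl.Retry.YES && actionAlreadyComplete) {
        verdict = AsyncRequestFutureImpl.Retry.NO_OTHER_SUCCEEDED;
      }
      return verdict;
    }
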
+ + // We have two contradicting needs here: + // 1) We want to get the new location after having slept, as it may change. + // 2) We want to take into account the location when calculating the sleep time. + // 3) If all this is just because the response needed to be chunked try again FAST. + // It should be possible to have some heuristics to take the right decision. Short term, + // we go for one. + boolean retryImmediately = throwable instanceof RetryImmediatelyException; + int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; + long backOffTime = retryImmediately ? 0 : + errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); + if (numAttempt > asyncProcess.startLogErrorsCnt) { + // We use this value to have some logs when we have multiple failures, but not too many + // logs, as errors are to be expected when a region moves, splits and so on + LOG.info(createLog(numAttempt, failureCount, toReplay.size(), + oldServer, throwable, backOffTime, true, null, -1, -1)); + } + + try { + if (backOffTime > 0) { + Thread.sleep(backOffTime); + } + } catch (InterruptedException e) { + LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); + Thread.currentThread().interrupt(); + return; + } + + groupAndSendMultiAction(toReplay, nextAttemptNumber); + } + + private void logNoResubmit(ServerName oldServer, int numAttempt, + int failureCount, Throwable throwable, int failed, int stopped) { + if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) { + String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); + String logMessage = createLog(numAttempt, failureCount, 0, oldServer, + throwable, -1, false, timeStr, failed, stopped); + if (failed != 0) { + // Only log final failures as warning + LOG.warn(logMessage); + } else { + LOG.info(logMessage); + } + } + } + + /** + * Called when we receive the result of a server query. + * + * @param multiAction - the multiAction we sent + * @param server - the location. It's used as a server name. + * @param responses - the response, if any + * @param numAttempt - the attempt + */ + private void receiveMultiAction(MultiAction multiAction, + ServerName server, MultiResponse responses, int numAttempt) { + assert responses != null; + + // Success or partial success + // Analyze detailed results. We can still have individual failures to be redo. + // two specific throwables are managed: + // - DoNotRetryIOException: we continue to retry for other actions + // - RegionMovedException: we update the cache with the new region location + + List> toReplay = new ArrayList>(); + Throwable throwable = null; + int failureCount = 0; + boolean canRetry = true; + + Map results = responses.getResults(); + updateStats(server, results); + + int failed = 0, stopped = 0; + // Go by original action. + for (Map.Entry>> regionEntry : multiAction.actions.entrySet()) { + byte[] regionName = regionEntry.getKey(); + Map regionResults = results.get(regionName) == null + ? 
null : results.get(regionName).result; + if (regionResults == null) { + if (!responses.getExceptions().containsKey(regionName)) { + LOG.error("Server sent us neither results nor exceptions for " + + Bytes.toStringBinary(regionName)); + responses.getExceptions().put(regionName, new RuntimeException("Invalid response")); + } + continue; + } + boolean regionFailureRegistered = false; + for (Action sentAction : regionEntry.getValue()) { + Object result = regionResults.get(sentAction.getOriginalIndex()); + // Failure: retry if it's make sense else update the errors lists + if (result == null || result instanceof Throwable) { + Row row = sentAction.getAction(); + throwable = ClientExceptionsUtil.findException(result); + // Register corresponding failures once per server/once per region. + if (!regionFailureRegistered) { + regionFailureRegistered = true; + try { + asyncProcess.connection.updateCachedLocations( + tableName, regionName, row.getRow(), result, server); + } catch (Throwable ex) { + // That should never happen, but if it did, we want to make sure + // we still process errors + LOG.error("Couldn't update cached region locations: " + ex); + } + } + if (failureCount == 0) { + errorsByServer.reportServerError(server); + // We determine canRetry only once for all calls, after reporting server failure. + canRetry = errorsByServer.canTryMore(numAttempt); + } + ++failureCount; + Retry retry = manageError(sentAction.getOriginalIndex(), row, + canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server); + if (retry == Retry.YES) { + toReplay.add(sentAction); + } else if (retry == Retry.NO_OTHER_SUCCEEDED) { + ++stopped; + } else { + ++failed; + } + } else { + if (callback != null) { + try { + //noinspection unchecked + // TODO: would callback expect a replica region name if it gets one? + this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result); + } catch (Throwable t) { + LOG.error("User callback threw an exception for " + + Bytes.toStringBinary(regionName) + ", ignoring", t); + } + } + setResult(sentAction, result); + } + } + } + + // The failures global to a region. We will use for multiAction we sent previously to find the + // actions to replay. + for (Map.Entry throwableEntry : responses.getExceptions().entrySet()) { + throwable = throwableEntry.getValue(); + byte[] region = throwableEntry.getKey(); + List> actions = multiAction.actions.get(region); + if (actions == null || actions.isEmpty()) { + throw new IllegalStateException("Wrong response for the region: " + + HRegionInfo.encodeRegionName(region)); + } + + if (failureCount == 0) { + errorsByServer.reportServerError(server); + canRetry = errorsByServer.canTryMore(numAttempt); + } + if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) { + // For multi-actions, we don't have a table name, but we want to make sure to clear the + // cache in case there were location-related exceptions. We don't to clear the cache + // for every possible exception that comes through, however. 
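
Successful per-action results are also pushed through the optional Batch.Callback, as shown above, before being stored. A hypothetical callback a caller might pass in (type parameter and counter are illustrative):

    // update() is invoked once for every action that succeeded.
    final AtomicLong successes = new AtomicLong();
    Batch.Callback<Object> countingCallback = new Batch.Callback<Object>() {
      @Override
      public void update(byte[] region, byte[] row, Object result) {
        successes.incrementAndGet();
      }
    };
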
+ asyncProcess.connection.clearCaches(server); + } else { + try { + asyncProcess.connection.updateCachedLocations( + tableName, region, actions.get(0).getAction().getRow(), throwable, server); + } catch (Throwable ex) { + // That should never happen, but if it did, we want to make sure + // we still process errors + LOG.error("Couldn't update cached region locations: " + ex); + } + } + failureCount += actions.size(); + + for (Action action : actions) { + Row row = action.getAction(); + Retry retry = manageError(action.getOriginalIndex(), row, + canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); + if (retry == Retry.YES) { + toReplay.add(action); + } else if (retry == Retry.NO_OTHER_SUCCEEDED) { + ++stopped; + } else { + ++failed; + } + } + } + if (toReplay.isEmpty()) { + logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped); + } else { + resubmit(server, toReplay, numAttempt, failureCount, throwable); + } + } + + @VisibleForTesting + protected void updateStats(ServerName server, Map results) { + boolean metrics = asyncProcess.connection.getConnectionMetrics() != null; + boolean stats = asyncProcess.connection.getStatisticsTracker() != null; + if (!stats && !metrics) { + return; + } + for (Map.Entry regionStats : results.entrySet()) { + byte[] regionName = regionStats.getKey(); + ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat(); + RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat); + ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server, + regionName, regionLoadstats); + ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(), + server, regionName, regionLoadstats); + } + } + + + private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, + Throwable error, long backOffTime, boolean willRetry, String startTime, + int failed, int stopped) { + StringBuilder sb = new StringBuilder(); + sb.append("#").append(asyncProcess.id).append(", table=").append(tableName).append(", ") + .append("attempt=").append(numAttempt) + .append("/").append(asyncProcess.numTries).append(" "); + + if (failureCount > 0 || error != null){ + sb.append("failed=").append(failureCount).append("ops").append(", last exception: "). + append(error == null ? "null" : error); + } else { + sb.append("succeeded"); + } + + sb.append(" on ").append(sn).append(", tracking started ").append(startTime); + + if (willRetry) { + sb.append(", retrying after=").append(backOffTime).append("ms"). + append(", replay=").append(replaySize).append("ops"); + } else if (failureCount > 0) { + if (stopped > 0) { + sb.append("; not retrying ").append(stopped).append(" due to success from other replica"); + } + if (failed > 0) { + sb.append("; not retrying ").append(failed).append(" - final failure"); + } + + } + + return sb.toString(); + } + + /** + * Sets the non-error result from a particular action. + * @param action Action (request) that the server responded to. + * @param result The result. + */ + private void setResult(Action action, Object result) { + if (result == null) { + throw new RuntimeException("Result cannot be null"); + } + ReplicaResultState state = null; + boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); + int index = action.getOriginalIndex(); + if (results == null) { + decActionCounter(index); + return; // Simple case, no replica requests. 
+ } + state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); + if (state == null) { + return; // Simple case, no replica requests. + } + // At this point we know that state is set to replica tracking class. + // It could be that someone else is also looking at it; however, we know there can + // only be one state object, and only one thread can set callCount to 0. Other threads + // will either see state with callCount 0 after locking it; or will not see state at all + // we will replace it with the result. + synchronized (state) { + if (state.callCount == 0) { + return; // someone already set the result + } + state.callCount = 0; + } + synchronized (replicaResultLock) { + if (results[index] != state) { + throw new AssertionError("We set the callCount but someone else replaced the result"); + } + results[index] = result; + } + + decActionCounter(index); + } + + /** + * Sets the error from a particular action. + * @param index Original action index. + * @param row Original request. + * @param throwable The resulting error. + * @param server The source server. + */ + private void setError(int index, Row row, Throwable throwable, ServerName server) { + ReplicaResultState state = null; + if (results == null) { + // Note that we currently cannot have replica requests with null results. So it shouldn't + // happen that multiple replica calls will call dAC for same actions with results == null. + // Only one call per action should be present in this case. + errors.add(throwable, row, server); + decActionCounter(index); + return; // Simple case, no replica requests. + } + state = trySetResultSimple(index, row, true, throwable, server, false); + if (state == null) { + return; // Simple case, no replica requests. + } + AsyncProcess.BatchErrors target = null; // Error will be added to final errors, or temp replica errors. + boolean isActionDone = false; + synchronized (state) { + switch (state.callCount) { + case 0: return; // someone already set the result + case 1: { // All calls failed, we are the last error. + target = errors; + isActionDone = true; + break; + } + default: { + assert state.callCount > 1; + if (state.replicaErrors == null) { + state.replicaErrors = new AsyncProcess.BatchErrors(); + } + target = state.replicaErrors; + break; + } + } + --state.callCount; + } + target.add(throwable, row, server); + if (isActionDone) { + if (state.replicaErrors != null) { // last call, no need to lock + errors.merge(state.replicaErrors); + } + // See setResult for explanations. + synchronized (replicaResultLock) { + if (results[index] != state) { + throw new AssertionError("We set the callCount but someone else replaced the result"); + } + results[index] = throwable; + } + decActionCounter(index); + } + } + + /** + * Checks if the action is complete; used on error to prevent needless retries. + * Does not synchronize, assuming element index/field accesses are atomic. + * This is an opportunistic optimization check, doesn't have to be strict. + * @param index Original action index. + * @param row Original request. + */ + private boolean isActionComplete(int index, Row row) { + if (!AsyncProcess.isReplicaGet(row)) return false; + Object resObj = results[index]; + return (resObj != null) && (!(resObj instanceof ReplicaResultState) + || ((ReplicaResultState)resObj).callCount == 0); + } + + /** + * Tries to set the result or error for a particular action as if there were no replica calls. + * @return null if successful; replica state if there were in fact replica calls. 
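
setResult and setError above both funnel replica responses through the same rule: the call that takes callCount to zero publishes its outcome, and everything arriving afterwards is dropped (errors decrement the count instead and are only reported if every replica call fails). A stripped-down sketch of the success path, with ReplicaState standing in for the private ReplicaResultState:

    static final class ReplicaState {             // illustrative stand-in
      int callCount;
      ReplicaState(int callCount) { this.callCount = callCount; }
    }

    // Returns true for exactly one replica call per request; that caller stores the result.
    static boolean claimResult(ReplicaState state) {
      synchronized (state) {
        if (state.callCount == 0) {
          return false;                            // another replica call already finished
        }
        state.callCount = 0;
        return true;
      }
    }
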
+ */ + private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, + Object result, ServerName server, boolean isFromReplica) { + Object resObj = null; + if (!AsyncProcess.isReplicaGet(row)) { + if (isFromReplica) { + throw new AssertionError("Unexpected stale result for " + row); + } + results[index] = result; + } else { + synchronized (replicaResultLock) { + resObj = results[index]; + if (resObj == null) { + if (isFromReplica) { + throw new AssertionError("Unexpected stale result for " + row); + } + results[index] = result; + } + } + } + + ReplicaResultState rrs = + (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; + if (rrs == null && isError) { + // The resObj is not replica state (null or already set). + errors.add((Throwable)result, row, server); + } + + if (resObj == null) { + // resObj is null - no replica calls were made. + decActionCounter(index); + return null; + } + return rrs; + } + + private void decActionCounter(int index) { + long actionsRemaining = actionsInProgress.decrementAndGet(); + if (actionsRemaining < 0) { + String error = buildDetailedErrorMsg("Incorrect actions in progress", index); + throw new AssertionError(error); + } else if (actionsRemaining == 0) { + synchronized (actionsInProgress) { + actionsInProgress.notifyAll(); + } + } + } + + private String buildDetailedErrorMsg(String string, int index) { + StringBuilder error = new StringBuilder(128); + error.append(string).append("; called for ").append(index).append(", actionsInProgress ") + .append(actionsInProgress.get()).append("; replica gets: "); + if (replicaGetIndices != null) { + for (int i = 0; i < replicaGetIndices.length; ++i) { + error.append(replicaGetIndices[i]).append(", "); + } + } else { + error.append(hasAnyReplicaGets ? "all" : "none"); + } + error.append("; results "); + if (results != null) { + for (int i = 0; i < results.length; ++i) { + Object o = results[i]; + error.append(((o == null) ? "null" : o.toString())).append(", "); + } + } + return error.toString(); + } + + @Override + public void waitUntilDone() throws InterruptedIOException { + try { + waitUntilDone(Long.MAX_VALUE); + } catch (InterruptedException iex) { + throw new InterruptedIOException(iex.getMessage()); + } finally { + if (callsInProgress != null) { + for (CancellableRegionServerCallable clb : callsInProgress) { + clb.cancel(); + } + } + } + } + + private boolean waitUntilDone(long cutoff) throws InterruptedException { + boolean hasWait = cutoff != Long.MAX_VALUE; + long lastLog = EnvironmentEdgeManager.currentTime(); + long currentInProgress; + while (0 != (currentInProgress = actionsInProgress.get())) { + long now = EnvironmentEdgeManager.currentTime(); + if (hasWait && (now * 1000L) > cutoff) { + return false; + } + if (!hasWait) { // Only log if wait is infinite. 
+ if (now > lastLog + 10000) { + lastLog = now; + LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress + + " actions to finish on table: " + tableName); + if (currentInProgress <= asyncProcess.thresholdToLogUndoneTaskDetails) { + asyncProcess.logDetailsOfUndoneTasks(currentInProgress); + } + } + } + synchronized (actionsInProgress) { + if (actionsInProgress.get() == 0) break; + if (!hasWait) { + actionsInProgress.wait(10); + } else { + long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); + TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); + } + } + } + return true; + } + + @Override + public boolean hasError() { + return errors.hasErrors(); + } + + @Override + public List getFailedOperations() { + return errors.actions; + } + + @Override + public RetriesExhaustedWithDetailsException getErrors() { + return errors.makeException(asyncProcess.logBatchErrorDetails); + } + + @Override + public Object[] getResults() throws InterruptedIOException { + waitUntilDone(); + return results; + } + + /** + * Creates the server error tracker to use inside process. + * Currently, to preserve the main assumption about current retries, and to work well with + * the retry-limit-based calculation, the calculation is local per Process object. + * We may benefit from connection-wide tracking of server errors. + * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection + */ + protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { + return new ConnectionImplementation.ServerErrorTracker( + asyncProcess.serverTrackerTimeout, asyncProcess.numTries); + } + + /** + * Create a callable. Isolated to be easily overridden in the tests. + */ + @VisibleForTesting + protected MultiServerCallable createCallable(final ServerName server, + TableName tableName, final MultiAction multi) { + return new MultiServerCallable(asyncProcess.connection, tableName, server, + asyncProcess.rpcFactory, multi); + } + +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index bcbb1da..1d1db3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index ba963c2..2c1a61e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; 
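
waitUntilDone above spins on actionsInProgress, which doubles as both the remaining-action counter and the wait monitor; decActionCounter wakes the waiters when it reaches zero. The bare idiom, stripped of the timeout handling and periodic logging (method names invented):

    // Decrement the shared counter; the thread that reaches zero wakes all waiters.
    static void countDown(AtomicLong actionsInProgress) {
      if (actionsInProgress.decrementAndGet() == 0) {
        synchronized (actionsInProgress) {
          actionsInProgress.notifyAll();
        }
      }
    }

    // Block until the counter reaches zero, re-checking under the monitor.
    static void awaitZero(AtomicLong actionsInProgress) throws InterruptedException {
      synchronized (actionsInProgress) {
        while (actionsInProgress.get() != 0) {
          actionsInProgress.wait(10);
        }
      }
    }
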
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 54552d9..53b591a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -59,8 +59,6 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; -import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl; import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess; import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; @@ -162,8 +160,9 @@ public class TestAsyncProcess { Batch.Callback callback, Object[] results, boolean needResults, CancellableRegionServerCallable callable, int curTimeout) { // Test HTable has tableName of null, so pass DUMMY_TABLE - AsyncRequestFutureImpl r = super.createAsyncRequestFuture( - DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults, null, rpcTimeout); + AsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl( + DUMMY_TABLE, actions, nonceGroup, pool, needResults, + results, callback, callable, curTimeout, this); allReqs.add(r); return r; } @@ -212,18 +211,14 @@ public class TestAsyncProcess { previousTimeout = curTimeout; return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout); } - - @Override - protected void updateStats(ServerName server, Map results) { - // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. - } @Override protected RpcRetryingCaller createCaller( CancellableRegionServerCallable callable) { callsCt.incrementAndGet(); MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( - callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { + callable1.getMulti(), nbMultiResponse, nbActions, + new ResponseGenerator() { @Override public void addResponse(MultiResponse mr, byte[] regionName, Action a) { if (Arrays.equals(FAILS, a.getAction().getRow())) { @@ -237,8 +232,8 @@ public class TestAsyncProcess { return new RpcRetryingCallerImpl(100, 10, 9) { @Override public AbstractResponse callWithoutRetries(RetryingCallable callable, - int callTimeout) - throws IOException, RuntimeException { + int callTimeout) + throws IOException, RuntimeException { try { // sleep one second in order for threadpool to start another thread instead of reusing // existing one. 
@@ -250,6 +245,28 @@ public class TestAsyncProcess { } }; } + + + } + + + + static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { + + public MyAsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback callback, + CancellableRegionServerCallable callable, int timeout, + AsyncProcess asyncProcess) { + super(tableName, actions, nonceGroup, pool, needResults, + results, callback, callable, timeout, asyncProcess); + } + + @Override + protected void updateStats(ServerName server, Map results) { + // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. + } + } static class CallerWithFailure extends RpcRetryingCallerImpl{ @@ -287,6 +304,7 @@ public class TestAsyncProcess { return new CallerWithFailure(ioe); } } + /** * Make the backoff time always different on each call. */ -- 2.9.3
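
Since the patch only moves code between files and loosens a few access modifiers for the new collaborator, nothing changes for users of the public client API; a hypothetical batch call like the sketch below (table, family and values made up, exception handling elided) still runs through AsyncProcess and an AsyncRequestFutureImpl underneath:

    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("example_table"))) {
      Put put = new Put(Bytes.toBytes("row-1"));
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      Object[] results = new Object[1];
      // Failures, after retries, surface as RetriesExhaustedWithDetailsException.
      table.batch(Collections.singletonList(put), results);
    }
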