From 25b6103dadba16e85db0a8c5f2fc44ecf9fc3f2a Mon Sep 17 00:00:00 2001
From: sershe
Date: Tue, 18 Feb 2014 23:37:17 +0000
Subject: [PATCH 06/45] HBASE-10356 Failover RPC's for multi-get
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1569559 13f79535-47bb-0310-9956-ffa450edef68
---
.../java/org/apache/hadoop/hbase/HRegionInfo.java | 2 +-
.../org/apache/hadoop/hbase/RegionLocations.java | 5 +
.../org/apache/hadoop/hbase/client/Action.java | 23 +-
.../apache/hadoop/hbase/client/AsyncProcess.java | 580 +++++++++++++++++----
.../hadoop/hbase/client/ClusterConnection.java | 6 +
.../hadoop/hbase/client/ConnectionAdapter.java | 6 +
.../hadoop/hbase/client/ConnectionManager.java | 13 +-
.../apache/hadoop/hbase/client/MultiAction.java | 11 -
.../hadoop/hbase/client/MultiServerCallable.java | 6 +
.../hadoop/hbase/client/RegionReplicaUtil.java | 5 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 332 ++++++++++--
.../hbase/client/CoprocessorHConnection.java | 13 +-
.../hbase/client/HConnectionTestingUtility.java | 3 +
13 files changed, 852 insertions(+), 153 deletions(-)
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index 0f846b5..59a3248 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -143,7 +143,7 @@ public class HRegionInfo implements Comparable {
public static final byte REPLICA_ID_DELIMITER = (byte)'_';
private static final int MAX_REPLICA_ID = 0xFFFF;
- private static final int DEFAULT_REPLICA_ID = 0;
+ static final int DEFAULT_REPLICA_ID = 0;
/**
* Does region name contain its encoded name?
* @param regionName region name
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index cdf1180..b5db549 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -261,6 +262,10 @@ public class RegionLocations {
return locations;
}
+ public HRegionLocation getDefaultRegionLocation() {
+ return locations[HRegionInfo.DEFAULT_REPLICA_ID];
+ }
+
/**
* Returns the first not-null region location in the list
*/
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index c3e2104..5147c25 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
/**
* A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by
@@ -27,18 +28,34 @@ import org.apache.hadoop.hbase.HConstants;
* the index from the original request.
*/
@InterfaceAudience.Private
+//TODO: R is never used
public class Action implements Comparable {
// TODO: This class should not be visible outside of the client package.
private Row action;
private int originalIndex;
private long nonce = HConstants.NO_NONCE;
+ private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
public Action(Row action, int originalIndex) {
super();
this.action = action;
- this.originalIndex = originalIndex;
+ this.originalIndex = originalIndex;
}
+ /**
+ * Creates an action for a particular replica from original action.
+ * @param action Original action.
+ * @param replicaId Replica id for the new action.
+ */
+ public Action(Action action, int replicaId) {
+ super();
+ this.action = action.action;
+ this.nonce = action.nonce;
+ this.originalIndex = action.originalIndex;
+ this.replicaId = replicaId;
+ }
+
+
public void setNonce(long nonce) {
this.nonce = nonce;
}
@@ -55,6 +72,10 @@ public class Action implements Comparable {
return originalIndex;
}
+ public int getReplicaId() {
+ return replicaId;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public int compareTo(Object o) {
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 714daeb..9419932 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.htrace.Trace;
-
import com.google.common.annotations.VisibleForTesting;
/**
@@ -89,9 +89,11 @@ import com.google.common.annotations.VisibleForTesting;
*
*/
class AsyncProcess {
- private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
+ protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
protected static final AtomicLong COUNTER = new AtomicLong();
+ public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout";
+
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@@ -102,7 +104,7 @@ class AsyncProcess {
public boolean hasError();
public RetriesExhaustedWithDetailsException getErrors();
public List extends Row> getFailedOperations();
- public Object[] getResults();
+ public Object[] getResults() throws InterruptedIOException;
/** Wait until all tasks are executed, successfully or not. */
public void waitUntilDone() throws InterruptedIOException;
}
@@ -122,6 +124,27 @@ class AsyncProcess {
public void waitUntilDone() throws InterruptedIOException {}
};
+ /** Sync point for calls to multiple replicas for the same user request (Get).
+ * Created and put in the results array (we assume replica calls require results) when
+ * the replica calls are launched. See results for details of this process.
+ * POJO, all fields are public. To modify them, the object itself is locked. */
+ private static class ReplicaResultState {
+ public ReplicaResultState(int callCount) {
+ this.callCount = callCount;
+ }
+
+ /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
+ int callCount;
+ /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but
+ * is not last, adds error to list. If all calls fail the last one sets this to list. */
+ Object result = null;
+ /** Errors for which it is not decided whether we will report them to user. If one of the
+ * calls succeeds, we will discard the errors that may have happened in the other calls. */
+ BatchErrors replicaErrors = null;
+ }
+
+
+ // TODO: many of the fields should be made private
protected final long id;
protected final ClusterConnection hConnection;
@@ -160,6 +183,7 @@ class AsyncProcess {
protected int numTries;
protected int serverTrackerTimeout;
protected int timeout;
+ protected long primaryCallTimeout;
// End configuration settings.
protected static class BatchErrors {
@@ -192,6 +216,12 @@ class AsyncProcess {
actions.clear();
addresses.clear();
}
+
+ public synchronized void merge(BatchErrors other) {
+ throwables.addAll(other.throwables);
+ actions.addAll(other.actions);
+ addresses.addAll(other.addresses);
+ }
}
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
@@ -212,6 +242,7 @@ class AsyncProcess {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@@ -270,7 +301,8 @@ class AsyncProcess {
/**
* Extract from the rows list what we can submit. The rows we can not submit are kept in the
- * list.
+ * list. Does not send requests to replicas (not currently used for anything other
+ * than streaming puts anyway).
*
* @param pool ExecutorService to use.
* @param tableName The table for which this request is needed.
@@ -311,7 +343,7 @@ class AsyncProcess {
Row r = it.next();
HRegionLocation loc;
try {
- loc = findDestLocation(tableName, r);
+ loc = findDestLocation(tableName, r, true).getDefaultRegionLocation();
} catch (IOException ex) {
locationErrors = new ArrayList();
locationErrorRows = new ArrayList();
@@ -329,7 +361,9 @@ class AsyncProcess {
Action action = new Action(r, ++posInList);
setNonce(ng, r, action);
retainedActions.add(action);
- addAction(loc, action, actionsByServer, nonceGroup);
+ // TODO: replica-get is not supported on this path
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
}
}
@@ -347,7 +381,7 @@ class AsyncProcess {
ars.manageError(originalIndex, row, false, locationErrors.get(i), null);
}
}
- ars.sendMultiAction(actionsByServer, 1);
+ ars.sendMultiAction(actionsByServer, 1, null);
return ars;
}
@@ -359,13 +393,12 @@ class AsyncProcess {
* @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group.
*/
- private void addAction(HRegionLocation loc, Action action,
+ private void addAction(ServerName server, byte[] regionName, Action action,
Map> actionsByServer, long nonceGroup) {
- final byte[] regionName = loc.getRegionInfo().getRegionName();
- MultiAction multiAction = actionsByServer.get(loc.getServerName());
+ MultiAction multiAction = actionsByServer.get(server);
if (multiAction == null) {
multiAction = new MultiAction();
- actionsByServer.put(loc.getServerName(), multiAction);
+ actionsByServer.put(server, multiAction);
}
if (action.hasNonce() && !multiAction.hasNonceGroup()) {
multiAction.setNonceGroup(nonceGroup);
@@ -380,10 +413,12 @@ class AsyncProcess {
* @param row the row
* @return the destination.
*/
- private HRegionLocation findDestLocation(TableName tableName, Row row) throws IOException {
+ private RegionLocations findDestLocation(
+ TableName tableName, Row row, boolean checkPrimary) throws IOException {
if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
- HRegionLocation loc = hConnection.locateRegion(tableName, row.getRow());
- if (loc == null) {
+ RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow());
+ if (loc == null
+ || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) {
throw new IOException("#" + id + ", no location found, aborting submit for" +
" tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
}
@@ -516,6 +551,144 @@ class AsyncProcess {
* scheduling children. This is why lots of code doesn't require any synchronization.
*/
protected class AsyncRequestFutureImpl implements AsyncRequestFuture {
+
+ /**
+ * Runnable (that can be submitted to thread pool) that waits for when it's time
+ * to issue replica calls, finds region replicas, groups the requests by replica and
+ * issues the calls (on separate threads, via sendMultiAction).
+ * This is done on a separate thread because we don't want to wait on user thread for
+ * our asynchronous call, and usually we have to wait before making replica calls.
+ */
+ private final class ReplicaCallIssuingRunnable implements Runnable {
+ private final long startTime;
+ private final List> initialActions;
+
+ public ReplicaCallIssuingRunnable(List> initialActions, long startTime) {
+ this.initialActions = initialActions;
+ this.startTime = startTime;
+ }
+
+ @Override
+ public void run() {
+ boolean done = false;
+ if (primaryCallTimeout > 0) {
+ try {
+ done = waitUntilDone(startTime + primaryCallTimeout);
+ } catch (InterruptedException ex) {
+ LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
+ return;
+ }
+ }
+ if (done) return; // Done within primary timeout
+ Map> actionsByServer =
+ new HashMap>();
+ if (replicaGetIndices == null) {
+ for (int i = 0; i < results.length; ++i) {
+ addReplicaActions(i, actionsByServer);
+ }
+ } else {
+ for (int i = 0; i < replicaGetIndices.length; ++i) {
+ addReplicaActions(replicaGetIndices[i], actionsByServer);
+ }
+ }
+ if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found.
+ sendMultiAction(actionsByServer, 1, null);
+ }
+
+ /**
+ * Add replica actions to action map by server.
+ * @param index Index of the original action.
+ * @param actionsByServer The map by server to add it to.
+ */
+ private void addReplicaActions(
+ int index, Map> actionsByServer) {
+ if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
+ Action action = initialActions.get(index);
+ RegionLocations loc = null;
+ try {
+ // For perf, we assume that this location coming from cache, since we just got location
+ // from meta for the primary call. If it turns out to not be the case, we'd need local
+ // cache since we want to keep as little time as possible before replica call.
+ loc = findDestLocation(tableName, action.getAction(), false);
+ } catch (IOException ex) {
+ manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
+ LOG.error("Cannot get location - no replica calls for some actions", ex);
+ return;
+ }
+ HRegionLocation[] locs = loc.getRegionLocations();
+ int replicaCount = 0;
+ for (int i = 1; i < locs.length; ++i) {
+ replicaCount += (locs[i] != null) ? 1 : 0;
+ }
+ if (replicaCount == 0) {
+ LOG.warn("No replicas found for " + action.getAction());
+ return;
+ }
+ synchronized (replicaResultLock) {
+ // Don't run replica calls if the original has finished. We could do it e.g. if
+ // original has already failed before first replica call (unlikely given retries),
+ // but that would require additional synchronization w.r.t. returning to caller.
+ if (results[index] != null) return;
+ // We set the number of calls here. After that any path must call setResult/setError.
+ results[index] = new ReplicaResultState(replicaCount + 1);
+ }
+ for (int i = 1; i < locs.length; ++i) {
+ if (locs[i] == null) continue;
+ addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
+ new Action(action, i), actionsByServer, nonceGroup);
+ }
+ }
+ }
+
+ /**
+ * Runnable (that can be submitted to thread pool) that submits MultiAction to a
+ * single server. The server call is synchronous, therefore we do it on a thread pool.
+ */
+ private final class SingleServerRequestRunnable implements Runnable {
+ private final MultiAction multiAction;
+ private final int numAttempt;
+ private final ServerName server;
+
+ private SingleServerRequestRunnable(
+ MultiAction multiAction, int numAttempt, ServerName server) {
+ this.multiAction = multiAction;
+ this.numAttempt = numAttempt;
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ MultiResponse res;
+ try {
+ MultiServerCallable callable = createCallable(server, tableName, multiAction);
+ try {
+ res = createCaller(callable).callWithoutRetries(callable, timeout);
+ } catch (IOException e) {
+ // The service itself failed . It may be an error coming from the communication
+ // layer, but, as well, a functional error raised by the server.
+ receiveGlobalFailure(multiAction, server, numAttempt, e);
+ return;
+ } catch (Throwable t) {
+ // This should not happen. Let's log & retry anyway.
+ LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
+ " Retrying. Server is " + server + ", tableName=" + tableName, t);
+ receiveGlobalFailure(multiAction, server, numAttempt, t);
+ return;
+ }
+
+ // Normal case: we received an answer from the server, and it's not an exception.
+ receiveMultiAction(multiAction, server, res, numAttempt);
+ } catch (Throwable t) {
+ // Something really bad happened. We are on the send thread that will now die.
+ LOG.error("Internal AsyncProcess #" + id + " error for "
+ + tableName + " processing for " + server, t);
+ throw new RuntimeException(t);
+ } finally {
+ decTaskCounters(multiAction.getRegions(), server);
+ }
+ }
+ }
+
private final Batch.Callback callback;
private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer;
@@ -524,7 +697,21 @@ class AsyncProcess {
private final TableName tableName;
private final AtomicLong actionsInProgress = new AtomicLong(-1);
+ /** The lock controls access to results. It is only held when populating results where
+ * there might be several callers (eventual consistency gets). For other requests,
+ * there's one unique call going on per result index. */
+ private final Object replicaResultLock = new Object();
+ /** Result array. Null if results are not needed. Otherwise, each index corresponds to
+ * the action index in initial actions submitted. For most request types, has null-s for
+ * requests that are not done, and result/exception for those that are done.
+ * For eventual-consistency gets, initially the same applies; at some point, replica calls
+ * might be started, and ReplicaResultState is put at the corresponding indices. The
+ * returning calls check the type to detect when this is the case. After all calls are done,
+ * ReplicaResultState-s are replaced with results for the user. */
private final Object[] results;
+ /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
+ private final int[] replicaGetIndices;
+ private final boolean hasAnyReplicaGets;
private final long nonceGroup;
public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup,
@@ -545,6 +732,51 @@ class AsyncProcess {
} else {
this.results = needResults ? new Object[actions.size()] : null;
}
+ List replicaGetIndices = null;
+ boolean hasAnyReplicaGets = false;
+ if (needResults) {
+ // Check to see if any requests might require replica calls.
+ // We expect that many requests will consist of all or no multi-replica gets; in such
+ // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
+ // store the list of action indexes for which replica gets are possible, and set
+ // hasAnyReplicaGets to true.
+ boolean hasAnyNonReplicaReqs = false;
+ int posInList = 0;
+ for (Action action : actions) {
+ boolean isReplicaGet = isReplicaGet(action.getAction());
+ if (isReplicaGet) {
+ hasAnyReplicaGets = true;
+ if (hasAnyNonReplicaReqs) { // Mixed case
+ if (replicaGetIndices == null) {
+ replicaGetIndices = new ArrayList(actions.size() - 1);
+ }
+ replicaGetIndices.add(posInList);
+ }
+ } else if (!hasAnyNonReplicaReqs) {
+ // The first non-multi-replica request in the action list.
+ hasAnyNonReplicaReqs = true;
+ if (posInList > 0) {
+ // Add all the previous requests to the index lists. We know they are all
+ // replica-gets because this is the first non-multi-replica request in the list.
+ replicaGetIndices = new ArrayList(actions.size() - 1);
+ for (int i = 0; i < posInList; ++i) {
+ replicaGetIndices.add(i);
+ }
+ }
+ }
+ ++posInList;
+ }
+ }
+ this.hasAnyReplicaGets = hasAnyReplicaGets;
+ if (replicaGetIndices != null) {
+ this.replicaGetIndices = new int[replicaGetIndices.size()];
+ int i = 0;
+ for (Integer el : replicaGetIndices) {
+ this.replicaGetIndices[i++] = el;
+ }
+ } else {
+ this.replicaGetIndices = null;
+ }
this.errorsByServer = createServerErrorTracker();
this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
}
@@ -560,21 +792,40 @@ class AsyncProcess {
final Map> actionsByServer =
new HashMap>();
- HRegionLocation loc;
+ boolean isReplica = false;
for (Action action : currentActions) {
+ RegionLocations locs = null;
try {
- loc = findDestLocation(tableName, action.getAction());
+ locs = findDestLocation(tableName, action.getAction(), false);
} catch (IOException ex) {
// There are multiple retries in locateRegion already. No need to add new.
// We can't continue with this row, hence it's the last retry.
manageError(action.getOriginalIndex(), action.getAction(), false, ex, null);
continue;
}
- addAction(loc, action, actionsByServer, nonceGroup);
- }
+ boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+ if (isReplica && !isReplicaAction) {
+ // This is the property of the current implementation, not a requirement.
+ throw new AssertionError("Replica and non-replica actions in the same retry");
+ }
+ isReplica = isReplicaAction;
+ HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
+ if (loc == null || loc.getServerName() == null) {
+ // On retry, we couldn't find location for some replica we saw before.
+ String str = "Cannot find location for replica " + action.getReplicaId();
+ LOG.error(str);
+ manageError(action.getOriginalIndex(), action.getAction(),
+ false, new IOException(str), null);
+ continue;
+ }
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
+ }
+ // If this is a first attempt to group and send, no replicas, we need replica thread.
if (!actionsByServer.isEmpty()) {
- sendMultiAction(actionsByServer, numAttempt);
+ boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
+ sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null);
}
}
@@ -584,51 +835,22 @@ class AsyncProcess {
*
* @param actionsByServer the actions structured by regions
* @param numAttempt the attempt number.
+ * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/
- private void sendMultiAction(
- Map> actionsByServer, final int numAttempt) {
+ private void sendMultiAction(Map> actionsByServer,
+ int numAttempt, List> actionsForReplicaThread) {
// Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads.
- int reuseThreadCountdown = (numAttempt > 1) ? actionsByServer.size() : Integer.MAX_VALUE;
+ int actionsRemaining = actionsByServer.size();
+ // This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry> e : actionsByServer.entrySet()) {
final ServerName server = e.getKey();
final MultiAction multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), server);
- Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
- @Override
- public void run() {
- MultiResponse res;
- try {
- MultiServerCallable callable = createCallable(server, tableName, multiAction);
- try {
- res = createCaller(callable).callWithoutRetries(callable, timeout);
- } catch (IOException e) {
- // The service itself failed . It may be an error coming from the communication
- // layer, but, as well, a functional error raised by the server.
- receiveGlobalFailure(multiAction, server, numAttempt, e);
- return;
- } catch (Throwable t) {
- // This should not happen. Let's log & retry anyway.
- LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
- " Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t);
- receiveGlobalFailure(multiAction, server, numAttempt, t);
- return;
- }
-
- // Normal case: we received an answer from the server, and it's not an exception.
- receiveMultiAction(multiAction, server, res, numAttempt);
- } catch (Throwable t) {
- // Something really bad happened. We are on the send thread that will now die.
- LOG.error("Internal AsyncProcess #" + id + " error for "
- + tableName + " processing for " + server, t);
- throw new RuntimeException(t);
- } finally {
- decTaskCounters(multiAction.getRegions(), server);
- }
- }
- });
- --reuseThreadCountdown;
- if (reuseThreadCountdown == 0) {
+ Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction",
+ new SingleServerRequestRunnable(multiAction, numAttempt, server));
+ --actionsRemaining;
+ if ((numAttempt > 1) && actionsRemaining == 0) {
runnable.run();
} else {
try {
@@ -645,6 +867,30 @@ class AsyncProcess {
}
}
}
+ if (actionsForReplicaThread != null) {
+ startWaitingForReplicaCalls(actionsForReplicaThread);
+ }
+ }
+
+ /**
+ * Starts waiting to issue replica calls on a different thread; or issues them immediately.
+ */
+ private void startWaitingForReplicaCalls(List> actionsForReplicaThread) {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
+ actionsForReplicaThread, startTime);
+ if (primaryCallTimeout == 0) {
+ // Start replica calls immediately.
+ replicaRunnable.run();
+ } else {
+ // Start the thread that may kick off replica gets.
+ // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
+ try {
+ pool.submit(replicaRunnable);
+ } catch (RejectedExecutionException ree) {
+ LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
+ }
+ }
}
/**
@@ -665,11 +911,11 @@ class AsyncProcess {
if (!canRetry) {
// Batch.Callback was not called on failure in 0.94. We keep this.
- errors.add(throwable, row, server);
- if (results != null) {
- setResult(originalIndex, row, throwable);
- }
- decActionCounter();
+ setError(originalIndex, row, throwable, server);
+ } else {
+ // See if we are dealing with a replica action that was completed from other server.
+ // Doesn't have to be synchronized, worst case we'd retry and be unable to set result.
+ canRetry = !isActionComplete(originalIndex, row);
}
return canRetry;
@@ -685,15 +931,17 @@ class AsyncProcess {
*/
private void receiveGlobalFailure(
MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
- // Do not use the exception for updating cache because it might be coming from
- // any of the regions in the MultiAction.
- byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
- hConnection.updateCachedLocations(tableName, null, row, null, server);
errorsByServer.reportServerError(server);
boolean canRetry = errorsByServer.canRetryMore(numAttempt);
List> toReplay = new ArrayList>();
for (Map.Entry>> e : rsActions.actions.entrySet()) {
+ byte[] regionName = e.getKey();
+ byte[] row = e.getValue().iterator().next().getAction().getRow();
+ // Do not use the exception for updating cache because it might be coming from
+ // any of the regions in the MultiAction.
+ // TODO: depending on type of exception we might not want to update cache at all?
+ hConnection.updateCachedLocations(tableName, regionName, row, null, server);
for (Action action : e.getValue()) {
if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
toReplay.add(action);
@@ -791,14 +1039,16 @@ class AsyncProcess {
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = sentAction.getAction();
- if (!regionFailureRegistered) { // We're doing this once per location.
+ // Register corresponding failures once per server/once per region.
+ if (!regionFailureRegistered) {
regionFailureRegistered = true;
- // The location here is used as a server name.
- hConnection.updateCachedLocations(tableName, regionName, row.getRow(), result, server);
- if (failureCount == 0) {
- errorsByServer.reportServerError(server);
- canRetry = errorsByServer.canRetryMore(numAttempt);
- }
+ hConnection.updateCachedLocations(
+ tableName, regionName, row.getRow(), result, server);
+ }
+ if (failureCount == 0) {
+ errorsByServer.reportServerError(server);
+ // We determine canRetry only once for all calls, after reporting server failure.
+ canRetry = errorsByServer.canRetryMore(numAttempt);
}
++failureCount;
if (manageError(
@@ -809,16 +1059,14 @@ class AsyncProcess {
if (callback != null) {
try {
//noinspection unchecked
+ // TODO: would callback expect a replica region name if it gets one?
this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
} catch (Throwable t) {
LOG.error("User callback threw an exception for "
+ Bytes.toStringBinary(regionName) + ", ignoring", t);
}
}
- if (results != null) {
- setResult(sentAction.getOriginalIndex(), sentAction.getAction(), result);
- }
- decActionCounter();
+ setResult(sentAction, result);
}
}
}
@@ -881,38 +1129,185 @@ class AsyncProcess {
return sb.toString();
}
- private void setResult(int index, Row row, Object result) {
- if (result == null) throw new RuntimeException("Result cannot be set to null");
- if (results[index] != null) throw new RuntimeException("Result was already set");
- results[index] = result;
+ /**
+ * Sets the non-error result from a particular action.
+ * @param action Action (request) that the server responded to.
+ * @param result The result.
+ */
+ private void setResult(Action action, Object result) {
+ ReplicaResultState state = null;
+ boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
+ if (results == null || ((state = trySetResultSimple(
+ action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) {
+ decActionCounter();
+ return; // Simple case, no replica requests.
+ }
+ synchronized (state) {
+ if (state.callCount == 0) return; // someone already set the result
+ state.result = result;
+ state.callCount = 0;
+ state.replicaErrors = null; // no longer matters
+ }
+ decActionCounter();
+ }
+
+ /**
+ * Sets the error from a particular action.
+ * @param index Original action index.
+ * @param row Original request.
+ * @param throwable The resulting error.
+ * @param server The source server.
+ */
+ private void setError(int index, Row row, Throwable throwable, ServerName server) {
+ ReplicaResultState state = null;
+ if (results == null
+ || ((state = trySetResultSimple(index, row, throwable, false)) == null)) {
+ errors.add(throwable, row, server);
+ decActionCounter();
+ return; // Simple case, no replica requests.
+ }
+ BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
+ boolean isActionDone = false;
+ synchronized (state) {
+ switch (state.callCount) {
+ case 0: return; // someone already set the result
+ case 1: { // All calls failed, we are the last error.
+ state.result = throwable;
+ target = errors;
+ isActionDone = true;
+ break;
+ }
+ default: {
+ assert state.callCount > 1;
+ if (state.replicaErrors == null) {
+ state.replicaErrors = new BatchErrors();
+ }
+ target = state.replicaErrors;
+ break;
+ }
+ }
+ --state.callCount;
+ }
+ target.add(throwable, row, server);
+ if (!isActionDone) return;
+ if (state.replicaErrors != null) { // last call, no need to lock
+ errors.merge(state.replicaErrors);
+ state.replicaErrors = null;
+ }
+ decActionCounter();
+ }
+
+ /**
+ * Checks if the action is complete; used on error to prevent needless retries.
+ * Does not synchronize, assuming element index/field accesses are atomic.
+ * This is an opportunistic optimization check, doesn't have to be strict.
+ * @param index Original action index.
+ * @param row Original request.
+ */
+ private boolean isActionComplete(int index, Row row) {
+ if (!isReplicaGet(row)) return false;
+ Object resObj = results[index];
+ return (resObj != null) && (!(resObj instanceof ReplicaResultState)
+ || ((ReplicaResultState)resObj).callCount == 0);
+ }
+
+ /**
+ * Tries to set the result or error for a particular action as if there were no replica calls.
+ * @return null if successful; replica state if there were in fact replica calls.
+ */
+ private ReplicaResultState trySetResultSimple(
+ int index, Row row, Object result, boolean isFromReplica) {
+ Object resObj = null;
+ if (!isReplicaGet(row)) {
+ if (isFromReplica) {
+ throw new AssertionError("Unexpected stale result for " + row);
+ }
+ results[index] = result;
+ } else {
+ synchronized (replicaResultLock) {
+ if ((resObj = results[index]) == null) {
+ if (isFromReplica) {
+ throw new AssertionError("Unexpected stale result for " + row);
+ }
+ results[index] = result;
+ }
+ }
+ }
+ return (resObj == null || !(resObj instanceof ReplicaResultState))
+ ? null : (ReplicaResultState)resObj;
}
private void decActionCounter() {
- actionsInProgress.decrementAndGet();
+ if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) {
+ // Convert replica sync structures to results.
+ int staleCount = 0;
+ if (replicaGetIndices == null) {
+ for (int i = 0; i < results.length; ++i) {
+ staleCount += convertReplicaResult(i) ? 1 : 0;
+ }
+ } else {
+ for (int i = 0; i < replicaGetIndices.length; ++i) {
+ staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0;
+ }
+ }
+ if (!actionsInProgress.compareAndSet(1, 0)) {
+ throw new AssertionError("Cannot set actions in progress to 0");
+ }
+ if (staleCount > 0) {
+ LOG.trace("Returning " + staleCount + " stale results");
+ }
+ } else {
+ actionsInProgress.decrementAndGet();
+ }
synchronized (actionsInProgress) {
actionsInProgress.notifyAll();
}
}
+ private boolean convertReplicaResult(int index) {
+ if (!(results[index] instanceof ReplicaResultState)) return false;
+ ReplicaResultState state = (ReplicaResultState)results[index];
+ // We know that noone will touch state with 0 callCount, no need to lock
+ if (state.callCount != 0) {
+ throw new AssertionError("Actions are done but callcount is " + state.callCount);
+ }
+ // TODO: we expect the Result coming from server to already have "isStale" specified.
+ Object res = results[index] = state.result;
+ return (res instanceof Result) && ((Result)res).isStale();
+ }
+
@Override
public void waitUntilDone() throws InterruptedIOException {
- long lastLog = EnvironmentEdgeManager.currentTimeMillis();
- long currentInProgress;
try {
- while (0 != (currentInProgress = actionsInProgress.get())) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
+ waitUntilDone(Long.MAX_VALUE);
+ } catch (InterruptedException iex) {
+ throw new InterruptedIOException(iex.getMessage());
+ }
+ }
+
+ private boolean waitUntilDone(long cutoff) throws InterruptedException {
+ boolean hasWait = cutoff != Long.MAX_VALUE;
+ long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis();
+ long currentInProgress;
+ while (0 != (currentInProgress = actionsInProgress.get())) {
+ long now = 0;
+ if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) {
+ return false;
+ }
+ if (!hasWait) {
+ // Only log if wait is infinite.
+ now = EnvironmentEdgeManager.currentTimeMillis();
if (now > lastLog + 10000) {
lastLog = now;
LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
}
synchronized (actionsInProgress) {
if (actionsInProgress.get() == 0) break;
- actionsInProgress.wait(100);
+ actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE));
}
}
- } catch (InterruptedException iex) {
- throw new InterruptedIOException(iex.getMessage());
}
+ return true;
}
@Override
@@ -931,7 +1326,8 @@ class AsyncProcess {
}
@Override
- public Object[] getResults() {
+ public Object[] getResults() throws InterruptedIOException {
+ waitUntilDone();
return results;
}
}
@@ -1080,4 +1476,8 @@ class AsyncProcess {
return new ConnectionManager.ServerErrorTracker(
this.serverTrackerTimeout, this.numTries);
}
+
+ private static boolean isReplicaGet(Row row) {
+ return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
+ }
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 0c4776d..779d15d 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -231,4 +232,9 @@ interface ClusterConnection extends HConnection {
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
+
+ /**
+ * @return All locations for a particular region.
+ */
+ RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
}
\ No newline at end of file
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index dfc6d00..3038fb2 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -203,6 +204,11 @@ class ConnectionAdapter implements ClusterConnection {
}
@Override
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+ return wrappedConnection.locateRegionAll(tableName, row);
+ }
+
+ @Override
public void clearRegionCache() {
wrappedConnection.clearRegionCache();
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 15f78c5..d2b58ae 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -946,10 +946,15 @@ class ConnectionManager {
}
@Override
- public HRegionLocation locateRegion(final TableName tableName,
- final byte [] row)
- throws IOException{
- RegionLocations locations = locateRegion(tableName, row, true, true);
+ public RegionLocations locateRegionAll(
+ final TableName tableName, final byte[] row) throws IOException{
+ return locateRegion(tableName, row, true, true);
+ }
+
+ @Override
+ public HRegionLocation locateRegion(
+ final TableName tableName, final byte[] row) throws IOException{
+ RegionLocations locations = locateRegionAll(tableName, row);
return locations == null ? null : locations.getRegionLocation();
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index 2ca24dc..cad521a 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -84,17 +84,6 @@ public final class MultiAction {
return actions.keySet();
}
- /**
- * @return All actions from all regions in this container
- */
- public List> allActions() {
- List> res = new ArrayList>();
- for (List> lst : actions.values()) {
- res.addAll(lst);
- }
- return res;
- }
-
public boolean hasNonceGroup() {
return nonceGroup != HConstants.NO_NONCE;
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index cc26ecf..20cf766 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
/**
@@ -153,4 +154,9 @@ class MultiServerCallable extends RegionServerCallable {
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(this.location.getServerName()));
}
+
+ @VisibleForTesting
+ ServerName getServerName() {
+ return location.getServerName();
+ }
}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
index abe9bf5..6b1465d 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java
@@ -30,7 +30,7 @@ public class RegionReplicaUtil {
/**
* The default replicaId for the region
*/
- private static final int DEFAULT_REPLICA_ID = 0;
+ static final int DEFAULT_REPLICA_ID = 0;
/**
* Returns the HRegionInfo for the given replicaId. HRegionInfo's correspond to
@@ -62,4 +62,7 @@ public class RegionReplicaUtil {
return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID);
}
+ public static boolean isDefaultReplica(int replicaId) {
+ return DEFAULT_REPLICA_ID == replicaId;
+ }
}
diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index edffd18..575827f 100644
--- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -43,8 +45,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
@@ -53,6 +58,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
@Category(MediumTests.class)
public class TestAsyncProcess {
@@ -64,8 +70,9 @@ public class TestAsyncProcess {
private static final byte[] FAILS = "FAILS".getBytes();
private static final Configuration conf = new Configuration();
- private static ServerName sn = ServerName.valueOf("localhost:10,1254");
- private static ServerName sn2 = ServerName.valueOf("localhost:140,12540");
+ private static ServerName sn = ServerName.valueOf("s1:1,1");
+ private static ServerName sn2 = ServerName.valueOf("s2:2,2");
+ private static ServerName sn3 = ServerName.valueOf("s3:3,3");
private static HRegionInfo hri1 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
private static HRegionInfo hri2 =
@@ -76,6 +83,16 @@ public class TestAsyncProcess {
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
+ // Replica stuff
+ private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
+ hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
+ private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
+ private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
+ new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
+ private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
+ new HRegionLocation(hri2r1, sn3));
+ private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
+
private static final String success = "success";
private static Exception failure = new Exception("failure");
@@ -139,6 +156,7 @@ public class TestAsyncProcess {
ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) {
+ @Override
public void execute(Runnable command) {
throw new RejectedExecutionException("test under failure");
}
@@ -158,7 +176,17 @@ public class TestAsyncProcess {
protected RpcRetryingCaller createCaller(MultiServerCallable callable) {
callsCt.incrementAndGet();
final MultiResponse mr = createMultiResponse(
- callable.getMulti(), nbMultiResponse, nbActions);
+ callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+ @Override
+ public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
+ if (Arrays.equals(FAILS, a.getAction().getRow())) {
+ mr.add(regionName, a.getOriginalIndex(), failure);
+ } else {
+ mr.add(regionName, a.getOriginalIndex(), success);
+ }
+ }
+ });
+
return new RpcRetryingCaller(100, 10) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable callable,
@@ -204,23 +232,106 @@ public class TestAsyncProcess {
}
}
- static MultiResponse createMultiResponse(
- final MultiAction multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
+ class MyAsyncProcessWithReplicas extends MyAsyncProcess {
+ private Set failures = new TreeSet(new Bytes.ByteArrayComparator());
+ private long primarySleepMs = 0, replicaSleepMs = 0;
+ private Map customPrimarySleepMs = new HashMap();
+ private final AtomicLong replicaCalls = new AtomicLong(0);
+
+ public void addFailures(HRegionInfo... hris) {
+ for (HRegionInfo hri : hris) {
+ failures.add(hri.getRegionName());
+ }
+ }
+
+ public long getReplicaCallCount() {
+ return replicaCalls.get();
+ }
+
+ public void setPrimaryCallDelay(ServerName server, long primaryMs) {
+ customPrimarySleepMs.put(server, primaryMs);
+ }
+
+ public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
+ super(hc, conf);
+ }
+
+ public void setCallDelays(long primaryMs, long replicaMs) {
+ this.primarySleepMs = primaryMs;
+ this.replicaSleepMs = replicaMs;
+ }
+
+ @Override
+ protected RpcRetryingCaller createCaller(
+ MultiServerCallable callable) {
+ final MultiResponse mr = createMultiResponse(
+ callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
+ @Override
+ public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
+ if (failures.contains(regionName)) {
+ mr.add(regionName, a.getOriginalIndex(), failure);
+ } else {
+ boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
+ mr.add(regionName, a.getOriginalIndex(),
+ Result.create(new Cell[0], null, isStale));
+ }
+ }
+ });
+ // Currently AsyncProcess either sends all-replica, or all-primary request.
+ final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
+ callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
+ final ServerName server = ((MultiServerCallable>)callable).getServerName();
+ String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
+ + callable.getMulti().actions.size() + " entries: ";
+ for (byte[] region : callable.getMulti().actions.keySet()) {
+ debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
+ }
+ LOG.debug(debugMsg);
+ if (!isDefault) {
+ replicaCalls.incrementAndGet();
+ }
+
+ return new RpcRetryingCaller(100, 10) {
+ @Override
+ public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout)
+ throws IOException, RuntimeException {
+ long sleep = -1;
+ if (isDefault) {
+ Long customSleep = customPrimarySleepMs.get(server);
+ sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
+ } else {
+ sleep = replicaSleepMs;
+ }
+ if (sleep != 0) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ }
+ }
+ return mr;
+ }
+ };
+ }
+ }
+
+ static MultiResponse createMultiResponse(final MultiAction multi,
+ AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
final MultiResponse mr = new MultiResponse();
nbMultiResponse.incrementAndGet();
for (Map.Entry>> entry : multi.actions.entrySet()) {
byte[] regionName = entry.getKey();
for (Action a : entry.getValue()) {
nbActions.incrementAndGet();
- if (Arrays.equals(FAILS, a.getAction().getRow())) {
- mr.add(regionName, a.getOriginalIndex(), failure);
- } else {
- mr.add(regionName, a.getOriginalIndex(), success);
- }
+ gen.addResponse(mr, regionName, a);
}
}
return mr;
}
+
+ private static interface ResponseGenerator {
+ void addResponse(final MultiResponse mr, byte[] regionName, Action a);
+ }
+
/**
* Returns our async process.
*/
@@ -233,9 +344,8 @@ public class TestAsyncProcess {
}
@Override
- public HRegionLocation locateRegion(final TableName tableName,
- final byte[] row) {
- return loc1;
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
+ return new RegionLocations(loc1);
}
}
@@ -253,18 +363,18 @@ public class TestAsyncProcess {
}
@Override
- public HRegionLocation locateRegion(final TableName tableName,
- final byte[] row) {
+ public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException {
int i = 0;
- for (HRegionLocation hr:hrl){
- if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
- usedRegions[i] = true;
- return hr;
+ for (HRegionLocation hr : hrl){
+ if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
+ usedRegions[i] = true;
+ return new RegionLocations(hr);
}
i++;
}
return null;
}
+
}
@Test
@@ -284,6 +394,7 @@ public class TestAsyncProcess {
ClusterConnection hc = createHConnection();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback