From 25b6103dadba16e85db0a8c5f2fc44ecf9fc3f2a Mon Sep 17 00:00:00 2001 From: sershe Date: Tue, 18 Feb 2014 23:37:17 +0000 Subject: [PATCH 06/45] HBASE-10356 Failover RPC's for multi-get git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1569559 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/hadoop/hbase/HRegionInfo.java | 2 +- .../org/apache/hadoop/hbase/RegionLocations.java | 5 + .../org/apache/hadoop/hbase/client/Action.java | 23 +- .../apache/hadoop/hbase/client/AsyncProcess.java | 580 +++++++++++++++++---- .../hadoop/hbase/client/ClusterConnection.java | 6 + .../hadoop/hbase/client/ConnectionAdapter.java | 6 + .../hadoop/hbase/client/ConnectionManager.java | 13 +- .../apache/hadoop/hbase/client/MultiAction.java | 11 - .../hadoop/hbase/client/MultiServerCallable.java | 6 + .../hadoop/hbase/client/RegionReplicaUtil.java | 5 +- .../hadoop/hbase/client/TestAsyncProcess.java | 332 ++++++++++-- .../hbase/client/CoprocessorHConnection.java | 13 +- .../hbase/client/HConnectionTestingUtility.java | 3 + 13 files changed, 852 insertions(+), 153 deletions(-) diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 0f846b5..59a3248 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -143,7 +143,7 @@ public class HRegionInfo implements Comparable { public static final byte REPLICA_ID_DELIMITER = (byte)'_'; private static final int MAX_REPLICA_ID = 0xFFFF; - private static final int DEFAULT_REPLICA_ID = 0; + static final int DEFAULT_REPLICA_ID = 0; /** * Does region name contain its encoded name? * @param regionName region name diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java index cdf1180..b5db549 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase; import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.util.Bytes; /** @@ -261,6 +262,10 @@ public class RegionLocations { return locations; } + public HRegionLocation getDefaultRegionLocation() { + return locations[HRegionInfo.DEFAULT_REPLICA_ID]; + } + /** * Returns the first not-null region location in the list */ diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java index c3e2104..5147c25 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; /** * A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by @@ -27,18 +28,34 @@ import org.apache.hadoop.hbase.HConstants; * the index from the original request. */ @InterfaceAudience.Private +//TODO: R is never used public class Action implements Comparable { // TODO: This class should not be visible outside of the client package. private Row action; private int originalIndex; private long nonce = HConstants.NO_NONCE; + private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; public Action(Row action, int originalIndex) { super(); this.action = action; - this.originalIndex = originalIndex; + this.originalIndex = originalIndex; } + /** + * Creates an action for a particular replica from original action. + * @param action Original action. + * @param replicaId Replica id for the new action. + */ + public Action(Action action, int replicaId) { + super(); + this.action = action.action; + this.nonce = action.nonce; + this.originalIndex = action.originalIndex; + this.replicaId = replicaId; + } + + public void setNonce(long nonce) { this.nonce = nonce; } @@ -55,6 +72,10 @@ public class Action implements Comparable { return originalIndex; } + public int getReplicaId() { + return replicaId; + } + @SuppressWarnings("rawtypes") @Override public int compareTo(Object o) { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 714daeb..9419932 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -50,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.htrace.Trace; - import com.google.common.annotations.VisibleForTesting; /** @@ -89,9 +89,11 @@ import com.google.common.annotations.VisibleForTesting; *

*/ class AsyncProcess { - private static final Log LOG = LogFactory.getLog(AsyncProcess.class); + protected static final Log LOG = LogFactory.getLog(AsyncProcess.class); protected static final AtomicLong COUNTER = new AtomicLong(); + public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout"; + /** * The context used to wait for results from one submit call. * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), @@ -102,7 +104,7 @@ class AsyncProcess { public boolean hasError(); public RetriesExhaustedWithDetailsException getErrors(); public List getFailedOperations(); - public Object[] getResults(); + public Object[] getResults() throws InterruptedIOException; /** Wait until all tasks are executed, successfully or not. */ public void waitUntilDone() throws InterruptedIOException; } @@ -122,6 +124,27 @@ class AsyncProcess { public void waitUntilDone() throws InterruptedIOException {} }; + /** Sync point for calls to multiple replicas for the same user request (Get). + * Created and put in the results array (we assume replica calls require results) when + * the replica calls are launched. See results for details of this process. + * POJO, all fields are public. To modify them, the object itself is locked. */ + private static class ReplicaResultState { + public ReplicaResultState(int callCount) { + this.callCount = callCount; + } + + /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ + int callCount; + /** Call that succeeds sets the count to 0 and sets this to result. Call that fails but + * is not last, adds error to list. If all calls fail the last one sets this to list. */ + Object result = null; + /** Errors for which it is not decided whether we will report them to user. If one of the + * calls succeeds, we will discard the errors that may have happened in the other calls. */ + BatchErrors replicaErrors = null; + } + + + // TODO: many of the fields should be made private protected final long id; protected final ClusterConnection hConnection; @@ -160,6 +183,7 @@ class AsyncProcess { protected int numTries; protected int serverTrackerTimeout; protected int timeout; + protected long primaryCallTimeout; // End configuration settings. protected static class BatchErrors { @@ -192,6 +216,12 @@ class AsyncProcess { actions.clear(); addresses.clear(); } + + public synchronized void merge(BatchErrors other) { + throwables.addAll(other.throwables); + actions.addAll(other.actions); + addresses.addAll(other.addresses); + } } public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, @@ -212,6 +242,7 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); @@ -270,7 +301,8 @@ class AsyncProcess { /** * Extract from the rows list what we can submit. The rows we can not submit are kept in the - * list. + * list. Does not send requests to replicas (not currently used for anything other + * than streaming puts anyway). * * @param pool ExecutorService to use. * @param tableName The table for which this request is needed. @@ -311,7 +343,7 @@ class AsyncProcess { Row r = it.next(); HRegionLocation loc; try { - loc = findDestLocation(tableName, r); + loc = findDestLocation(tableName, r, true).getDefaultRegionLocation(); } catch (IOException ex) { locationErrors = new ArrayList(); locationErrorRows = new ArrayList(); @@ -329,7 +361,9 @@ class AsyncProcess { Action action = new Action(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); - addAction(loc, action, actionsByServer, nonceGroup); + // TODO: replica-get is not supported on this path + byte[] regionName = loc.getRegionInfo().getRegionName(); + addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); it.remove(); } } @@ -347,7 +381,7 @@ class AsyncProcess { ars.manageError(originalIndex, row, false, locationErrors.get(i), null); } } - ars.sendMultiAction(actionsByServer, 1); + ars.sendMultiAction(actionsByServer, 1, null); return ars; } @@ -359,13 +393,12 @@ class AsyncProcess { * @param actionsByServer the multiaction per server * @param nonceGroup Nonce group. */ - private void addAction(HRegionLocation loc, Action action, + private void addAction(ServerName server, byte[] regionName, Action action, Map> actionsByServer, long nonceGroup) { - final byte[] regionName = loc.getRegionInfo().getRegionName(); - MultiAction multiAction = actionsByServer.get(loc.getServerName()); + MultiAction multiAction = actionsByServer.get(server); if (multiAction == null) { multiAction = new MultiAction(); - actionsByServer.put(loc.getServerName(), multiAction); + actionsByServer.put(server, multiAction); } if (action.hasNonce() && !multiAction.hasNonceGroup()) { multiAction.setNonceGroup(nonceGroup); @@ -380,10 +413,12 @@ class AsyncProcess { * @param row the row * @return the destination. */ - private HRegionLocation findDestLocation(TableName tableName, Row row) throws IOException { + private RegionLocations findDestLocation( + TableName tableName, Row row, boolean checkPrimary) throws IOException { if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); - HRegionLocation loc = hConnection.locateRegion(tableName, row.getRow()); - if (loc == null) { + RegionLocations loc = hConnection.locateRegionAll(tableName, row.getRow()); + if (loc == null + || (checkPrimary && (loc.isEmpty() || loc.getDefaultRegionLocation() == null))) { throw new IOException("#" + id + ", no location found, aborting submit for" + " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow())); } @@ -516,6 +551,144 @@ class AsyncProcess { * scheduling children. This is why lots of code doesn't require any synchronization. */ protected class AsyncRequestFutureImpl implements AsyncRequestFuture { + + /** + * Runnable (that can be submitted to thread pool) that waits for when it's time + * to issue replica calls, finds region replicas, groups the requests by replica and + * issues the calls (on separate threads, via sendMultiAction). + * This is done on a separate thread because we don't want to wait on user thread for + * our asynchronous call, and usually we have to wait before making replica calls. + */ + private final class ReplicaCallIssuingRunnable implements Runnable { + private final long startTime; + private final List> initialActions; + + public ReplicaCallIssuingRunnable(List> initialActions, long startTime) { + this.initialActions = initialActions; + this.startTime = startTime; + } + + @Override + public void run() { + boolean done = false; + if (primaryCallTimeout > 0) { + try { + done = waitUntilDone(startTime + primaryCallTimeout); + } catch (InterruptedException ex) { + LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); + return; + } + } + if (done) return; // Done within primary timeout + Map> actionsByServer = + new HashMap>(); + if (replicaGetIndices == null) { + for (int i = 0; i < results.length; ++i) { + addReplicaActions(i, actionsByServer); + } + } else { + for (int i = 0; i < replicaGetIndices.length; ++i) { + addReplicaActions(replicaGetIndices[i], actionsByServer); + } + } + if (actionsByServer.isEmpty()) return; // Nothing to do - done or no replicas found. + sendMultiAction(actionsByServer, 1, null); + } + + /** + * Add replica actions to action map by server. + * @param index Index of the original action. + * @param actionsByServer The map by server to add it to. + */ + private void addReplicaActions( + int index, Map> actionsByServer) { + if (results[index] != null) return; // opportunistic. Never goes from non-null to null. + Action action = initialActions.get(index); + RegionLocations loc = null; + try { + // For perf, we assume that this location coming from cache, since we just got location + // from meta for the primary call. If it turns out to not be the case, we'd need local + // cache since we want to keep as little time as possible before replica call. + loc = findDestLocation(tableName, action.getAction(), false); + } catch (IOException ex) { + manageError(action.getOriginalIndex(), action.getAction(), false, ex, null); + LOG.error("Cannot get location - no replica calls for some actions", ex); + return; + } + HRegionLocation[] locs = loc.getRegionLocations(); + int replicaCount = 0; + for (int i = 1; i < locs.length; ++i) { + replicaCount += (locs[i] != null) ? 1 : 0; + } + if (replicaCount == 0) { + LOG.warn("No replicas found for " + action.getAction()); + return; + } + synchronized (replicaResultLock) { + // Don't run replica calls if the original has finished. We could do it e.g. if + // original has already failed before first replica call (unlikely given retries), + // but that would require additional synchronization w.r.t. returning to caller. + if (results[index] != null) return; + // We set the number of calls here. After that any path must call setResult/setError. + results[index] = new ReplicaResultState(replicaCount + 1); + } + for (int i = 1; i < locs.length; ++i) { + if (locs[i] == null) continue; + addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), + new Action(action, i), actionsByServer, nonceGroup); + } + } + } + + /** + * Runnable (that can be submitted to thread pool) that submits MultiAction to a + * single server. The server call is synchronous, therefore we do it on a thread pool. + */ + private final class SingleServerRequestRunnable implements Runnable { + private final MultiAction multiAction; + private final int numAttempt; + private final ServerName server; + + private SingleServerRequestRunnable( + MultiAction multiAction, int numAttempt, ServerName server) { + this.multiAction = multiAction; + this.numAttempt = numAttempt; + this.server = server; + } + + @Override + public void run() { + MultiResponse res; + try { + MultiServerCallable callable = createCallable(server, tableName, multiAction); + try { + res = createCaller(callable).callWithoutRetries(callable, timeout); + } catch (IOException e) { + // The service itself failed . It may be an error coming from the communication + // layer, but, as well, a functional error raised by the server. + receiveGlobalFailure(multiAction, server, numAttempt, e); + return; + } catch (Throwable t) { + // This should not happen. Let's log & retry anyway. + LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + + " Retrying. Server is " + server + ", tableName=" + tableName, t); + receiveGlobalFailure(multiAction, server, numAttempt, t); + return; + } + + // Normal case: we received an answer from the server, and it's not an exception. + receiveMultiAction(multiAction, server, res, numAttempt); + } catch (Throwable t) { + // Something really bad happened. We are on the send thread that will now die. + LOG.error("Internal AsyncProcess #" + id + " error for " + + tableName + " processing for " + server, t); + throw new RuntimeException(t); + } finally { + decTaskCounters(multiAction.getRegions(), server); + } + } + } + private final Batch.Callback callback; private final BatchErrors errors; private final ConnectionManager.ServerErrorTracker errorsByServer; @@ -524,7 +697,21 @@ class AsyncProcess { private final TableName tableName; private final AtomicLong actionsInProgress = new AtomicLong(-1); + /** The lock controls access to results. It is only held when populating results where + * there might be several callers (eventual consistency gets). For other requests, + * there's one unique call going on per result index. */ + private final Object replicaResultLock = new Object(); + /** Result array. Null if results are not needed. Otherwise, each index corresponds to + * the action index in initial actions submitted. For most request types, has null-s for + * requests that are not done, and result/exception for those that are done. + * For eventual-consistency gets, initially the same applies; at some point, replica calls + * might be started, and ReplicaResultState is put at the corresponding indices. The + * returning calls check the type to detect when this is the case. After all calls are done, + * ReplicaResultState-s are replaced with results for the user. */ private final Object[] results; + /** Indices of replica gets in results. If null, all or no actions are replica-gets. */ + private final int[] replicaGetIndices; + private final boolean hasAnyReplicaGets; private final long nonceGroup; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, @@ -545,6 +732,51 @@ class AsyncProcess { } else { this.results = needResults ? new Object[actions.size()] : null; } + List replicaGetIndices = null; + boolean hasAnyReplicaGets = false; + if (needResults) { + // Check to see if any requests might require replica calls. + // We expect that many requests will consist of all or no multi-replica gets; in such + // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will + // store the list of action indexes for which replica gets are possible, and set + // hasAnyReplicaGets to true. + boolean hasAnyNonReplicaReqs = false; + int posInList = 0; + for (Action action : actions) { + boolean isReplicaGet = isReplicaGet(action.getAction()); + if (isReplicaGet) { + hasAnyReplicaGets = true; + if (hasAnyNonReplicaReqs) { // Mixed case + if (replicaGetIndices == null) { + replicaGetIndices = new ArrayList(actions.size() - 1); + } + replicaGetIndices.add(posInList); + } + } else if (!hasAnyNonReplicaReqs) { + // The first non-multi-replica request in the action list. + hasAnyNonReplicaReqs = true; + if (posInList > 0) { + // Add all the previous requests to the index lists. We know they are all + // replica-gets because this is the first non-multi-replica request in the list. + replicaGetIndices = new ArrayList(actions.size() - 1); + for (int i = 0; i < posInList; ++i) { + replicaGetIndices.add(i); + } + } + } + ++posInList; + } + } + this.hasAnyReplicaGets = hasAnyReplicaGets; + if (replicaGetIndices != null) { + this.replicaGetIndices = new int[replicaGetIndices.size()]; + int i = 0; + for (Integer el : replicaGetIndices) { + this.replicaGetIndices[i++] = el; + } + } else { + this.replicaGetIndices = null; + } this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); } @@ -560,21 +792,40 @@ class AsyncProcess { final Map> actionsByServer = new HashMap>(); - HRegionLocation loc; + boolean isReplica = false; for (Action action : currentActions) { + RegionLocations locs = null; try { - loc = findDestLocation(tableName, action.getAction()); + locs = findDestLocation(tableName, action.getAction(), false); } catch (IOException ex) { // There are multiple retries in locateRegion already. No need to add new. // We can't continue with this row, hence it's the last retry. manageError(action.getOriginalIndex(), action.getAction(), false, ex, null); continue; } - addAction(loc, action, actionsByServer, nonceGroup); - } + boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); + if (isReplica && !isReplicaAction) { + // This is the property of the current implementation, not a requirement. + throw new AssertionError("Replica and non-replica actions in the same retry"); + } + isReplica = isReplicaAction; + HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); + if (loc == null || loc.getServerName() == null) { + // On retry, we couldn't find location for some replica we saw before. + String str = "Cannot find location for replica " + action.getReplicaId(); + LOG.error(str); + manageError(action.getOriginalIndex(), action.getAction(), + false, new IOException(str), null); + continue; + } + byte[] regionName = loc.getRegionInfo().getRegionName(); + addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); + } + // If this is a first attempt to group and send, no replicas, we need replica thread. if (!actionsByServer.isEmpty()) { - sendMultiAction(actionsByServer, numAttempt); + boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); + sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null); } } @@ -584,51 +835,22 @@ class AsyncProcess { * * @param actionsByServer the actions structured by regions * @param numAttempt the attempt number. + * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ - private void sendMultiAction( - Map> actionsByServer, final int numAttempt) { + private void sendMultiAction(Map> actionsByServer, + int numAttempt, List> actionsForReplicaThread) { // Run the last item on the same thread if we are already on a send thread. // We hope most of the time it will be the only item, so we can cut down on threads. - int reuseThreadCountdown = (numAttempt > 1) ? actionsByServer.size() : Integer.MAX_VALUE; + int actionsRemaining = actionsByServer.size(); + // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry> e : actionsByServer.entrySet()) { final ServerName server = e.getKey(); final MultiAction multiAction = e.getValue(); incTaskCounters(multiAction.getRegions(), server); - Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() { - @Override - public void run() { - MultiResponse res; - try { - MultiServerCallable callable = createCallable(server, tableName, multiAction); - try { - res = createCaller(callable).callWithoutRetries(callable, timeout); - } catch (IOException e) { - // The service itself failed . It may be an error coming from the communication - // layer, but, as well, a functional error raised by the server. - receiveGlobalFailure(multiAction, server, numAttempt, e); - return; - } catch (Throwable t) { - // This should not happen. Let's log & retry anyway. - LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + - " Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t); - receiveGlobalFailure(multiAction, server, numAttempt, t); - return; - } - - // Normal case: we received an answer from the server, and it's not an exception. - receiveMultiAction(multiAction, server, res, numAttempt); - } catch (Throwable t) { - // Something really bad happened. We are on the send thread that will now die. - LOG.error("Internal AsyncProcess #" + id + " error for " - + tableName + " processing for " + server, t); - throw new RuntimeException(t); - } finally { - decTaskCounters(multiAction.getRegions(), server); - } - } - }); - --reuseThreadCountdown; - if (reuseThreadCountdown == 0) { + Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", + new SingleServerRequestRunnable(multiAction, numAttempt, server)); + --actionsRemaining; + if ((numAttempt > 1) && actionsRemaining == 0) { runnable.run(); } else { try { @@ -645,6 +867,30 @@ class AsyncProcess { } } } + if (actionsForReplicaThread != null) { + startWaitingForReplicaCalls(actionsForReplicaThread); + } + } + + /** + * Starts waiting to issue replica calls on a different thread; or issues them immediately. + */ + private void startWaitingForReplicaCalls(List> actionsForReplicaThread) { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( + actionsForReplicaThread, startTime); + if (primaryCallTimeout == 0) { + // Start replica calls immediately. + replicaRunnable.run(); + } else { + // Start the thread that may kick off replica gets. + // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea. + try { + pool.submit(replicaRunnable); + } catch (RejectedExecutionException ree) { + LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree); + } + } } /** @@ -665,11 +911,11 @@ class AsyncProcess { if (!canRetry) { // Batch.Callback was not called on failure in 0.94. We keep this. - errors.add(throwable, row, server); - if (results != null) { - setResult(originalIndex, row, throwable); - } - decActionCounter(); + setError(originalIndex, row, throwable, server); + } else { + // See if we are dealing with a replica action that was completed from other server. + // Doesn't have to be synchronized, worst case we'd retry and be unable to set result. + canRetry = !isActionComplete(originalIndex, row); } return canRetry; @@ -685,15 +931,17 @@ class AsyncProcess { */ private void receiveGlobalFailure( MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { - // Do not use the exception for updating cache because it might be coming from - // any of the regions in the MultiAction. - byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow(); - hConnection.updateCachedLocations(tableName, null, row, null, server); errorsByServer.reportServerError(server); boolean canRetry = errorsByServer.canRetryMore(numAttempt); List> toReplay = new ArrayList>(); for (Map.Entry>> e : rsActions.actions.entrySet()) { + byte[] regionName = e.getKey(); + byte[] row = e.getValue().iterator().next().getAction().getRow(); + // Do not use the exception for updating cache because it might be coming from + // any of the regions in the MultiAction. + // TODO: depending on type of exception we might not want to update cache at all? + hConnection.updateCachedLocations(tableName, regionName, row, null, server); for (Action action : e.getValue()) { if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) { toReplay.add(action); @@ -791,14 +1039,16 @@ class AsyncProcess { // Failure: retry if it's make sense else update the errors lists if (result == null || result instanceof Throwable) { Row row = sentAction.getAction(); - if (!regionFailureRegistered) { // We're doing this once per location. + // Register corresponding failures once per server/once per region. + if (!regionFailureRegistered) { regionFailureRegistered = true; - // The location here is used as a server name. - hConnection.updateCachedLocations(tableName, regionName, row.getRow(), result, server); - if (failureCount == 0) { - errorsByServer.reportServerError(server); - canRetry = errorsByServer.canRetryMore(numAttempt); - } + hConnection.updateCachedLocations( + tableName, regionName, row.getRow(), result, server); + } + if (failureCount == 0) { + errorsByServer.reportServerError(server); + // We determine canRetry only once for all calls, after reporting server failure. + canRetry = errorsByServer.canRetryMore(numAttempt); } ++failureCount; if (manageError( @@ -809,16 +1059,14 @@ class AsyncProcess { if (callback != null) { try { //noinspection unchecked + // TODO: would callback expect a replica region name if it gets one? this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result); } catch (Throwable t) { LOG.error("User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring", t); } } - if (results != null) { - setResult(sentAction.getOriginalIndex(), sentAction.getAction(), result); - } - decActionCounter(); + setResult(sentAction, result); } } } @@ -881,38 +1129,185 @@ class AsyncProcess { return sb.toString(); } - private void setResult(int index, Row row, Object result) { - if (result == null) throw new RuntimeException("Result cannot be set to null"); - if (results[index] != null) throw new RuntimeException("Result was already set"); - results[index] = result; + /** + * Sets the non-error result from a particular action. + * @param action Action (request) that the server responded to. + * @param result The result. + */ + private void setResult(Action action, Object result) { + ReplicaResultState state = null; + boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); + if (results == null || ((state = trySetResultSimple( + action.getOriginalIndex(), action.getAction(), result, isStale)) == null)) { + decActionCounter(); + return; // Simple case, no replica requests. + } + synchronized (state) { + if (state.callCount == 0) return; // someone already set the result + state.result = result; + state.callCount = 0; + state.replicaErrors = null; // no longer matters + } + decActionCounter(); + } + + /** + * Sets the error from a particular action. + * @param index Original action index. + * @param row Original request. + * @param throwable The resulting error. + * @param server The source server. + */ + private void setError(int index, Row row, Throwable throwable, ServerName server) { + ReplicaResultState state = null; + if (results == null + || ((state = trySetResultSimple(index, row, throwable, false)) == null)) { + errors.add(throwable, row, server); + decActionCounter(); + return; // Simple case, no replica requests. + } + BatchErrors target = null; // Error will be added to final errors, or temp replica errors. + boolean isActionDone = false; + synchronized (state) { + switch (state.callCount) { + case 0: return; // someone already set the result + case 1: { // All calls failed, we are the last error. + state.result = throwable; + target = errors; + isActionDone = true; + break; + } + default: { + assert state.callCount > 1; + if (state.replicaErrors == null) { + state.replicaErrors = new BatchErrors(); + } + target = state.replicaErrors; + break; + } + } + --state.callCount; + } + target.add(throwable, row, server); + if (!isActionDone) return; + if (state.replicaErrors != null) { // last call, no need to lock + errors.merge(state.replicaErrors); + state.replicaErrors = null; + } + decActionCounter(); + } + + /** + * Checks if the action is complete; used on error to prevent needless retries. + * Does not synchronize, assuming element index/field accesses are atomic. + * This is an opportunistic optimization check, doesn't have to be strict. + * @param index Original action index. + * @param row Original request. + */ + private boolean isActionComplete(int index, Row row) { + if (!isReplicaGet(row)) return false; + Object resObj = results[index]; + return (resObj != null) && (!(resObj instanceof ReplicaResultState) + || ((ReplicaResultState)resObj).callCount == 0); + } + + /** + * Tries to set the result or error for a particular action as if there were no replica calls. + * @return null if successful; replica state if there were in fact replica calls. + */ + private ReplicaResultState trySetResultSimple( + int index, Row row, Object result, boolean isFromReplica) { + Object resObj = null; + if (!isReplicaGet(row)) { + if (isFromReplica) { + throw new AssertionError("Unexpected stale result for " + row); + } + results[index] = result; + } else { + synchronized (replicaResultLock) { + if ((resObj = results[index]) == null) { + if (isFromReplica) { + throw new AssertionError("Unexpected stale result for " + row); + } + results[index] = result; + } + } + } + return (resObj == null || !(resObj instanceof ReplicaResultState)) + ? null : (ReplicaResultState)resObj; } private void decActionCounter() { - actionsInProgress.decrementAndGet(); + if (hasAnyReplicaGets && (actionsInProgress.get() == 1)) { + // Convert replica sync structures to results. + int staleCount = 0; + if (replicaGetIndices == null) { + for (int i = 0; i < results.length; ++i) { + staleCount += convertReplicaResult(i) ? 1 : 0; + } + } else { + for (int i = 0; i < replicaGetIndices.length; ++i) { + staleCount += convertReplicaResult(replicaGetIndices[i]) ? 1 : 0; + } + } + if (!actionsInProgress.compareAndSet(1, 0)) { + throw new AssertionError("Cannot set actions in progress to 0"); + } + if (staleCount > 0) { + LOG.trace("Returning " + staleCount + " stale results"); + } + } else { + actionsInProgress.decrementAndGet(); + } synchronized (actionsInProgress) { actionsInProgress.notifyAll(); } } + private boolean convertReplicaResult(int index) { + if (!(results[index] instanceof ReplicaResultState)) return false; + ReplicaResultState state = (ReplicaResultState)results[index]; + // We know that noone will touch state with 0 callCount, no need to lock + if (state.callCount != 0) { + throw new AssertionError("Actions are done but callcount is " + state.callCount); + } + // TODO: we expect the Result coming from server to already have "isStale" specified. + Object res = results[index] = state.result; + return (res instanceof Result) && ((Result)res).isStale(); + } + @Override public void waitUntilDone() throws InterruptedIOException { - long lastLog = EnvironmentEdgeManager.currentTimeMillis(); - long currentInProgress; try { - while (0 != (currentInProgress = actionsInProgress.get())) { - long now = EnvironmentEdgeManager.currentTimeMillis(); + waitUntilDone(Long.MAX_VALUE); + } catch (InterruptedException iex) { + throw new InterruptedIOException(iex.getMessage()); + } + } + + private boolean waitUntilDone(long cutoff) throws InterruptedException { + boolean hasWait = cutoff != Long.MAX_VALUE; + long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis(); + long currentInProgress; + while (0 != (currentInProgress = actionsInProgress.get())) { + long now = 0; + if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) { + return false; + } + if (!hasWait) { + // Only log if wait is infinite. + now = EnvironmentEdgeManager.currentTimeMillis(); if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish"); } synchronized (actionsInProgress) { if (actionsInProgress.get() == 0) break; - actionsInProgress.wait(100); + actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE)); } } - } catch (InterruptedException iex) { - throw new InterruptedIOException(iex.getMessage()); } + return true; } @Override @@ -931,7 +1326,8 @@ class AsyncProcess { } @Override - public Object[] getResults() { + public Object[] getResults() throws InterruptedIOException { + waitUntilDone(); return results; } } @@ -1080,4 +1476,8 @@ class AsyncProcess { return new ConnectionManager.ServerErrorTracker( this.serverTrackerTimeout, this.numTries); } + + private static boolean isReplicaGet(Row row) { + return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE); + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 0c4776d..779d15d 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; @@ -231,4 +232,9 @@ interface ClusterConnection extends HConnection { * @return Default AsyncProcess associated with this connection. */ AsyncProcess getAsyncProcess(); + + /** + * @return All locations for a particular region. + */ + RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException; } \ No newline at end of file diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index dfc6d00..3038fb2 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; @@ -203,6 +204,11 @@ class ConnectionAdapter implements ClusterConnection { } @Override + public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { + return wrappedConnection.locateRegionAll(tableName, row); + } + + @Override public void clearRegionCache() { wrappedConnection.clearRegionCache(); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 15f78c5..d2b58ae 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -946,10 +946,15 @@ class ConnectionManager { } @Override - public HRegionLocation locateRegion(final TableName tableName, - final byte [] row) - throws IOException{ - RegionLocations locations = locateRegion(tableName, row, true, true); + public RegionLocations locateRegionAll( + final TableName tableName, final byte[] row) throws IOException{ + return locateRegion(tableName, row, true, true); + } + + @Override + public HRegionLocation locateRegion( + final TableName tableName, final byte[] row) throws IOException{ + RegionLocations locations = locateRegionAll(tableName, row); return locations == null ? null : locations.getRegionLocation(); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 2ca24dc..cad521a 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -84,17 +84,6 @@ public final class MultiAction { return actions.keySet(); } - /** - * @return All actions from all regions in this container - */ - public List> allActions() { - List> res = new ArrayList>(); - for (List> lst : actions.values()) { - res.addAll(lst); - } - return res; - } - public boolean hasNonceGroup() { return nonceGroup != HConstants.NO_NONCE; } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index cc26ecf..20cf766 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; /** @@ -153,4 +154,9 @@ class MultiServerCallable extends RegionServerCallable { // Use the location we were given in the constructor rather than go look it up. setStub(getConnection().getClient(this.location.getServerName())); } + + @VisibleForTesting + ServerName getServerName() { + return location.getServerName(); + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java index abe9bf5..6b1465d 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionReplicaUtil.java @@ -30,7 +30,7 @@ public class RegionReplicaUtil { /** * The default replicaId for the region */ - private static final int DEFAULT_REPLICA_ID = 0; + static final int DEFAULT_REPLICA_ID = 0; /** * Returns the HRegionInfo for the given replicaId. HRegionInfo's correspond to @@ -62,4 +62,7 @@ public class RegionReplicaUtil { return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID); } + public static boolean isDefaultReplica(int replicaId) { + return DEFAULT_REPLICA_ID == replicaId; + } } diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index edffd18..575827f 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -43,8 +45,11 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; @@ -53,6 +58,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; @Category(MediumTests.class) public class TestAsyncProcess { @@ -64,8 +70,9 @@ public class TestAsyncProcess { private static final byte[] FAILS = "FAILS".getBytes(); private static final Configuration conf = new Configuration(); - private static ServerName sn = ServerName.valueOf("localhost:10,1254"); - private static ServerName sn2 = ServerName.valueOf("localhost:140,12540"); + private static ServerName sn = ServerName.valueOf("s1:1,1"); + private static ServerName sn2 = ServerName.valueOf("s2:2,2"); + private static ServerName sn3 = ServerName.valueOf("s3:3,3"); private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); private static HRegionInfo hri2 = @@ -76,6 +83,16 @@ public class TestAsyncProcess { private static HRegionLocation loc2 = new HRegionLocation(hri2, sn); private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2); + // Replica stuff + private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1), + hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); + private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); + private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), + new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); + private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), + new HRegionLocation(hri2r1, sn3)); + private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null); + private static final String success = "success"; private static Exception failure = new Exception("failure"); @@ -139,6 +156,7 @@ public class TestAsyncProcess { ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) { + @Override public void execute(Runnable command) { throw new RejectedExecutionException("test under failure"); } @@ -158,7 +176,17 @@ public class TestAsyncProcess { protected RpcRetryingCaller createCaller(MultiServerCallable callable) { callsCt.incrementAndGet(); final MultiResponse mr = createMultiResponse( - callable.getMulti(), nbMultiResponse, nbActions); + callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { + @Override + public void addResponse(MultiResponse mr, byte[] regionName, Action a) { + if (Arrays.equals(FAILS, a.getAction().getRow())) { + mr.add(regionName, a.getOriginalIndex(), failure); + } else { + mr.add(regionName, a.getOriginalIndex(), success); + } + } + }); + return new RpcRetryingCaller(100, 10) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, @@ -204,23 +232,106 @@ public class TestAsyncProcess { } } - static MultiResponse createMultiResponse( - final MultiAction multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) { + class MyAsyncProcessWithReplicas extends MyAsyncProcess { + private Set failures = new TreeSet(new Bytes.ByteArrayComparator()); + private long primarySleepMs = 0, replicaSleepMs = 0; + private Map customPrimarySleepMs = new HashMap(); + private final AtomicLong replicaCalls = new AtomicLong(0); + + public void addFailures(HRegionInfo... hris) { + for (HRegionInfo hri : hris) { + failures.add(hri.getRegionName()); + } + } + + public long getReplicaCallCount() { + return replicaCalls.get(); + } + + public void setPrimaryCallDelay(ServerName server, long primaryMs) { + customPrimarySleepMs.put(server, primaryMs); + } + + public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { + super(hc, conf); + } + + public void setCallDelays(long primaryMs, long replicaMs) { + this.primarySleepMs = primaryMs; + this.replicaSleepMs = replicaMs; + } + + @Override + protected RpcRetryingCaller createCaller( + MultiServerCallable callable) { + final MultiResponse mr = createMultiResponse( + callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { + @Override + public void addResponse(MultiResponse mr, byte[] regionName, Action a) { + if (failures.contains(regionName)) { + mr.add(regionName, a.getOriginalIndex(), failure); + } else { + boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId()); + mr.add(regionName, a.getOriginalIndex(), + Result.create(new Cell[0], null, isStale)); + } + } + }); + // Currently AsyncProcess either sends all-replica, or all-primary request. + final boolean isDefault = RegionReplicaUtil.isDefaultReplica( + callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); + final ServerName server = ((MultiServerCallable)callable).getServerName(); + String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " + + callable.getMulti().actions.size() + " entries: "; + for (byte[] region : callable.getMulti().actions.keySet()) { + debugMsg += "[" + Bytes.toStringBinary(region) + "], "; + } + LOG.debug(debugMsg); + if (!isDefault) { + replicaCalls.incrementAndGet(); + } + + return new RpcRetryingCaller(100, 10) { + @Override + public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + long sleep = -1; + if (isDefault) { + Long customSleep = customPrimarySleepMs.get(server); + sleep = (customSleep == null ? primarySleepMs : customSleep.longValue()); + } else { + sleep = replicaSleepMs; + } + if (sleep != 0) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + } + } + return mr; + } + }; + } + } + + static MultiResponse createMultiResponse(final MultiAction multi, + AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) { final MultiResponse mr = new MultiResponse(); nbMultiResponse.incrementAndGet(); for (Map.Entry>> entry : multi.actions.entrySet()) { byte[] regionName = entry.getKey(); for (Action a : entry.getValue()) { nbActions.incrementAndGet(); - if (Arrays.equals(FAILS, a.getAction().getRow())) { - mr.add(regionName, a.getOriginalIndex(), failure); - } else { - mr.add(regionName, a.getOriginalIndex(), success); - } + gen.addResponse(mr, regionName, a); } } return mr; } + + private static interface ResponseGenerator { + void addResponse(final MultiResponse mr, byte[] regionName, Action a); + } + /** * Returns our async process. */ @@ -233,9 +344,8 @@ public class TestAsyncProcess { } @Override - public HRegionLocation locateRegion(final TableName tableName, - final byte[] row) { - return loc1; + public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { + return new RegionLocations(loc1); } } @@ -253,18 +363,18 @@ public class TestAsyncProcess { } @Override - public HRegionLocation locateRegion(final TableName tableName, - final byte[] row) { + public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { int i = 0; - for (HRegionLocation hr:hrl){ - if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){ - usedRegions[i] = true; - return hr; + for (HRegionLocation hr : hrl){ + if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { + usedRegions[i] = true; + return new RegionLocations(hr); } i++; } return null; } + } @Test @@ -284,6 +394,7 @@ public class TestAsyncProcess { ClusterConnection hc = createHConnection(); final AtomicInteger updateCalled = new AtomicInteger(0); Batch.Callback cb = new Batch.Callback() { + @Override public void update(byte[] region, byte[] row, Object result) { updateCalled.incrementAndGet(); } @@ -458,6 +569,7 @@ public class TestAsyncProcess { final Thread myThread = Thread.currentThread(); Thread t = new Thread() { + @Override public void run() { Threads.sleep(2000); myThread.interrupt(); @@ -478,6 +590,7 @@ public class TestAsyncProcess { final long sleepTime = 2000; Thread t2 = new Thread() { + @Override public void run() { Threads.sleep(sleepTime); while (ap.tasksInProgress.get() > 0) { @@ -496,32 +609,33 @@ public class TestAsyncProcess { } private static ClusterConnection createHConnection() throws IOException { - ClusterConnection hc = Mockito.mock(ClusterConnection.class); - - Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE), - Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1); - Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), - Mockito.eq(DUMMY_BYTES_1))).thenReturn(loc1); - - Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE), - Mockito.eq(DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn(loc2); - Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), - Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2); + ClusterConnection hc = createHConnectionCommon(); + setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); + setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); + setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); + setMockLocation(hc, FAILS, new RegionLocations(loc2)); + return hc; + } - Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE), - Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2); - Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), - Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3); + private static ClusterConnection createHConnectionWithReplicas() throws IOException { + ClusterConnection hc = createHConnectionCommon(); + setMockLocation(hc, DUMMY_BYTES_1, hrls1); + setMockLocation(hc, DUMMY_BYTES_2, hrls2); + setMockLocation(hc, DUMMY_BYTES_3, hrls3); + return hc; + } - Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE), - Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2); - Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), - Mockito.eq(FAILS))).thenReturn(loc2); + private static void setMockLocation(ClusterConnection hc, byte[] row, + RegionLocations result) throws IOException { + Mockito.when(hc.locateRegionAll( + Mockito.eq(DUMMY_TABLE), Mockito.eq(row))).thenReturn(result); + } + private static ClusterConnection createHConnectionCommon() { + ClusterConnection hc = Mockito.mock(ClusterConnection.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); - return hc; } @@ -756,7 +870,124 @@ public class TestAsyncProcess { Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS); } - private void verifyResult(AsyncRequestFuture ars, boolean... expected) { + @Test + public void testReplicaReplicaSuccess() throws Exception { + // Main call takes too long so replicas succeed, except for one region w/o replicas. + // One region has no replica, so the main call succeeds for it. + MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); + verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); + Assert.assertEquals(2, ap.getReplicaCallCount()); + } + + @Test + public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { + // Main call succeeds before replica calls are kicked off. + MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); + verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); + Assert.assertEquals(0, ap.getReplicaCallCount()); + } + + @Test + public void testReplicaParallelCallsSucceed() throws Exception { + // Either main or replica can succeed. + MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); + long replicaCalls = ap.getReplicaCallCount(); + Assert.assertTrue(replicaCalls >= 0); + Assert.assertTrue(replicaCalls <= 2); + } + + @Test + public void testReplicaPartialReplicaCall() throws Exception { + // One server is slow, so the result for its region comes from replica, whereas + // the result for other region comes from primary before replica calls happen. + // There should be no replica call for that region at all. + MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); + ap.setPrimaryCallDelay(sn2, 2000); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + verifyReplicaResult(ars, RR.FALSE, RR.TRUE); + Assert.assertEquals(1, ap.getReplicaCallCount()); + } + + @Test + public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { + // Main calls fail before replica calls can start - this is currently not handled. + // It would probably never happen if we can get location (due to retries), + // and it would require additional synchronization. + MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1); + ap.addFailures(hri1, hri2); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + verifyReplicaResult(ars, RR.FAILED, RR.FAILED); + Assert.assertEquals(0, ap.getReplicaCallCount()); + } + + @Test + public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { + // Main calls fails after replica calls start. For two-replica region, one replica call + // also fails. Regardless, we get replica results for both regions. + MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1); + ap.addFailures(hri1, hri1r2, hri2); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + verifyReplicaResult(ars, RR.TRUE, RR.TRUE); + Assert.assertEquals(2, ap.getReplicaCallCount()); + } + + @Test + public void testReplicaAllCallsFailForOneRegion() throws Exception { + // For one of the region, all 3, main and replica, calls fail. For the other, replica + // call fails but its exception should not be visible as it did succeed. + MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1); + ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); + List rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); + AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); + verifyReplicaResult(ars, RR.FAILED, RR.FALSE); + // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 + Assert.assertEquals(3, ars.getErrors().getNumExceptions()); + for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { + Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); + } + } + + private MyAsyncProcessWithReplicas createReplicaAp( + int replicaAfterMs, int primaryMs, int replicaMs) throws Exception { + return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); + } + + private MyAsyncProcessWithReplicas createReplicaAp( + int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception { + // TODO: this is kind of timing dependent... perhaps it should detect from createCaller + // that the replica call has happened and that way control the ordering. + Configuration conf = new Configuration(); + ClusterConnection conn = createHConnectionWithReplicas(); + conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs); + if (retries > 0) { + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); + } + MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); + ap.setCallDelays(primaryMs, replicaMs); + return ap; + } + + private static List makeTimelineGets(byte[]... rows) { + List result = new ArrayList(); + for (byte[] row : rows) { + Get get = new Get(row); + get.setConsistency(Consistency.TIMELINE); + result.add(get); + } + return result; + } + + private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception { Object[] actual = ars.getResults(); Assert.assertEquals(expected.length, actual.length); for (int i = 0; i < expected.length; ++i) { @@ -764,6 +995,27 @@ public class TestAsyncProcess { } } + /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */ + private enum RR { + TRUE, + FALSE, + DONT_CARE, + FAILED + } + + private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception { + Object[] actuals = ars.getResults(); + Assert.assertEquals(expecteds.length, actuals.length); + for (int i = 0; i < expecteds.length; ++i) { + Object actual = actuals[i]; + RR expected = expecteds[i]; + Assert.assertEquals(expected == RR.FAILED, actual instanceof Throwable); + if (expected != RR.FAILED && expected != RR.DONT_CARE) { + Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale()); + } + } + } + /** * @param regCnt the region: 1 to 3. * @param success if true, the put will succeed. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index cc0901f..e2be188 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -28,12 +28,10 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -49,7 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class CoprocessorHConnection implements ClusterConnection { +class CoprocessorHConnection implements ClusterConnection { private static final NonceGenerator ng = new ConnectionManager.NoNonceGenerator(); /** @@ -60,7 +58,7 @@ public class CoprocessorHConnection implements ClusterConnection { * @return an unmanaged {@link HConnection}. * @throws IOException if we cannot create the basic connection */ - public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env) + static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env) throws IOException { ClusterConnection connection = ConnectionManager.createConnectionInternal(env.getConfiguration()); @@ -427,4 +425,9 @@ public class CoprocessorHConnection implements ClusterConnection { public AsyncProcess getAsyncProcess() { return delegate.getAsyncProcess(); } + + @Override + public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { + return delegate.locateRegionAll(tableName, row); + } } \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 5a86ab5..f925fea 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -110,6 +111,8 @@ public class HConnectionTestingUtility { thenReturn(loc); Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())). thenReturn(loc); + Mockito.when(c.locateRegionAll((TableName) Mockito.any(), (byte[]) Mockito.any())). + thenReturn(new RegionLocations(loc)); if (admin != null) { // If a call to getAdmin, return this implementation. Mockito.when(c.getAdmin(Mockito.any(ServerName.class))). -- 2.0.0