diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index 9b32e93..d6981ea 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.RunnableFuture; @@ -32,13 +33,20 @@ import org.apache.htrace.Trace; * Keeps the list of the futures, and allows to cancel them all. * This means as well that it can be used for a small set of tasks only. *
Implementation is not Thread safe. + * + * completedTasks is implemented as a queue; entries are added in time order. I.e., + * when the first task completes (whether it is a success or failure), it is added as the first + * entry in the queue, the next completed task is added as the second entry in the queue, ... + * When iterating through the queue, we know it is based on time order. If the first + * completed task succeeds, it is returned. If it is a failure, the iteration goes on until it + * finds a success. */ @InterfaceAudience.Private public class ResultBoundedCompletionService { private final RpcRetryingCallerFactory retryingCallerFactory; private final Executor executor; private final QueueingFuture[] tasks; // all the tasks - private volatile QueueingFuture completed = null; + private final ArrayList completedTasks; // completed tasks private volatile boolean cancelled = false; class QueueingFuture implements RunnableFuture { @@ -49,12 +57,14 @@ public class ResultBoundedCompletionService { private final int callTimeout; private final RpcRetryingCaller retryingCaller; private boolean resultObtained = false; + private final int id; // replica id - public QueueingFuture(RetryingCallable future, int callTimeout) { + public QueueingFuture(RetryingCallable future, int callTimeout, int id) { this.future = future; this.callTimeout = callTimeout; this.retryingCaller = retryingCallerFactory.newCaller(); + this.id = id; } @SuppressWarnings("unchecked") @@ -70,8 +80,8 @@ public class ResultBoundedCompletionService { } finally { synchronized (tasks) { // If this wasn't canceled then store the result. - if (!cancelled && completed == null) { - completed = (QueueingFuture) QueueingFuture.this; + if (!cancelled) { + completedTasks.add(QueueingFuture.this); } // Notify just in case there was someone waiting and this was canceled. 
@@ -80,6 +90,7 @@ public class ResultBoundedCompletionService { } } } + @Override public boolean cancel(boolean mayInterruptIfRunning) { if (resultObtained || exeEx != null) return false; @@ -129,6 +140,10 @@ public class ResultBoundedCompletionService { throw new TimeoutException("timeout=" + timeout + ", " + unit); } + + public int getId() { + return id; + } } @SuppressWarnings("unchecked") @@ -138,27 +153,44 @@ public class ResultBoundedCompletionService { this.retryingCallerFactory = retryingCallerFactory; this.executor = executor; this.tasks = new QueueingFuture[maxTasks]; + this.completedTasks = new ArrayList<>(maxTasks); } public void submit(RetryingCallable task, int callTimeout, int id) { - QueueingFuture newFuture = new QueueingFuture(task, callTimeout); + QueueingFuture newFuture = new QueueingFuture(task, callTimeout, id); executor.execute(Trace.wrap(newFuture)); tasks[id] = newFuture; } public QueueingFuture take() throws InterruptedException { synchronized (tasks) { - while (completed == null && !cancelled) tasks.wait(); + while (!cancelled && (completedTasks.size() < 1)) tasks.wait(); } - return completed; + return completedTasks.get(0); } public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException { + return poll(timeout, unit, 1); + } + + /** + * Poll for the Nth completed task (index starts from 1 (the 1st), 2 (the second)...) 
+ * + * @param timeout - time to wait before it times out + * @param unit - time unit for timeout + * @param index - the index(th) completed task, index starting from 1 + */ + public QueueingFuture poll(long timeout, TimeUnit unit, int index) throws InterruptedException { + if (index <= 0) { + return null; + } + synchronized (tasks) { - if (completed == null && !cancelled) unit.timedWait(tasks, timeout); + if (!cancelled && (completedTasks.size() < index)) unit.timedWait(tasks, timeout); + if (completedTasks.size() < index) return null; } - return completed; + return completedTasks.get(index - 1); } public void cancelAll() { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 8d63295..da23455 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -29,6 +29,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; */ @InterfaceAudience.Private public class RpcRetryingCallerWithReadReplicas { + private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); protected final ExecutorService pool; protected final ClusterConnection cConnection; protected final Configuration conf; @@ -185,9 +188,12 @@ public class RpcRetryingCallerWithReadReplicas { : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); ResultBoundedCompletionService cs = 
new ResultBoundedCompletionService(this.rpcRetryingCallerFactory, pool, rl.size()); + int startIndex = 0; + int endIndex = rl.size(); if(isTargetReplicaSpecified) { addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); + endIndex = 1; } else { addCallsForReplica(cs, rl, 0, 0); try { @@ -197,7 +203,11 @@ public class RpcRetryingCallerWithReadReplicas { return f.get(); //great we got a response } } catch (ExecutionException e) { - throwEnrichedException(e, retries); + // We ignore the ExecutionException and continue with the secondary replicas + LOG.info("Primary replica returns " + e.getCause()); + + // Skip the result from the primary as we know that there is something wrong + startIndex = 1; } catch (CancellationException e) { throw new InterruptedIOException(); } catch (InterruptedException e) { @@ -209,18 +219,27 @@ public class RpcRetryingCallerWithReadReplicas { } try { - try { - long start = EnvironmentEdgeManager.currentTime(); - Future f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS); - long duration = EnvironmentEdgeManager.currentTime() - start; - if (f == null) { - throw new RetriesExhaustedException("timed out after " + duration + " ms"); + Future f = null; + long start, duration; + long totalDuration = 0; + for (int i = startIndex; i < endIndex; i ++) { + try { + start = EnvironmentEdgeManager.currentTime(); + f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS, i + 1); + duration = EnvironmentEdgeManager.currentTime() - start; + totalDuration += duration; + // Even with operationTimeout less than 0, still loop through the rest as there could + // be other completed tasks before operationTimeout. + operationTimeout -= duration; + if (f == null) { + throw new RetriesExhaustedException("timed out after " + totalDuration + " ms"); + } + return f.get(); + } catch (ExecutionException e) { + // we continue here as we need to loop through all the results. 
+ LOG.info("Replica " + ((ResultBoundedCompletionService.QueueingFuture)f).getId() + + " returns " + e.getCause()); } - return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS); - } catch (ExecutionException e) { - throwEnrichedException(e, retries); - } catch (TimeoutException te) { - throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms"); } } catch (CancellationException e) { throw new InterruptedIOException(); @@ -232,6 +251,7 @@ public class RpcRetryingCallerWithReadReplicas { cs.cancelAll(); } + // TODO: is an exception needed here? return null; // unreachable } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 096841b..672d050 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -160,6 +160,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) + // TODO: One question here, why do we need * 5? Seems that it is only for rl.size() ResultBoundedCompletionService> cs = new ResultBoundedCompletionService>( RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, @@ -169,26 +170,28 @@ class ScannerCallableWithReplicas implements RetryingCallable { replicaSwitched.set(false); // submit call for the primary replica. 
addCallsForCurrentReplica(cs, rl); + int startIndex = 0; try { // wait for the timeout to see whether the primary responds back Future> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds if (f != null) { - Pair r = f.get(timeout, TimeUnit.MILLISECONDS); + // After poll, if f is not null, there must be a completed task + Pair r = f.get(); if (r != null && r.getSecond() != null) { updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } return r == null ? null : r.getFirst(); //great we got a response } } catch (ExecutionException e) { - RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); + // We ignore the ExecutionException and continue with the replicas + LOG.info("Scan with primary region returns " + e.getCause()); + startIndex = 1; } catch (CancellationException e) { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); - } catch (TimeoutException e) { - throw new InterruptedIOException(e.getMessage()); } // submit call for the all of the secondaries at once @@ -196,34 +199,45 @@ class ScannerCallableWithReplicas implements RetryingCallable { addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); try { - long start = EnvironmentEdgeManager.currentTime(); - Future> f = cs.poll(timeout, TimeUnit.MILLISECONDS); - long duration = EnvironmentEdgeManager.currentTime() - start; - if (f != null) { - Pair r = f.get(timeout - duration, TimeUnit.MILLISECONDS); - if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + long start, duration; + long totalDuration = 0; + + for (int i = startIndex; i < rl.size(); i ++) { + try { + start = EnvironmentEdgeManager.currentTime(); + Future> f = cs.poll(timeout, TimeUnit.MILLISECONDS, i + 1); + + duration = EnvironmentEdgeManager.currentTime() - start; + totalDuration += duration; + timeout -= duration; + if (f != null) { + // The result 
is already there, f.get() is supposed to return immediately + Pair r = f.get(); + if (r != null && r.getSecond() != null) { + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + } + return r == null ? null : r.getFirst(); // great we got an answer + } else { + throw new IOException("Failed to get result within timeout, timeout=" + + totalDuration + "ms"); + } + } catch (ExecutionException e) { + // Continue to loop through all the results. + LOG.info("Scan region returns " + e.getCause()); } - return r == null ? null : r.getFirst(); // great we got an answer - } else { - throw new IOException("Failed to get result within timeout, timeout=" - + timeout + "ms"); } - } catch (ExecutionException e) { - RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } catch (CancellationException e) { throw new InterruptedIOException(e.getMessage()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); - } catch (TimeoutException e) { - throw new InterruptedIOException(e.getMessage()); } finally { // We get there because we were interrupted or because one or more of the // calls succeeded or failed. In all case, we stop all our tasks. cs.cancelAll(); } - LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable - throw new IOException("Imposible? Arrive at an unreachable line..."); + + // No one succeeds, return null + return null; } private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,