diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableCallable.java
new file mode 100644
index 0000000..8568017
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableCallable.java
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link RetryingCallable} whose in-flight work can be aborted.
+ * <p>
+ * {@link ResultBoundedCompletionService} calls {@link #startCancel()} when a submitted task is
+ * cancelled, e.g. because another replica already answered.
+ */
+@InterfaceAudience.Private
+interface CancellableCallable<T> extends RetryingCallable<T> {
+  /**
+   * Requests that the work of this callable be aborted. May be invoked from a thread other than
+   * the one executing the call.
+   */
+  void startCancel();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 90fb1c2..9fc9cc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -169,7 +168,7 @@ public class ClientSmallScanner extends ClientScanner {
       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
       ScanResponse response = null;
-      PayloadCarryingRpcController controller = controllerFactory.newController();
+      controller = controllerFactory.newController();
       try {
         controller.setPriority(getTableName());
         controller.setCallTimeout(timeout);
@@ -183,8 +182,8 @@ public class ClientSmallScanner extends ClientScanner {
 
     @Override
     public ScannerCallable getScannerCallableForReplica(int id) {
-      return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
-        controllerFactory, getCaching(), id);
+      return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
+          scanMetrics, controllerFactory, getCaching(), id);
     }
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
new file mode 100644
index 0000000..bd4e9ab
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import org.htrace.Trace;
+
+/**
+ * A completion service for the RpcRetryingCallerFactory.
+ * Keeps the list of the futures, and allows cancelling them all.
+ * This means as well that it can be used for a small set of tasks only.
+ * <p>
+ * The first task that finishes without having been cancelled, whether it succeeded or failed,
+ * is the one returned by {@link #take()} and {@link #poll(long, TimeUnit)}.
+ * <p>
+ * Implementation is not Thread safe: {@link #submit}, {@link #take()}, {@link #poll} and
+ * {@link #cancelAll()} are meant to be called from a single thread. The tasks themselves run on
+ * the executor and signal their completion through the monitor of the task array.
+ */
+@InterfaceAudience.Private
+public class ResultBoundedCompletionService<V> {
+  private final RpcRetryingCallerFactory retryingCallerFactory;
+  private final Executor executor;
+  private final QueueingFuture<V>[] tasks; // all the tasks; also the lock used for signaling
+  private volatile QueueingFuture<V> completed = null;
+
+  class QueueingFuture<T> implements RunnableFuture<T> {
+    private final CancellableCallable<T> future;
+    // Published to other threads by the subsequent volatile write of 'resultObtained'.
+    private T result = null;
+    private volatile ExecutionException exeEx = null;
+    private volatile boolean canceled;
+    private final int callTimeout;
+    private final RpcRetryingCaller<T> retryingCaller;
+    private volatile boolean resultObtained = false;
+
+    public QueueingFuture(CancellableCallable<T> future, int callTimeout) {
+      this.future = future;
+      this.callTimeout = callTimeout;
+      this.retryingCaller = retryingCallerFactory.<T>newCaller();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      try {
+        if (!canceled) {
+          result = this.retryingCaller.callWithRetries(future, callTimeout);
+          resultObtained = true;
+        }
+      } catch (Throwable t) {
+        exeEx = new ExecutionException(t);
+      } finally {
+        synchronized (tasks) {
+          // Check-and-set under the lock so that exactly one task becomes 'completed'.
+          if (!canceled && completed == null) {
+            completed = (QueueingFuture<V>) QueueingFuture.this;
+          }
+          // Wake up every waiter: take()/poll() wait for 'completed', whereas get() waits for
+          // this particular task, which is not necessarily the first one to finish.
+          tasks.notifyAll();
+        }
+      }
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      if (resultObtained || exeEx != null) return false;
+      retryingCaller.cancel();
+      future.startCancel();
+      canceled = true;
+      return true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return canceled;
+    }
+
+    @Override
+    public boolean isDone() {
+      return resultObtained || exeEx != null;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      try {
+        return get(1000, TimeUnit.DAYS);
+      } catch (TimeoutException e) {
+        throw new RuntimeException("You did wait for 1000 days here?", e);
+      }
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      long remaining = unit.toNanos(timeout);
+      final long deadline = System.nanoTime() + remaining;
+      synchronized (tasks) {
+        // Loop: the lock is shared by all the tasks, and wait() may also wake up spuriously.
+        while (true) {
+          if (resultObtained) {
+            return result;
+          }
+          if (exeEx != null) {
+            throw exeEx;
+          }
+          if (remaining <= 0) {
+            break;
+          }
+          TimeUnit.NANOSECONDS.timedWait(tasks, remaining);
+          remaining = deadline - System.nanoTime();
+        }
+      }
+      throw new TimeoutException("timeout=" + timeout + ", " + unit);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public ResultBoundedCompletionService(
+      RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
+      int maxTasks) {
+    this.retryingCallerFactory = retryingCallerFactory;
+    this.executor = executor;
+    this.tasks = new QueueingFuture[maxTasks];
+  }
+
+  /**
+   * Starts {@code task} on the executor and records it in slot {@code id}, so that
+   * {@link #cancelAll()} can reach it.
+   * @param task the callable to run through {@link RpcRetryingCaller#callWithRetries}
+   * @param callTimeout the timeout handed to {@link RpcRetryingCaller#callWithRetries}
+   * @param id slot of the task; must be in [0, maxTasks) as given to the constructor
+   */
+  public void submit(CancellableCallable<V> task, int callTimeout, int id) {
+    QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
+    executor.execute(Trace.wrap(newFuture));
+    tasks[id] = newFuture;
+  }
+
+  /**
+   * Waits until a task has completed, then returns the first task that completed.
+   */
+  public QueueingFuture<V> take() throws InterruptedException {
+    synchronized (tasks) {
+      while (completed == null) {
+        tasks.wait();
+      }
+    }
+    return completed;
+  }
+
+  /**
+   * Waits up to the given time for a task to complete.
+   * @return the first task that completed, or null if none did within the timeout
+   */
+  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    long remaining = unit.toNanos(timeout);
+    final long deadline = System.nanoTime() + remaining;
+    synchronized (tasks) {
+      // Loop so that a spurious wake-up does not end the wait early.
+      while (completed == null && remaining > 0) {
+        TimeUnit.NANOSECONDS.timedWait(tasks, remaining);
+        remaining = deadline - System.nanoTime();
+      }
+    }
+    return completed;
+  }
+
+  /**
+   * Cancels every submitted task; tasks that already finished are left untouched.
+   */
+  public void cancelAll() {
+    for (QueueingFuture<V> future : tasks) {
+      if (future != null) future.cancel(true);
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 57accce..c7244d9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -27,12 +27,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.protobuf.ServiceException;
 
-import org.htrace.Trace;
 
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
@@ -74,7 +70,7 @@ public class RpcRetryingCallerWithReadReplicas {
   private final int callTimeout;
   private final int retries;
   private final RpcControllerFactory rpcControllerFactory;
-  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
+  final RpcRetryingCallerFactory rpcRetryingCallerFactory;
 
   public RpcRetryingCallerWithReadReplicas(
       RpcControllerFactory rpcControllerFactory, TableName tableName,
@@ -99,9 +95,10 @@ public class RpcRetryingCallerWithReadReplicas {
    * - we need to stop retrying when the call is completed
    * - we can be interrupted
    */
-  class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
+  class ReplicaRegionServerCallable extends RegionServerCallable<Result>
+      implements CancellableCallable<Result> {
     final int id;
-    private final PayloadCarryingRpcController controller;
+    protected final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
@@ -112,6 +109,7 @@ public class RpcRetryingCallerWithReadReplicas {
       controller.setPriority(tableName);
     }
 
+    @Override
     public void startCancel() {
       controller.startCancel();
     }
@@ -194,7 +192,8 @@ public class RpcRetryingCallerWithReadReplicas {
     RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ?
         get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName,
         get.getRow());
-    ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
+    ResultBoundedCompletionService<Result> cs =
+        new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
 
     if(isTargetReplicaSpecified) {
       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
@@ -273,12 +272,12 @@ public class RpcRetryingCallerWithReadReplicas {
    * @param min - the id of the first replica, inclusive
    * @param max - the id of the last replica, inclusive.
    */
-  private void addCallsForReplica(ResultBoundedCompletionService cs,
+  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
                                  RegionLocations rl, int min, int max) {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
-      cs.submit(callOnReplica, callTimeout);
+      cs.submit(callOnReplica, callTimeout, id);
     }
   }
 
@@ -308,137 +307,4 @@ public class RpcRetryingCallerWithReadReplicas {
 
     return rl;
   }
-
-
-  /**
-   * A completion service for the RpcRetryingCallerFactory.
-   * Keeps the list of the futures, and allows to cancel them all.
-   * This means as well that it can be used for a small set of tasks only.
-   * <p>Implementation is not Thread safe.
-   */
-  public class ResultBoundedCompletionService {
-    private final Executor executor;
-    private final QueueingFuture[] tasks; // all the tasks
-    private volatile QueueingFuture completed = null;
-
-    class QueueingFuture implements RunnableFuture<Result> {
-      private final ReplicaRegionServerCallable future;
-      private Result result = null;
-      private ExecutionException exeEx = null;
-      private volatile boolean canceled;
-      private final int callTimeout;
-      private final RpcRetryingCaller<Result> retryingCaller;
-
-
-      public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
-        this.future = future;
-        this.callTimeout = callTimeout;
-        this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
-      }
-
-      @Override
-      public void run() {
-        try {
-          if (!canceled) {
-            result =
-                rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
-          }
-        } catch (Throwable t) {
-          exeEx = new ExecutionException(t);
-        } finally {
-          if (!canceled && completed == null) {
-            completed = QueueingFuture.this;
-            synchronized (tasks) {
-              tasks.notify();
-            }
-          }
-        }
-      }
-
-      @Override
-      public boolean cancel(boolean mayInterruptIfRunning) {
-        if (result != null || exeEx != null) return false;
-        retryingCaller.cancel();
-        future.startCancel();
-        canceled = true;
-        return true;
-      }
-
-      @Override
-      public boolean isCancelled() {
-        return canceled;
-      }
-
-      @Override
-      public boolean isDone() {
-        return result != null || exeEx != null;
-      }
-
-      @Override
-      public Result get() throws InterruptedException, ExecutionException {
-        try {
-          return get(1000, TimeUnit.DAYS);
-        } catch (TimeoutException e) {
-          throw new RuntimeException("You did wait for 1000 days here?", e);
-        }
-      }
-
-      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
-          justification="Is this an issue?")
-      @Override
-      public Result get(long timeout, TimeUnit unit)
-          throws InterruptedException, ExecutionException, TimeoutException {
-        synchronized (tasks) {
-          if (result != null) {
-            return result;
-          }
-          if (exeEx != null) {
-            throw exeEx;
-          }
-          unit.timedWait(tasks, timeout);
-        }
-        // Findbugs says this null check is redundant. Will result be set across the wait above?
-        if (result != null) {
-          return result;
-        }
-        if (exeEx != null) {
-          throw exeEx;
-        }
-
-        throw new TimeoutException("timeout=" + timeout + ", " + unit);
-      }
-    }
-
-    public ResultBoundedCompletionService(Executor executor, int maxTasks) {
-      this.executor = executor;
-      this.tasks = new QueueingFuture[maxTasks];
-    }
-
-
-    public void submit(ReplicaRegionServerCallable task, int callTimeout) {
-      QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
-      executor.execute(Trace.wrap(newFuture));
-      tasks[task.id] = newFuture;
-    }
-
-    public QueueingFuture take() throws InterruptedException {
-      synchronized (tasks) {
-        while (completed == null) tasks.wait();
-      }
-      return completed;
-    }
-
-    public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
-      synchronized (tasks) {
-        if (completed == null) unit.timedWait(tasks, timeout);
-      }
-      return completed;
-    }
-
-    public void cancelAll() {
-      for (QueueingFuture future : tasks) {
-        if (future != null) future.cancel(true);
-      }
-    }
-  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 0aecef2..c9f2a71 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -88,6 +88,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
   protected RpcControllerFactory controllerFactory;
+  // Assigned when a scan RPC is issued and read via getController(), possibly by another
+  // thread that wants to cancel that RPC; volatile for cross-thread visibility.
+  protected volatile PayloadCarryingRpcController controller;
 
   /**
    * @param connection which connection
@@ -123,6 +126,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     this.controllerFactory = rpcControllerFactory;
   }
 
+  /**
+   * @return the controller created for the most recent scan RPC issued by this callable, or
+   *   null if none has been issued yet. Exposed so that an in-flight RPC can be cancelled.
+   */
+  public PayloadCarryingRpcController getController() {
+    return controller;
+  }
+
   /**
    * @param reload force reload of server location
    * @throws IOException
@@ -191,7 +202,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           incRPCcallsMetrics();
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
           ScanResponse response = null;
-          PayloadCarryingRpcController controller = controllerFactory.newController();
+          controller = controllerFactory.newController();
           controller.setPriority(getTableName());
           controller.setCallTimeout(callTimeout);
           try {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 0de658b..318d757 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.BoundedCompletionService;
 import org.apache.hadoop.hbase.util.Pair;
 /**
  * This class has the logic for handling scanners for regions with and without replicas.
@@ -70,7 +69,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   private int scannerTimeout;
   private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
 
-  public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection,
+  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
       ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
       int retries, int scannerTimeout, int caching, Configuration conf,
       RpcRetryingCaller<Result[]> caller) {
@@ -134,8 +133,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
 
     // allocate a boundedcompletion pool of some multiple of number of replicas.
     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
-    BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
-        new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
+    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
+        new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
+            new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
+            rl.size() * 5);
 
     List<ExecutionException> exceptions = null;
     int submitted = 0, completed = 0;
@@ -192,7 +193,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     } finally {
       // We get there because we were interrupted or because one or more of the
       // calls succeeded or failed. In all case, we stop all our tasks.
-      cs.cancelAll(true);
+      cs.cancelAll();
     }
 
     if (exceptions != null && !exceptions.isEmpty()) {
@@ -226,8 +227,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         // want to wait for the "close" to happen yet. The "wait" will happen when
         // the table is closed (when the awaitTermination of the underlying pool is called)
         s.setClose();
-        RetryingRPC r = new RetryingRPC(s);
-        pool.submit(r);
+        final RetryingRPC r = new RetryingRPC(s);
+        pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            r.call(scannerTimeout);
+            return null;
+          }
+        });
       }
       // now clear outstandingCallables since we scheduled a close for all the contained scanners
       outstandingCallables.clear();
@@ -244,16 +251,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   }
 
   private int addCallsForCurrentReplica(
-      BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
+      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
-    cs.submit(retryingOnReplica);
+    cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
     return 1;
   }
 
   private int addCallsForOtherReplicas(
-      BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
-      int max) {
+      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
+      int min, int max) {
     if (scan.getConsistency() == Consistency.STRONG) {
       return 0; // not scheduling on other replicas for strong consistency
     }
@@ -267,32 +274,92 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       }
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
-      cs.submit(retryingOnReplica);
+      cs.submit(retryingOnReplica, scannerTimeout, id);
     }
     return max - min + 1;
   }
 
-  class RetryingRPC implements Callable<Pair<Result[], ScannerCallable>> {
+  /**
+   * Adapts a {@link ScannerCallable} so that it can be run, and cancelled, by a
+   * {@link ResultBoundedCompletionService}. Retries are driven by that service (through
+   * {@link RpcRetryingCaller#callWithRetries}); {@link #call(int)} itself makes a single
+   * attempt.
+   */
+  class RetryingRPC implements CancellableCallable<Pair<Result[], ScannerCallable>> {
     final ScannerCallable callable;
+    RpcRetryingCaller<Result[]> caller;
+    // Set by startCancel(), possibly from another thread.
+    private volatile boolean canceled = false;
 
     RetryingRPC(ScannerCallable callable) {
       this.callable = callable;
-    }
-
-    @Override
-    public Pair<Result[], ScannerCallable> call() throws IOException {
       // For the Consistency.STRONG (default case), we reuse the caller
       // to keep compatibility with what is done in the past
       // For the Consistency.TIMELINE case, we can't reuse the caller
       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
       // and we can't invoke it multiple times at the same time)
-      RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
+      this.caller = ScannerCallableWithReplicas.this.caller;
       if (scan.getConsistency() == Consistency.TIMELINE) {
-        caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
+        this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
             <Result[]>newCaller();
       }
-      Result[] res = caller.callWithRetries(callable, scannerTimeout);
-      return new Pair<Result[], ScannerCallable>(res, callable);
+    }
+
+    /**
+     * Makes a single attempt through {@code callWithoutRetries} on the wrapped callable.
+     * @return the rows and the callable that produced them, or null if this RPC was cancelled
+     *   before the attempt was made
+     */
+    @Override
+    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
+      // Retries are done by the ResultBoundedCompletionService that runs this callable,
+      // so we don't invoke callWithRetries here.
+      if (canceled) {
+        return null;
+      }
+      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
+      return new Pair<Result[], ScannerCallable>(res, this.callable);
+    }
+
+    @Override
+    public void prepare(boolean reload) throws IOException {
+      if (canceled) return;
+
+      // Surface an interrupt of the running thread as an InterruptedIOException.
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException();
+      }
+
+      callable.prepare(reload);
+    }
+
+    // The remaining RetryingCallable methods delegate to the wrapped ScannerCallable.
+    @Override
+    public void throwable(Throwable t, boolean retrying) {
+      callable.throwable(t, retrying);
+    }
+
+    @Override
+    public String getExceptionMessageAdditionalDetail() {
+      return callable.getExceptionMessageAdditionalDetail();
+    }
+
+    @Override
+    public long sleep(long pause, int tries) {
+      return callable.sleep(pause, tries);
+    }
+
+    /**
+     * Marks this RPC as cancelled, then cancels the caller and the wrapped callable's current
+     * RPC controller, if one has been created.
+     */
+    @Override
+    public void startCancel() {
+      canceled = true;
+      caller.cancel();
+      if (callable.getController() != null) {
+        callable.getController().startCancel();
+      }
+    }
   }