diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index 50c3d2c..50b0fb2 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -21,7 +21,8 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -49,7 +50,7 @@ import org.apache.htrace.Trace; public class ResultBoundedCompletionService { private static final Log LOG = LogFactory.getLog(ResultBoundedCompletionService.class); private final RpcRetryingCallerFactory retryingCallerFactory; - private final Executor executor; + private final ExecutorService executorService; private final QueueingFuture[] tasks; // all the tasks private final ArrayList completedTasks; // completed tasks private volatile boolean cancelled = false; @@ -63,6 +64,7 @@ public class ResultBoundedCompletionService { private final RpcRetryingCaller retryingCaller; private boolean resultObtained = false; private final int replicaId; // replica id + private Future taskFuture; public QueueingFuture(RetryingCallable future, int callTimeout, int id) { @@ -72,6 +74,14 @@ public class ResultBoundedCompletionService { this.replicaId = id; } + public Future getTaskFuture() { + return taskFuture; + } + + public void setTaskFuture(final Future taskFuture) { + this.taskFuture = taskFuture; + } + @SuppressWarnings("unchecked") @Override public void run() { @@ -101,6 +111,11 @@ public class ResultBoundedCompletionService { if (resultObtained || exeEx != null) return false; retryingCaller.cancel(); if (future instanceof Cancellable) ((Cancellable)future).cancel(); + + // Interrupt the working thread and free it to the pool + if (taskFuture != null) { + taskFuture.cancel(true); + } cancelled = true; return true; } @@ -157,10 +172,10 @@ public class ResultBoundedCompletionService { @SuppressWarnings("unchecked") public ResultBoundedCompletionService( - RpcRetryingCallerFactory retryingCallerFactory, Executor executor, + RpcRetryingCallerFactory retryingCallerFactory, ExecutorService executorService, int maxTasks) { this.retryingCallerFactory = retryingCallerFactory; - this.executor = executor; + this.executorService = executorService; this.tasks = new QueueingFuture[maxTasks]; this.completedTasks = new ArrayList<>(maxTasks); } @@ -168,7 +183,7 @@ public class ResultBoundedCompletionService { public void submit(RetryingCallable task, int callTimeout, int id) { QueueingFuture newFuture = new QueueingFuture<>(task, callTimeout, id); - executor.execute(Trace.wrap(newFuture)); + newFuture.setTaskFuture(executorService.submit(Trace.wrap(newFuture))); tasks[id] = newFuture; }