diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 888370c..3e21efb 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -98,6 +98,7 @@ class AsyncProcess { protected final AtomicBoolean hasError = new AtomicBoolean(false); protected final AtomicLong tasksSent = new AtomicLong(0); protected final AtomicLong tasksDone = new AtomicLong(0); + protected final AtomicLong tasksProcessing = new AtomicLong(0); protected final ConcurrentMap taskCounterPerRegion = new ConcurrentHashMap(); protected final ConcurrentMap taskCounterPerServer = @@ -479,25 +480,28 @@ class AsyncProcess { for (Map.Entry> e : actionsByServer.entrySet()) { final HRegionLocation loc = e.getKey(); final MultiAction multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), loc.getServerName()); + tasksProcessing.incrementAndGet(); Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() { @Override public void run() { MultiResponse res; try { MultiServerCallable callable = createCallable(loc, multiAction); + incTaskCounters(multiAction.getRegions(), loc.getServerName()); try { res = createCaller(callable).callWithoutRetries(callable); } catch (IOException e) { - LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt + - ", resubmitting all since not sure where we are at", e); + LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt + + ", resubmitting all since not sure where we are at", e); resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer); return; + } finally { + decTaskCounters(multiAction.getRegions(), loc.getServerName()); } receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer); } finally { - decTaskCounters(multiAction.getRegions(), loc.getServerName()); + decProcessingTasks(); } } }); @@ -507,7 +511,7 @@ class AsyncProcess { } catch (RejectedExecutionException ree) { // This should never happen. But as the pool is provided by the end user, let's secure // this a little. - decTaskCounters(multiAction.getRegions(), loc.getServerName()); + decProcessingTasks(); LOG.warn("The task was rejected by the pool. This is unexpected." + " Server is " + loc.getServerName(), ree); // We're likely to fail again, but this will increment the attempt counter, so it will @@ -758,9 +762,20 @@ class AsyncProcess { */ public void waitUntilDone() throws InterruptedIOException { waitForMaximumCurrentTasks(0); + try { + while (tasksProcessing.get() > 0) { + synchronized (this.tasksProcessing) { + this.tasksProcessing.wait(100); + } + waitForMaximumCurrentTasks(0); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted in waitUntilDone." + " tasksProcessing=" + + tasksProcessing.get() + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + + tasksDone.get()); + } } - public boolean hasError() { return hasError.get(); } @@ -825,6 +840,16 @@ class AsyncProcess { } /** + * Decrements the counters for tasks in processing + */ + protected void decProcessingTasks() { + tasksProcessing.decrementAndGet(); + synchronized (tasksProcessing) { + tasksProcessing.notifyAll(); + } + } + + /** * Creates the server error tracker to use inside process. * Currently, to preserve the main assumption about current retries, and to work well with * the retry-limit-based calculation, the calculation is local per Process object.