Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java (date 1561568629136) @@ -118,4 +118,23 @@ */ void waitForFreeSlot(long id, int periodToTrigger, Consumer trigger) throws InterruptedIOException; + + /** + * Wait until there is at least size slot for size new tasks. + * @param size the number of requested slots + * @param id the caller's id + * @param periodToTrigger The period to invoke the trigger. This value is a + * hint. The real period depends on the implementation. + * @param trigger The object to call periodically. + * @throws java.io.InterruptedIOException If the waiting is interrupted + */ + void waitForFreeSlot(int size, long id, int periodToTrigger, Consumer trigger) throws InterruptedIOException; + + /** + * Wait until all slots are free. + * @param id the caller's id + * @throws java.io.InterruptedIOException If the waiting is interrupted + */ + void waitForAllFreeSlot(long id) throws InterruptedIOException; + } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java (date 1561568630082) @@ -140,8 +140,6 @@ private final AtomicInteger callIdCnt = new AtomicInteger(0); - private final ScheduledFuture cleanupIdleConnectionTask; - private int maxConcurrentCallsPerServer; private static final LoadingCache concurrentCounterCache = @@ -188,14 +186,6 @@ this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); - this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - cleanupIdleConnections(); - } - }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); - if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO @@ -450,6 +440,7 @@ + connection.remoteId); connections.removeValue(remoteId, connection); connection.shutdown(); + connection.cleanupConnection(); } } } @@ -491,13 +482,13 @@ connToClose = connections.values(); connections.clear(); } - cleanupIdleConnectionTask.cancel(true); for (T conn : connToClose) { - conn.shutdown(); + // conn may be null in case of cancellation + if (conn != null) conn.shutdown(); } closeInternal(); for (T conn : connToClose) { - conn.cleanupConnection(); + if (conn != null) conn.cleanupConnection(); } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java (date 1561568629143) @@ -149,6 +149,15 @@ if (error instanceof FallbackDisallowedException) { return; } + // do not schedule anything if not needed... + boolean isOverKrb = false; + try { + isOverKrb = shouldAuthenticateOverKrb(); + } catch (IOException e) { + return; + } + if (!isOverKrb) + return; synchronized (this) { if (reloginInProgress) { return; @@ -159,9 +168,7 @@ @Override public void run() { try { - if (shouldAuthenticateOverKrb()) { - relogin(); - } + relogin(); } catch (IOException e) { LOG.warn("Relogin failed", e); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java (date 1561569174887) @@ -186,6 +186,10 @@ private final int numAttempt; private final ServerName server; private final Set callsInProgress; + private long firstStartNanoTime; + private long startNanoTime; + private long elapseNanoTime; + private long numReject = -1; @VisibleForTesting SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, @@ -248,6 +252,30 @@ } } } + + public void onStart() { + ++numReject; + startNanoTime = System.nanoTime(); + if (numReject == 0) { + firstStartNanoTime = startNanoTime; + } + } + + public void onFinish() { + elapseNanoTime = System.nanoTime() - startNanoTime; + } + + public long getElapseNanoTime() { + return elapseNanoTime; + } + + public long getNumReject() { + return numReject; + } + + public long getRejectedElapseNanoTime() { + return startNanoTime - firstStartNanoTime; + } } private final Batch.Callback callback; @@ -518,7 +546,8 @@ * @param numAttempt the attempt number. * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ - void sendMultiAction(Map actionsByServer, + // Must be synchronized because of the background thread writeBufferPeriodicFlushTimer + synchronized void sendMultiAction(Map actionsByServer, int numAttempt, List actionsForReplicaThread, boolean reuseThread) { // Run the last item on the same thread if we are already on a send thread. // We hope most of the time it will be the only item, so we can cut down on threads. @@ -543,22 +572,36 @@ && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) { runnable.run(); } else { - try { - pool.submit(runnable); - } catch (Throwable t) { - if (t instanceof RejectedExecutionException) { - // This should never happen. But as the pool is provided by the end user, - // let's secure this a little. - LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + - " Server=" + server.getServerName(), t); - } else { - // see #HBASE-14359 for more details - LOG.warn("Caught unexpected exception/error: ", t); - } - asyncProcess.decTaskCounters(multiAction.getRegions(), server); - // We're likely to fail again, but this will increment the attempt counter, - // so it will finish. - receiveGlobalFailure(multiAction, server, numAttempt, t); + boolean completed = false; + int nbTry = 0; + while(!completed) { + try { + ++nbTry; + pool.submit(runnable); + completed = true; + } catch (Throwable t) { + if (t instanceof RejectedExecutionException) { + if ((nbTry % 1000) == 0) { + LOG.warn("#" + asyncProcess.id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + server.getServerName() + " (try " + nbTry + ")", t); + } else if (LOG.isDebugEnabled()) { + LOG.debug("#" + asyncProcess.id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + server.getServerName() + " (try " + nbTry + ")", t); + } + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } else { + // see #HBASE-14359 for more details + LOG.warn("Caught unexpected exception/error: ", t); + } + asyncProcess.decTaskCounters(multiAction.getRegions(), server); + // We're likely to fail again, but this will increment the attempt counter, + // so it will finish. + receiveGlobalFailure(multiAction, server, numAttempt, t); + } } } } @@ -1167,6 +1210,10 @@ return error.toString(); } + public boolean isFinished() { + return actionsInProgress.get() == 0; + } + @Override public void waitUntilDone() throws InterruptedIOException { try { Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java (date 1561568630094) @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; @@ -93,6 +94,8 @@ = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); @VisibleForTesting final ConcurrentMap taskCounterPerServer = new ConcurrentHashMap<>(); + + final ReentrantLock lock = new ReentrantLock(); /** * The number of tasks simultaneously executed on the cluster. */ @@ -244,11 +247,26 @@ public void incTaskCounters(Collection regions, ServerName sn) { tasksInProgress.incrementAndGet(); - computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); + incrementMapForKey(taskCounterPerServer,sn); - regions.forEach((regBytes) - -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet() - ); + regions.forEach((regBytes) -> incrementMapForKey(taskCounterPerRegion,regBytes)); + } + + private void incrementMapForKey(Map collection, K key) { + if (collection.containsKey(key)) { + collection.get(key).incrementAndGet(); + } else { + lock.lock(); + try { + if (collection.containsKey(key)) { + collection.get(key).incrementAndGet(); + } else { + collection.put(key, new AtomicInteger(1)); + } + } finally { + lock.unlock(); + } + } } @Override @@ -328,6 +346,16 @@ waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger); } + @Override + public void waitForFreeSlot(int numberOfTask,long id, int periodToTrigger, Consumer trigger) throws InterruptedIOException { + waitForMaximumCurrentTasks(maxTotalConcurrentTasks - numberOfTask, id, periodToTrigger, trigger); + } + + @Override + public void waitForAllFreeSlot(long id) throws InterruptedIOException { + waitForMaximumCurrentTasks(0, id, 1000000, null); + } + /** * limit the heapsize of total submitted data. Reduce the limit of heapsize * for submitting quickly if there is no running task. Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java (date 1561568629131) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java (date 1561568629131) @@ -0,0 +1,197 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Threads; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +@SuppressWarnings("WeakerAccess") +public class BufferedMutatorThreadPoolExecutor extends ThreadPoolExecutor { + + private final Object _lock = new Object(); + + private Map ssrMap; + + private long ssrCacheMaxStored = 0; + private AtomicLong ssrCachePutCount = new AtomicLong(0); + private long ssrCacheRemoveCount = 0; + + private long totalExecutedTaskCount = 0; + private long totalRejectedTaskCount = 0; + private long totalExecutedNanoTime = 0; + private long totalRejectedNanoTime = 0; + private AtomicLong totalMissingTaskCount = new AtomicLong(0); + + public BufferedMutatorThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue workQueue, final ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + ssrMap = new ConcurrentHashMap<>(this.getMaximumPoolSize()); + } + + public static BufferedMutatorThreadPoolExecutor getPoolExecutor(Configuration conf) { + int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); + if (maxThreads == 0) { + maxThreads = conf.getInt("hbase.client.max.total.tasks", Integer.MAX_VALUE); + } + if (maxThreads == 0) { + throw new IllegalArgumentException("hbase.client.max.total.tasks must be >0"); + } + long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); + + // Using the "direct handoff" approach, new threads will only be created + // if it is necessary and will grow unbounded. This could be bad but in HCM + // we only create as many Runnables as there are region servers. It means + // it also scales when new region servers are added. + BufferedMutatorThreadPoolExecutor pool = new BufferedMutatorThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + return pool; + } + + @Override + public Future submit(final Runnable r) { + if (r instanceof AsyncRequestFutureImpl.SingleServerRequestRunnable) { + final AsyncRequestFutureImpl.SingleServerRequestRunnable task = ((AsyncRequestFutureImpl.SingleServerRequestRunnable) r); + task.onStart(); + } + return super.submit(r); + } + + @Override + protected void beforeExecute(final Thread t, final Runnable r) { + // if t is null, this means we will get a RejectedExecutionException, and NEVER be called in afterExecute + // To avoid a memory leak, add in the map only if t is NOT null! + if (t != null && r instanceof Future) { + final Future f = (Future) r; + final Object o = TaskDiscoverer.findRealTask(r); + if (o instanceof AsyncRequestFutureImpl.SingleServerRequestRunnable) { + final AsyncRequestFutureImpl.SingleServerRequestRunnable task = ((AsyncRequestFutureImpl.SingleServerRequestRunnable) o); + ssrMap.put(t, task); + ssrCachePutCount.incrementAndGet(); + } + } + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + super.afterExecute(r, t); + + final Object o = ssrMap.remove(Thread.currentThread()); + if (o instanceof AsyncRequestFutureImpl.SingleServerRequestRunnable) { + final AsyncRequestFutureImpl.SingleServerRequestRunnable task = ((AsyncRequestFutureImpl.SingleServerRequestRunnable) o); + task.onFinish(); + synchronized (_lock) { + ++ssrCacheRemoveCount; + ssrCacheMaxStored = Math.max(ssrCacheMaxStored, ssrCachePutCount.get() - ssrCacheRemoveCount); + ++totalExecutedTaskCount; + totalExecutedNanoTime += task.getElapseNanoTime(); + totalRejectedTaskCount += task.getNumReject(); + totalRejectedNanoTime += task.getRejectedElapseNanoTime(); + } + } else { + totalMissingTaskCount.incrementAndGet(); + } + + } + + public long getAndResetSsrCacheMaxStored() { + long ret = ssrCacheMaxStored; + ssrCacheMaxStored = 0; + ssrCachePutCount.set(0); + ssrCacheRemoveCount = 0; + return ret; + } + + public long getAvgRejectedNanoTime() { + if (totalRejectedTaskCount == 0) { + return 0; + } + return totalRejectedNanoTime / totalRejectedTaskCount; + } + + public long getTotalMissingTaskCount() { + return totalMissingTaskCount.get(); + } + + public long getAvgExecutedNanoTime() { + if (totalExecutedTaskCount == 0) { + return 0; + } + return totalExecutedNanoTime / totalExecutedTaskCount; + } + + public long getTotalExecutedTaskCount() { + return totalExecutedTaskCount; + } + + public long getTotalExecutedNanoTime() { + return totalExecutedNanoTime; + } + + public long getTotalRejectedTaskCount() { + return totalRejectedTaskCount; + } + + public long getTotalRejectedNanoTime() { + return totalRejectedNanoTime; + } + + public static class TaskDiscoverer { + + private final static Field callableInFutureTask; + private static final Class adapterClass; + private static final Field runnableInAdapter; + + static { + try { + callableInFutureTask = FutureTask.class.getDeclaredField("callable"); + callableInFutureTask.setAccessible(true); + adapterClass = Executors.callable(() -> { + }).getClass(); + runnableInAdapter = adapterClass.getDeclaredField("task"); + runnableInAdapter.setAccessible(true); + } catch (NoSuchFieldException e) { + throw new ExceptionInInitializerError(e); + } + } + + public static Object findRealTask(final Runnable task) { + if (task instanceof FutureTask) { + try { + Object callable = callableInFutureTask.get(task); + if (adapterClass.isInstance(callable)) { + return runnableInAdapter.get(callable); + } else { + return callable; + } + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + throw new ClassCastException("Not a FutureTask"); + } + } + +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (date 1561568629126) @@ -24,13 +24,7 @@ import java.io.IOException; import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -255,12 +249,13 @@ RequestController.Checker checker = requestController.newChecker(); boolean firstIter = true; do { - // Wait until there is at least one slot for a new task. - requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1)); int posInList = -1; if (!firstIter) { checker.reset(); } + + Set retainedServers = new TreeSet<>(); + Iterator it = rows.iterator(); while (it.hasNext()) { Row r = it.next(); @@ -309,10 +304,15 @@ // TODO: replica-get is not supported on this path byte[] regionName = loc.getRegionInfo().getRegionName(); addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); + retainedServers.add(loc.getServerName()); it.remove(); } } firstIter = false; + + // Wait until there is at least one slot per server + requestController.waitForFreeSlot(retainedServers.size(),id, periodToLog, getLogger(tableName, -1)); + } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); if (retainedActions.isEmpty()) return NO_REQS_RESULT; @@ -321,6 +321,10 @@ locationErrors, locationErrorRows, actionsByServer); } + public void waitAllSlot() throws InterruptedIOException { + requestController.waitForAllFreeSlot(id); + } + AsyncRequestFuture submitMultiActions(AsyncProcessTask task, List retainedActions, long nonceGroup, List locationErrors, List locationErrorRows, Map actionsByServer) { Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java (revision a172b480fe06c6dec6bf6b83c4434ead3a1207d0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java (date 1561568630105) @@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -93,6 +95,9 @@ private final boolean cleanupPoolOnClose; private volatile boolean closed = false; private final AsyncProcess ap; + private List asfList; + private int maxThreads; + private ReentrantLock lock = new ReentrantLock(); @VisibleForTesting BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { @@ -109,6 +114,11 @@ this.pool = params.getPool(); cleanupPoolOnClose = false; } + maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); + if (maxThreads <= 0) { + maxThreads = conf.getInt("hbase.client.max.total.tasks", Integer.MAX_VALUE); + } + asfList = new ArrayList(maxThreads*4); ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != UNSET ? @@ -235,6 +245,13 @@ } // Stop any running Periodic Flush timer. disableWriteBufferPeriodicFlush(); + ap.waitAllSlot(); + try { + // Let time to the periodic flush thread to exit (task are finished, but not the code after) + Thread.sleep(5); + } catch (InterruptedException e) { + throw new IOException(e); + } try { // As we can have an operation in progress even if the buffer is empty, we call // doFlush at least one time. @@ -276,8 +293,19 @@ @Override public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { - checkClose(); - doFlush(true); + // This will avoid concurrency between period flush, flush() and close() + // mutate are not synchronized, because it use doFlush(false) + if (writeBufferPeriodicFlushTimer != null) { + lock.lock(); + } + try { + checkClose(); + doFlush(true); + } finally { + if (writeBufferPeriodicFlushTimer != null) { + lock.unlock(); + } + } } /** @@ -302,11 +330,52 @@ } asf = ap.submit(createTask(access)); } - // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't - // be released. - asf.waitUntilDone(); - if (asf.hasError()) { - errors.add(asf.getErrors()); + + if (flushAll) { + // if we have setWriteBufferPeriodicFlushTimeoutMs we may have concurrent update + List waitList; + synchronized(asfList) { + waitList = new ArrayList<>(asfList); + } + // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't + // be released. + for(AsyncRequestFuture toWait:waitList) { + toWait.waitUntilDone(); + errors.add(toWait.getErrors()); + } + synchronized(asfList) { + asfList.removeAll(waitList); + } + asf.waitUntilDone(); + if (asf.hasError()) { + errors.add(asf.getErrors()); + } + } else { + // Do some cleanup in asfList to decrease memory + int nbRemoved = 0; + while (asfList.size() >= maxThreads*4) { + synchronized(asfList) { + Iterator it = asfList.iterator(); + while (it.hasNext()) { + AsyncRequestFutureImpl toCheck = (AsyncRequestFutureImpl) it.next(); + if (toCheck.isFinished()) { + it.remove(); + nbRemoved++; + } + } + if (nbRemoved == 0) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } + } + } + synchronized(asfList) { + // Add the asf in the list + asfList.add(asf); + } } }