*** src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java Wed Feb 12 21:35:36 2014 --- src/main/java/org/apache/http/nio/pool/AbstractNIOConnPool.java Wed Jul 23 12:09:54 2014 *************** *** 28,51 **** import java.io.IOException; import java.net.SocketAddress; ! import java.util.HashMap; ! import java.util.HashSet; import java.util.Iterator; - import java.util.LinkedList; - import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; - import java.util.concurrent.locks.Lock; - import java.util.concurrent.locks.ReentrantLock; import org.apache.http.annotation.ThreadSafe; import org.apache.http.concurrent.BasicFuture; import org.apache.http.concurrent.FutureCallback; import org.apache.http.nio.reactor.ConnectingIOReactor; import org.apache.http.nio.reactor.IOSession; import org.apache.http.nio.reactor.SessionRequest; --- 28,51 ---- import java.io.IOException; import java.net.SocketAddress; ! import java.util.Collections; ! import java.util.Deque; import java.util.Iterator; import java.util.Map; + import java.util.Queue; import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.http.annotation.ThreadSafe; import org.apache.http.concurrent.BasicFuture; import org.apache.http.concurrent.FutureCallback; + import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.nio.reactor.ConnectingIOReactor; import org.apache.http.nio.reactor.IOSession; import org.apache.http.nio.reactor.SessionRequest; *************** *** 74,88 **** private final ConnectingIOReactor ioreactor; private final NIOConnFactory connFactory; private final SocketAddressResolver addressResolver; ! private final SessionRequestCallback sessionRequestCallback; ! private final Map> routeToPool; ! private final LinkedList> leasingRequests; private final Set pending; private final Set leased; ! private final LinkedList available; ! private final ConcurrentLinkedQueue> completedRequests; private final Map maxPerRoute; ! private final Lock lock; private final AtomicBoolean isShutDown; private volatile int defaultMaxPerRoute; --- 74,88 ---- private final ConnectingIOReactor ioreactor; private final NIOConnFactory connFactory; private final SocketAddressResolver addressResolver; ! // private final SessionRequestCallback sessionRequestCallback; ! private final ConcurrentHashMap> routeToPool; ! private final Deque> leasingRequests; private final Set pending; private final Set leased; ! private final Deque available; ! private final Queue> completedRequests; private final Map maxPerRoute; ! // private final Lock lock; private final AtomicBoolean isShutDown; private volatile int defaultMaxPerRoute; *************** *** 116,130 **** } }; ! this.sessionRequestCallback = new InternalSessionRequestCallback(); ! this.routeToPool = new HashMap>(); ! this.leasingRequests = new LinkedList>(); ! this.pending = new HashSet(); ! this.leased = new HashSet(); ! this.available = new LinkedList(); ! this.maxPerRoute = new HashMap(); this.completedRequests = new ConcurrentLinkedQueue>(); ! this.lock = new ReentrantLock(); this.isShutDown = new AtomicBoolean(false); this.defaultMaxPerRoute = defaultMaxPerRoute; this.maxTotal = maxTotal; --- 116,136 ---- } }; ! //this.sessionRequestCallback = new InternalSessionRequestCallback(); ! // this.routeToPool = new HashMap>(); ! // this.leasingRequests = new ConcurrentLinkedQueue>(); ! // this.pending = new HashSet(); ! // this.leased = new HashSet(); ! // this.available = new ConcurrentLinkedQueue(); ! // this.maxPerRoute = new HashMap(); ! this.routeToPool = new ConcurrentHashMap>(); ! this.leasingRequests = new ConcurrentLinkedDeque>(); ! this.pending = Collections.newSetFromMap(new ConcurrentHashMap()) ; //new HashSet(); ! this.leased = Collections.newSetFromMap(new ConcurrentHashMap()) ; // new HashSet(); ! this.available = new ConcurrentLinkedDeque(); ! this.maxPerRoute = new ConcurrentHashMap(); this.completedRequests = new ConcurrentLinkedQueue>(); ! // this.lock = new ReentrantLock(); this.isShutDown = new AtomicBoolean(false); this.defaultMaxPerRoute = defaultMaxPerRoute; this.maxTotal = maxTotal; *************** *** 148,162 **** this.ioreactor = ioreactor; this.connFactory = connFactory; this.addressResolver = addressResolver; ! this.sessionRequestCallback = new InternalSessionRequestCallback(); ! this.routeToPool = new HashMap>(); ! this.leasingRequests = new LinkedList>(); ! this.pending = new HashSet(); ! this.leased = new HashSet(); ! this.available = new LinkedList(); this.completedRequests = new ConcurrentLinkedQueue>(); ! this.maxPerRoute = new HashMap(); ! this.lock = new ReentrantLock(); this.isShutDown = new AtomicBoolean(false); this.defaultMaxPerRoute = defaultMaxPerRoute; this.maxTotal = maxTotal; --- 154,175 ---- this.ioreactor = ioreactor; this.connFactory = connFactory; this.addressResolver = addressResolver; ! //this.sessionRequestCallback = new InternalSessionRequestCallback(); ! this.routeToPool = new ConcurrentHashMap>(); ! this.leasingRequests = new ConcurrentLinkedDeque>(); ! this.pending = Collections.newSetFromMap(new ConcurrentHashMap()) ; //new HashSet(); ! this.leased = Collections.newSetFromMap(new ConcurrentHashMap()) ; // new HashSet(); ! this.available = new ConcurrentLinkedDeque(); ! this.maxPerRoute = new ConcurrentHashMap(); ! ! // this.routeToPool = new HashMap>(); ! // this.leasingRequests = new LinkedList>(); ! // this.pending = new HashSet(); ! // this.leased = new HashSet(); ! // this.available = new LinkedList(); ! // this.maxPerRoute = new HashMap(); this.completedRequests = new ConcurrentLinkedQueue>(); ! // this.lock = new ReentrantLock(); this.isShutDown = new AtomicBoolean(false); this.defaultMaxPerRoute = defaultMaxPerRoute; this.maxTotal = maxTotal; *************** *** 199,206 **** public void shutdown(final long waitMs) throws IOException { if (this.isShutDown.compareAndSet(false, true)) { fireCallbacks(); ! this.lock.lock(); ! try { for (final SessionRequest sessionRequest: this.pending) { sessionRequest.cancel(); } --- 212,219 ---- public void shutdown(final long waitMs) throws IOException { if (this.isShutDown.compareAndSet(false, true)) { fireCallbacks(); ! // this.lock.lock(); ! // try { for (final SessionRequest sessionRequest: this.pending) { sessionRequest.cancel(); } *************** *** 211,227 **** entry.close(); } for (final RouteSpecificPool pool: this.routeToPool.values()) { pool.shutdown(); } this.routeToPool.clear(); this.leased.clear(); this.pending.clear(); this.available.clear(); this.leasingRequests.clear(); this.ioreactor.shutdown(waitMs); ! } finally { ! this.lock.unlock(); ! } } } --- 224,242 ---- entry.close(); } for (final RouteSpecificPool pool: this.routeToPool.values()) { + if (pool != null) { pool.shutdown(); } + } this.routeToPool.clear(); this.leased.clear(); this.pending.clear(); this.available.clear(); this.leasingRequests.clear(); this.ioreactor.shutdown(waitMs); ! // } finally { ! // this.lock.unlock(); ! // } } } *************** *** 236,242 **** } }; ! this.routeToPool.put(route, pool); } return pool; } --- 251,260 ---- } }; ! final RouteSpecificPool existing = this.routeToPool.putIfAbsent(route, pool); ! if (existing != null) { ! pool = existing ; ! } } return pool; } *************** *** 259,266 **** Args.notNull(tunit, "Time unit"); Asserts.check(!this.isShutDown.get(), "Connection pool shut down"); final BasicFuture future = new BasicFuture(callback); ! this.lock.lock(); ! try { final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0; final LeaseRequest request = new LeaseRequest(route, state, timeout, leaseTimeout, future); final boolean completed = processPendingRequest(request); --- 277,284 ---- Args.notNull(tunit, "Time unit"); Asserts.check(!this.isShutDown.get(), "Connection pool shut down"); final BasicFuture future = new BasicFuture(callback); ! // this.lock.lock(); ! // try { final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0; final LeaseRequest request = new LeaseRequest(route, state, timeout, leaseTimeout, future); final boolean completed = processPendingRequest(request); *************** *** 270,278 **** if (request.isDone()) { this.completedRequests.add(request); } ! } finally { ! this.lock.unlock(); ! } fireCallbacks(); return future; } --- 288,296 ---- if (request.isDone()) { this.completedRequests.add(request); } ! // } finally { ! // this.lock.unlock(); ! // } fireCallbacks(); return future; } *************** *** 292,318 **** if (this.isShutDown.get()) { return; } ! this.lock.lock(); ! try { if (this.leased.remove(entry)) { final RouteSpecificPool pool = getPool(entry.getRoute()); pool.free(entry, reusable); if (reusable) { ! this.available.addFirst(entry); onRelease(entry); } else { entry.close(); } processNextPendingRequest(); } ! } finally { ! this.lock.unlock(); ! } fireCallbacks(); } private void processPendingRequests() { ! final ListIterator> it = this.leasingRequests.listIterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); final boolean completed = processPendingRequest(request); --- 310,336 ---- if (this.isShutDown.get()) { return; } ! // this.lock.lock(); ! // try { if (this.leased.remove(entry)) { final RouteSpecificPool pool = getPool(entry.getRoute()); pool.free(entry, reusable); if (reusable) { ! this.available.offerFirst(entry); onRelease(entry); } else { entry.close(); } processNextPendingRequest(); } ! // } finally { ! // this.lock.unlock(); ! // } fireCallbacks(); } private void processPendingRequests() { ! final Iterator> it = this.leasingRequests.iterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); final boolean completed = processPendingRequest(request); *************** *** 326,332 **** } private void processNextPendingRequest() { ! final ListIterator> it = this.leasingRequests.listIterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); final boolean completed = processPendingRequest(request); --- 344,350 ---- } private void processNextPendingRequest() { ! final Iterator> it = this.leasingRequests.iterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); final boolean completed = processPendingRequest(request); *************** *** 369,376 **** } } if (entry != null) { - this.available.remove(entry); this.leased.add(entry); request.completed(entry); onLease(entry); return true; --- 387,394 ---- } } if (entry != null) { this.leased.add(entry); + this.available.remove(entry); request.completed(entry); onLease(entry); return true; *************** *** 400,407 **** } final int totalAvailable = this.available.size(); if (totalAvailable > freeCapacity - 1) { ! if (!this.available.isEmpty()) { ! final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); --- 418,425 ---- } final int totalAvailable = this.available.size(); if (totalAvailable > freeCapacity - 1) { ! final E lastUsed = this.available.pollLast(); ! if (lastUsed != null) { lastUsed.close(); final RouteSpecificPool otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); *************** *** 417,430 **** request.failed(ex); return false; } ! final SessionRequest sessionRequest = this.ioreactor.connect( ! remoteAddress, localAddress, route, this.sessionRequestCallback); final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ? (int) request.getConnectTimeout() : Integer.MAX_VALUE; sessionRequest.setConnectTimeout(timout); ! this.pending.add(sessionRequest); ! pool.addPending(sessionRequest, request.getFuture()); return true; } else { return false; --- 435,452 ---- request.failed(ex); return false; } ! final BasicFuture future = request.getFuture(); final SessionRequest sessionRequest = this.ioreactor.connect( ! remoteAddress, localAddress, route, new InternalSessionRequestCallback(pool, future)); ! // actually useless, but kept for compatibility with unit tests (SRU) ! if (!(this.ioreactor instanceof DefaultConnectingIOReactor)) { ! pool.addPending(sessionRequest, future); ! this.pending.add(sessionRequest); final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ? (int) request.getConnectTimeout() : Integer.MAX_VALUE; sessionRequest.setConnectTimeout(timout); ! } ! // End of comment (SRU) return true; } else { return false; *************** *** 448,457 **** } public void validatePendingRequests() { ! this.lock.lock(); ! try { final long now = System.currentTimeMillis(); ! final ListIterator> it = this.leasingRequests.listIterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); final long deadline = request.getDeadline(); --- 470,479 ---- } public void validatePendingRequests() { ! // this.lock.lock(); ! // try { final long now = System.currentTimeMillis(); ! final Iterator> it = this.leasingRequests.iterator(); while (it.hasNext()) { final LeaseRequest request = it.next(); final long deadline = request.getDeadline(); *************** *** 461,469 **** this.completedRequests.add(request); } } ! } finally { ! this.lock.unlock(); ! } fireCallbacks(); } --- 483,491 ---- this.completedRequests.add(request); } } ! // } finally { ! // this.lock.unlock(); ! // } fireCallbacks(); } *************** *** 474,496 **** @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); - this.lock.lock(); - try { - this.pending.remove(request); final RouteSpecificPool pool = getPool(route); final IOSession session = request.getSession(); try { final C conn = this.connFactory.create(route, session); final E entry = pool.createEntry(request, conn); - this.leased.add(entry); pool.completed(request, entry); onLease(entry); } catch (final IOException ex) { pool.failed(request, ex); } - } finally { - this.lock.unlock(); - } fireCallbacks(); } --- 496,513 ---- @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); final RouteSpecificPool pool = getPool(route); final IOSession session = request.getSession(); try { final C conn = this.connFactory.create(route, session); final E entry = pool.createEntry(request, conn); pool.completed(request, entry); + this.pending.remove(request); + this.leased.add(entry); onLease(entry); } catch (final IOException ex) { pool.failed(request, ex); } fireCallbacks(); } *************** *** 501,515 **** @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); ! this.lock.lock(); ! try { this.pending.remove(request); final RouteSpecificPool pool = getPool(route); pool.cancelled(request); processNextPendingRequest(); ! } finally { ! this.lock.unlock(); ! } fireCallbacks(); } --- 518,532 ---- @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); ! // this.lock.lock(); ! // try { this.pending.remove(request); final RouteSpecificPool pool = getPool(route); pool.cancelled(request); processNextPendingRequest(); ! // } finally { ! // this.lock.unlock(); ! // } fireCallbacks(); } *************** *** 520,534 **** @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); ! this.lock.lock(); ! try { this.pending.remove(request); final RouteSpecificPool pool = getPool(route); pool.failed(request, request.getException()); processNextPendingRequest(); ! } finally { ! this.lock.unlock(); ! } fireCallbacks(); } --- 537,551 ---- @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); ! // this.lock.lock(); ! // try { this.pending.remove(request); final RouteSpecificPool pool = getPool(route); pool.failed(request, request.getException()); processNextPendingRequest(); ! // } finally { ! // this.lock.unlock(); ! // } fireCallbacks(); } *************** *** 539,553 **** @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); ! this.lock.lock(); ! try { this.pending.remove(request); final RouteSpecificPool pool = getPool(route); pool.timeout(request); processNextPendingRequest(); ! } finally { ! this.lock.unlock(); ! } fireCallbacks(); } --- 556,570 ---- @SuppressWarnings("unchecked") final T route = (T) request.getAttachment(); ! // this.lock.lock(); ! // try { this.pending.remove(request); final RouteSpecificPool pool = getPool(route); pool.timeout(request); processNextPendingRequest(); ! // } finally { ! // this.lock.unlock(); ! // } fireCallbacks(); } *************** *** 562,650 **** public void setMaxTotal(final int max) { Args.positive(max, "Max value"); ! this.lock.lock(); ! try { this.maxTotal = max; ! } finally { ! this.lock.unlock(); ! } } public int getMaxTotal() { ! this.lock.lock(); ! try { return this.maxTotal; ! } finally { ! this.lock.unlock(); ! } } public void setDefaultMaxPerRoute(final int max) { Args.positive(max, "Max value"); ! this.lock.lock(); ! try { this.defaultMaxPerRoute = max; ! } finally { ! this.lock.unlock(); ! } } public int getDefaultMaxPerRoute() { ! this.lock.lock(); ! try { return this.defaultMaxPerRoute; ! } finally { ! this.lock.unlock(); ! } } public void setMaxPerRoute(final T route, final int max) { Args.notNull(route, "Route"); Args.positive(max, "Max value"); ! this.lock.lock(); ! try { this.maxPerRoute.put(route, Integer.valueOf(max)); ! } finally { ! this.lock.unlock(); ! } } public int getMaxPerRoute(final T route) { Args.notNull(route, "Route"); ! this.lock.lock(); ! try { return getMax(route); ! } finally { ! this.lock.unlock(); ! } } public PoolStats getTotalStats() { ! this.lock.lock(); ! try { return new PoolStats( this.leased.size(), this.pending.size(), this.available.size(), this.maxTotal); ! } finally { ! this.lock.unlock(); ! } } public PoolStats getStats(final T route) { Args.notNull(route, "Route"); ! this.lock.lock(); ! try { final RouteSpecificPool pool = getPool(route); return new PoolStats( pool.getLeasedCount(), pool.getPendingCount(), pool.getAvailableCount(), getMax(route)); ! } finally { ! this.lock.unlock(); ! } } /** --- 579,667 ---- public void setMaxTotal(final int max) { Args.positive(max, "Max value"); ! // this.lock.lock(); ! // try { this.maxTotal = max; ! // } finally { ! // this.lock.unlock(); ! // } } public int getMaxTotal() { ! // this.lock.lock(); ! // try { return this.maxTotal; ! // } finally { ! // this.lock.unlock(); ! // } } public void setDefaultMaxPerRoute(final int max) { Args.positive(max, "Max value"); ! // this.lock.lock(); ! // try { this.defaultMaxPerRoute = max; ! // } finally { ! // this.lock.unlock(); ! // } } public int getDefaultMaxPerRoute() { ! // this.lock.lock(); ! // try { return this.defaultMaxPerRoute; ! // } finally { ! // this.lock.unlock(); ! // } } public void setMaxPerRoute(final T route, final int max) { Args.notNull(route, "Route"); Args.positive(max, "Max value"); ! // this.lock.lock(); ! // try { this.maxPerRoute.put(route, Integer.valueOf(max)); ! // } finally { ! // this.lock.unlock(); ! // } } public int getMaxPerRoute(final T route) { Args.notNull(route, "Route"); ! // this.lock.lock(); ! // try { return getMax(route); ! // } finally { ! // this.lock.unlock(); ! // } } public PoolStats getTotalStats() { ! // this.lock.lock(); ! // try { return new PoolStats( this.leased.size(), this.pending.size(), this.available.size(), this.maxTotal); ! // } finally { ! // this.lock.unlock(); ! // } } public PoolStats getStats(final T route) { Args.notNull(route, "Route"); ! // this.lock.lock(); ! // try { final RouteSpecificPool pool = getPool(route); return new PoolStats( pool.getLeasedCount(), pool.getPendingCount(), pool.getAvailableCount(), getMax(route)); ! // } finally { ! // this.lock.unlock(); ! // } } /** *************** *** 653,660 **** * @since 4.3 */ protected void enumAvailable(final PoolEntryCallback callback) { ! this.lock.lock(); ! try { final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); --- 670,677 ---- * @since 4.3 */ protected void enumAvailable(final PoolEntryCallback callback) { ! // this.lock.lock(); ! // try { final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); *************** *** 667,675 **** } processPendingRequests(); purgePoolMap(); ! } finally { ! this.lock.unlock(); ! } } /** --- 684,692 ---- } processPendingRequests(); purgePoolMap(); ! // } finally { ! // this.lock.unlock(); ! // } } /** *************** *** 678,694 **** * @since 4.3 */ protected void enumLeased(final PoolEntryCallback callback) { ! this.lock.lock(); ! try { final Iterator it = this.leased.iterator(); while (it.hasNext()) { final E entry = it.next(); callback.process(entry); } processPendingRequests(); ! } finally { ! this.lock.unlock(); ! } } /** --- 695,711 ---- * @since 4.3 */ protected void enumLeased(final PoolEntryCallback callback) { ! // this.lock.lock(); ! // try { final Iterator it = this.leased.iterator(); while (it.hasNext()) { final E entry = it.next(); callback.process(entry); } processPendingRequests(); ! // } finally { ! // this.lock.unlock(); ! // } } /** *************** *** 761,767 **** return buffer.toString(); } ! class InternalSessionRequestCallback implements SessionRequestCallback { public void completed(final SessionRequest request) { requestCompleted(request); --- 778,792 ---- return buffer.toString(); } ! public class InternalSessionRequestCallback implements SessionRequestCallback { ! ! private final BasicFuture future; ! private final RouteSpecificPool pool; ! ! public InternalSessionRequestCallback(final RouteSpecificPool pool, final BasicFuture future) { ! this.pool = pool; ! this.future = future; ! } public void completed(final SessionRequest request) { requestCompleted(request); *************** *** 779,784 **** requestTimeout(request); } } - } --- 804,815 ---- requestTimeout(request); } + public void initiated(final SessionRequest request) { + pool.addPending(request, future); + pending.add(request); + final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ? + (int) request.getConnectTimeout() : Integer.MAX_VALUE; + request.setConnectTimeout(timout); + } } } *** src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Wed Feb 12 21:35:34 2014 --- src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Wed Jul 23 11:17:42 2014 *************** *** 227,233 **** final SessionRequestImpl sessionRequest = new SessionRequestImpl( remoteAddress, localAddress, attachment, callback); sessionRequest.setConnectTimeout(this.config.getConnectTimeout()); ! this.requestQueue.add(sessionRequest); this.selector.wakeup(); --- 227,233 ---- final SessionRequestImpl sessionRequest = new SessionRequestImpl( remoteAddress, localAddress, attachment, callback); sessionRequest.setConnectTimeout(this.config.getConnectTimeout()); ! callback.initiated(sessionRequest) ; this.requestQueue.add(sessionRequest); this.selector.wakeup(); *** src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java Wed Feb 12 21:35:36 2014 --- src/main/java/org/apache/http/nio/pool/RouteSpecificPool.java Wed Jul 23 12:00:25 2014 *************** *** 27,38 **** package org.apache.http.nio.pool; import java.net.SocketTimeoutException; ! import java.util.HashMap; ! import java.util.HashSet; import java.util.Iterator; - import java.util.LinkedList; - import java.util.Map; import java.util.Set; import org.apache.http.annotation.NotThreadSafe; import org.apache.http.concurrent.BasicFuture; --- 27,38 ---- package org.apache.http.nio.pool; import java.net.SocketTimeoutException; ! import java.util.Collections; ! import java.util.Deque; import java.util.Iterator; import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.http.annotation.NotThreadSafe; import org.apache.http.concurrent.BasicFuture; *************** *** 46,60 **** private final T route; private final Set leased; ! private final LinkedList available; ! private final Map> pending; RouteSpecificPool(final T route) { super(); this.route = route; ! this.leased = new HashSet(); ! this.available = new LinkedList(); ! this.pending = new HashMap>(); } public T getRoute() { --- 46,60 ---- private final T route; private final Set leased; ! private final Deque available; ! private final ConcurrentHashMap> pending; RouteSpecificPool(final T route) { super(); this.route = route; ! this.leased = Collections.newSetFromMap(new ConcurrentHashMap()) ; //new HashSet(); ! this.available = new ConcurrentLinkedDeque(); ! this.pending = new ConcurrentHashMap>(); } public T getRoute() { *************** *** 81,104 **** public E getFree(final Object state) { if (!this.available.isEmpty()) { if (state != null) { final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); if (state.equals(entry.getState())) { ! it.remove(); ! this.leased.add(entry); ! return entry; } } } final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); if (entry.getState() == null) { ! it.remove(); ! this.leased.add(entry); ! return entry; } } } --- 81,115 ---- public E getFree(final Object state) { if (!this.available.isEmpty()) { + E selectedEntry = null ; if (state != null) { final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); if (state.equals(entry.getState())) { ! selectedEntry = entry ; ! break ; } } } + if (selectedEntry == null) { final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); if (entry.getState() == null) { ! selectedEntry = entry ; ! break ; ! } ! } ! } ! ! if (selectedEntry != null) { ! if (available.remove(selectedEntry)) { ! this.leased.add(selectedEntry); ! return selectedEntry; ! } ! else { ! return getFree(state) ; } } } *************** *** 106,122 **** } public E getLastUsed() { ! if (!this.available.isEmpty()) { ! return this.available.getLast(); ! } else { ! return null; ! } } public boolean remove(final E entry) { Args.notNull(entry, "Pool entry"); - if (!this.available.remove(entry)) { if (!this.leased.remove(entry)) { return false; } } --- 117,129 ---- } public E getLastUsed() { ! return this.available.peekLast(); } public boolean remove(final E entry) { Args.notNull(entry, "Pool entry"); if (!this.leased.remove(entry)) { + if (!this.available.remove(entry)) { return false; } } *************** *** 128,141 **** final boolean found = this.leased.remove(entry); Asserts.check(found, "Entry %s has not been leased from this pool", entry); if (reusable) { ! this.available.addFirst(entry); } } public void addPending( final SessionRequest sessionRequest, final BasicFuture future) { ! this.pending.put(sessionRequest, future); } private BasicFuture removeRequest(final SessionRequest request) { --- 135,148 ---- final boolean found = this.leased.remove(entry); Asserts.check(found, "Entry %s has not been leased from this pool", entry); if (reusable) { ! this.available.offerFirst(entry); } } public void addPending( final SessionRequest sessionRequest, final BasicFuture future) { ! this.pending.putIfAbsent(sessionRequest, future); } private BasicFuture removeRequest(final SessionRequest request) { *** src/main/java/org/apache/http/nio/reactor/SessionRequestCallback.java Wed Feb 12 21:35:36 2014 --- src/main/java/org/apache/http/nio/reactor/SessionRequestCallback.java Wed Jul 23 11:24:51 2014 *************** *** 27,32 **** --- 27,33 ---- package org.apache.http.nio.reactor; + /** * SessionRequestCallback interface can be used to get notifications of * completion of session requests asynchronously without having to wait *************** *** 67,71 **** --- 68,74 ---- * @param request session request. */ void cancelled(SessionRequest request); + + void initiated(SessionRequest sessionRequest); }