Index: org/apache/commons/pool/impl/GenericObjectPool.java =================================================================== --- org/apache/commons/pool/impl/GenericObjectPool.java (revision 630627) +++ org/apache/commons/pool/impl/GenericObjectPool.java (working copy) @@ -487,11 +487,16 @@ _factory = factory; _maxActive = maxActive; _lifo = lifo; + _maxWait = maxWait; switch(whenExhaustedAction) { case WHEN_EXHAUSTED_BLOCK: case WHEN_EXHAUSTED_FAIL: + _whenExhaustedAction = whenExhaustedAction; + _semaphore = new FairnessSemaphore(_maxActive); + break; case WHEN_EXHAUSTED_GROW: _whenExhaustedAction = whenExhaustedAction; + _semaphore = new FairnessSemaphore(-1); break; default: throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized."); @@ -496,7 +501,6 @@ default: throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized."); } - _maxWait = maxWait; _maxIdle = maxIdle; _minIdle = minIdle; _testOnBorrow = testOnBorrow; @@ -532,7 +536,9 @@ */ public synchronized void setMaxActive(int maxActive) { _maxActive = maxActive; - notifyAll(); + if (_whenExhaustedAction != WHEN_EXHAUSTED_GROW) { + _semaphore.setSize(_maxActive); + } } /** @@ -561,9 +567,12 @@ switch(whenExhaustedAction) { case WHEN_EXHAUSTED_BLOCK: case WHEN_EXHAUSTED_FAIL: + _whenExhaustedAction = whenExhaustedAction; + _semaphore.setSize(_maxActive); + break; case WHEN_EXHAUSTED_GROW: _whenExhaustedAction = whenExhaustedAction; - notifyAll(); + _semaphore.setSize(-1); break; default: throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized."); @@ -607,7 +616,6 @@ */ public synchronized void setMaxWait(long maxWait) { _maxWait = maxWait; - notifyAll(); } /** @@ -627,7 +635,6 @@ */ public synchronized void setMaxIdle(int maxIdle) { _maxIdle = maxIdle; - notifyAll(); } /** @@ -644,7 +651,6 @@ */ public synchronized void setMinIdle(int minIdle) { _minIdle = minIdle; - notifyAll(); } /** @@ -903,7 +909,6 @@ setTimeBetweenEvictionRunsMillis(conf.timeBetweenEvictionRunsMillis); setSoftMinEvictableIdleTimeMillis(conf.softMinEvictableIdleTimeMillis); setLifo(conf.lifo); - notifyAll(); } //-- ObjectPool methods ------------------------------------------ @@ -908,8 +913,11 @@ //-- ObjectPool methods ------------------------------------------ + FairnessSemaphore _semaphore; + public Object borrowObject() throws Exception { - long starttime = System.currentTimeMillis(); + _semaphore.acquire((_whenExhaustedAction == WHEN_EXHAUSTED_FAIL) ? 0: _maxWait); + for(;;) { ObjectTimestampPair pair = null; @@ -921,49 +929,6 @@ } catch(NoSuchElementException e) { ; /* ignored */ } - - // otherwise - if(null == pair) { - // check if we can create one - // (note we know that the num sleeping is 0, else we wouldn't be here) - if(_maxActive < 0 || _numActive < _maxActive) { - // allow new object to be created - } else { - // the pool is exhausted - switch(_whenExhaustedAction) { - case WHEN_EXHAUSTED_GROW: - // allow new object to be created - break; - case WHEN_EXHAUSTED_FAIL: - throw new NoSuchElementException("Pool exhausted"); - case WHEN_EXHAUSTED_BLOCK: - try { - if(_maxWait <= 0) { - wait(); - } else { - // this code may be executed again after a notify then continue cycle - // so, need to calculate the amount of time to wait - final long elapsed = (System.currentTimeMillis() - starttime); - final long waitTime = _maxWait - elapsed; - if (waitTime > 0) - { - wait(waitTime); - } - } - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } - if(_maxWait > 0 && ((System.currentTimeMillis() - starttime) >= _maxWait)) { - throw new NoSuchElementException("Timeout waiting for idle object"); - } else { - continue; // keep looping - } - default: - throw new IllegalArgumentException("WhenExhaustedAction property " + _whenExhaustedAction + " not recognized."); - } - } - } _numActive++; } @@ -976,10 +941,10 @@ newlyCreated = true; } finally { if (!newlyCreated) { - // object cannot be created + // object cannot be created and throwing Exception synchronized (this) { _numActive--; - notifyAll(); + _semaphore.release(); } } } @@ -1002,7 +967,7 @@ } synchronized (this) { _numActive--; - notifyAll(); + _semaphore.release(); } if(newlyCreated) { throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage()); @@ -1022,7 +987,7 @@ } finally { synchronized (this) { _numActive--; - notifyAll(); // _numActive has changed + _semaphore.release(); } } } @@ -1040,7 +1005,6 @@ it.remove(); } _pool.clear(); - notifyAll(); // num sleeping has changed } /** @@ -1086,9 +1050,10 @@ // "behavior flag",decrementNumActive, from addObjectToPool. synchronized(this) { _numActive--; - notifyAll(); } } + } finally { + _semaphore.release(); } } @@ -1135,7 +1100,6 @@ if (decrementNumActive) { synchronized(this) { _numActive--; - notifyAll(); } } } Index: org/apache/commons/pool/impl/FairnessSemaphore.java =================================================================== --- org/apache/commons/pool/impl/FairnessSemaphore.java (revision 0) +++ org/apache/commons/pool/impl/FairnessSemaphore.java (revision 0) @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.pool.impl; + +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Semaphore with fairness feature. + * @author Takayuki Kaneko + */ +public class FairnessSemaphore { + private int currentCount; + + private int size; + + private List latches; + + /** + * constructor. + * @param size semaphore size, that assures maximum concurrent thread number + */ + public FairnessSemaphore(int size) { + latches = new LinkedList(); + currentCount = 0; + this.size = size; + } + + /** + * acquire semaphore with unlimited timeout. + * @throws InterruptedException + */ + public void acquire() throws InterruptedException { + acquire(-1); + } + + /** + * acquire semaphore with timeout + * @param waitTime maximum waiting time before getting semaphore + * @throws InterruptedException + */ + public void acquire(long waitTime) throws InterruptedException { + if (size < 0) { + return; + } + + TimerLatch l = null; + synchronized (this) { + currentCount++; + if (currentCount > size) { + l = new TimerLatch(); + latches.add(l); + } + } + if (l != null) { + try { + l.checkAndWait(waitTime); + } catch (NoSuchElementException e) { + synchronized (this) { + currentCount--; + latches.remove(l); + } + throw e; + } catch (InterruptedException e) { + synchronized (this) { + currentCount--; + latches.remove(l); + } + throw e; + } + } + } + + /** + * release semaphore + */ + public void release() { + if (size < 0) { + return; + } + synchronized (this) { + currentCount--; + + // notify to the latch that is before timeout. + while (latches.size() > 0) { + TimerLatch l = (TimerLatch) latches.remove(0); + if (l.notifyLatch()) { + break; + } + } + } + } + + /** + * set semaphore size + * @param size semaphore size + */ + public synchronized void setSize(int size) { + this.size = size; + } + + private static class TimerLatch { + private boolean notified = false; + private long startTime; + private boolean timeouted = false; + + public synchronized void checkAndWait(long waitTime) + throws InterruptedException { + startTime = System.currentTimeMillis(); + if (waitTime < 0) { + while (!notified) { + wait(); + } + } else { + while (!notified) { + long remainTime = startTime + waitTime + - System.currentTimeMillis(); + if (remainTime <= 0) { + timeouted = true; + throw new NoSuchElementException( + "Timeout waiting for acuiring semaphore"); + } + wait(remainTime); + } + } + } + + /** + * notify latch + * @return if this latch is timeout, return false. + */ + public synchronized boolean notifyLatch() { + if (timeouted) { + return false; + } + notified = true; + notify(); + return true; + } + } +}