Index: activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java =================================================================== --- activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java (revision 650911) +++ activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java (working copy) @@ -21,8 +21,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; @@ -60,6 +64,62 @@ runner.shutdown(); } + + public void testWakeupResultsInThreadSafeCalls() throws Exception { + + ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, getName()); + thread.setDaemon(true); + thread.setPriority(Thread.NORM_PRIORITY); + return thread; + } + }); + final CountDownLatch doneLatch = new CountDownLatch( 100 ); + final AtomicInteger clashCount = new AtomicInteger(); + final AtomicInteger count = new AtomicInteger(); + + + final PooledTaskRunner runner = new PooledTaskRunner(executor, new Task() { + String threadUnSafeVal = null; + public boolean iterate() { + if (threadUnSafeVal != null) { + clashCount.incrementAndGet(); + } + threadUnSafeVal = Thread.currentThread().getName(); + count.incrementAndGet(); + doneLatch.countDown(); + if (!threadUnSafeVal.equals(Thread.currentThread().getName())) { + clashCount.incrementAndGet(); + } + threadUnSafeVal = null; + return false; + } + }, 1 ); + + Runnable doWakeup = new Runnable() { + public void run() { + try { + runner.wakeup(); + } catch (InterruptedException ignored) { + } + } + }; + + final int iterations = 1000; + for (int i=0; i< iterations; i++) { + if (i%100 == 0) { + Thread.sleep(10); + } + executor.execute(doWakeup); + } + + doneLatch.await(20, TimeUnit.SECONDS); + assertEquals("thread safety clash", 0, clashCount.get()); + assertTrue("called more than once", count.get() > 1); + runner.shutdown(); + } + public void testShutsDownAfterRunnerFailure() throws Exception { Future future = executor.submit( new Callable() { public Object call() throws Exception { Index: activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (revision 650911) +++ activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (working copy) @@ -127,24 +127,23 @@ } finally { synchronized( runable ) { iterating = false; - } - } - - synchronized (runable) { - if (shutdown) { - queued = false; runable.notifyAll(); - return; - } + if (shutdown) { + queued = false; + runable.notifyAll(); + return; + } - // If we could not iterate all the items - // then we need to re-queue. - if (!done) { - queued = true; - } + // If we could not iterate all the items + // then we need to re-queue. + if (!done) { + queued = true; + } - if (queued) { - executor.execute(runable); + if (queued) { + executor.execute(runable); + } + } } }