diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java index 93b22e4..7ac5fad 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java @@ -39,7 +39,6 @@ import org.apache.lucene.util.ThreadInterruptedException; final class DocumentsWriterStallControl { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { - volatile boolean hasBlockedThreads = false; // only with assert Sync() { setState(0); @@ -67,15 +66,10 @@ final class DocumentsWriterStallControl { @Override public int tryAcquireShared(int acquires) { - assert maybeSetHasBlocked(getState()); return getState() == 0 ? 1 : -1; } - // only used for testing - private boolean maybeSetHasBlocked(int state) { - hasBlockedThreads |= getState() != 0; - return true; - } + @Override public boolean tryReleaseShared(int newState) { @@ -130,7 +124,7 @@ final class DocumentsWriterStallControl { } boolean hasBlocked() { // for tests - return sync.hasBlockedThreads; + return sync.hasQueuedThreads(); } static interface MemoryController { @@ -138,4 +132,12 @@ final class DocumentsWriterStallControl { long flushBytes(); long stallLimitBytes(); } + + public boolean isHealthy() { + return sync.isHealthy(); + } + + public boolean isThreadQueued(Thread t) { + return sync.isQueued(t); + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java index 3a9fe2b..52cf7cd 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java @@ -127,22 +127,19 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { int numStallers = atLeast(1); int numReleasers = atLeast(1); int numWaiters = atLeast(1); - - final CountDownLatch[] latches = new CountDownLatch[] { - new CountDownLatch(numStallers + numReleasers), new CountDownLatch(1), - new CountDownLatch(numWaiters)}; + final Synchonizer sync = new Synchonizer(numStallers + numReleasers, numStallers + numReleasers+numWaiters); Thread[] threads = new Thread[numReleasers + numStallers + numWaiters]; List exceptions = Collections.synchronizedList(new ArrayList()); for (int i = 0; i < numReleasers; i++) { - threads[i] = new Updater(stop, checkPoint, ctrl, latches, true, exceptions); + threads[i] = new Updater(stop, checkPoint, ctrl, sync, true, exceptions); } for (int i = numReleasers; i < numReleasers + numStallers; i++) { - threads[i] = new Updater(stop, checkPoint, ctrl, latches, false, exceptions); + threads[i] = new Updater(stop, checkPoint, ctrl, sync, false, exceptions); } for (int i = numReleasers + numStallers; i < numReleasers + numStallers + numWaiters; i++) { - threads[i] = new Waiter(stop, checkPoint, ctrl, latches, exceptions); + threads[i] = new Waiter(stop, checkPoint, ctrl, sync, exceptions); } @@ -151,7 +148,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { for (int i = 0; i < iters; i++) { if (checkPoint.get()) { - assertTrue("timed out waiting for update threads - deadlock?", latches[0].await(10, TimeUnit.SECONDS)); + assertTrue("timed out waiting for update threads - deadlock?", sync.updateJoin.await(10, TimeUnit.SECONDS)); if (!exceptions.isEmpty()) { for (Throwable throwable : exceptions) { throwable.printStackTrace(); @@ -159,27 +156,38 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { fail("got exceptions in threads"); } - if (!ctrl.anyStalledThreads()) { - assertTrue( - "control claims no stalled threads but waiter seems to be blocked", - latches[2].await(10, TimeUnit.SECONDS)); - } - checkPoint.set(false); + if (ctrl.hasBlocked() && ctrl.isHealthy()) { + assertState(numReleasers, numStallers, numWaiters, threads, ctrl); + + + } - latches[1].countDown(); + checkPoint.set(false); + sync.waiter.countDown(); + sync.leftCheckpoint.await(); } assertFalse(checkPoint.get()); + assertEquals(0, sync.waiter.getCount()); if (random().nextInt(2) == 0) { - latches[0] = new CountDownLatch(numStallers + numReleasers); - latches[1] = new CountDownLatch(1); - latches[2] = new CountDownLatch(numWaiters); + sync.reset(numStallers + numReleasers, numStallers + numReleasers + + numWaiters); checkPoint.set(true); } } + if (!checkPoint.get()) { + sync.reset(numStallers + numReleasers, numStallers + numReleasers + + numWaiters); + checkPoint.set(true); + } + assertTrue(sync.updateJoin.await(10, TimeUnit.SECONDS)); + assertState(numReleasers, numStallers, numWaiters, threads, ctrl); + checkPoint.set(false); stop.set(true); - latches[1].countDown(); + sync.waiter.countDown(); + sync.leftCheckpoint.await(); + for (int i = 0; i < threads.length; i++) { memCtrl.limit = 1000; @@ -196,20 +204,45 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { } } + private void assertState(int numReleasers, int numStallers, int numWaiters, Thread[] threads, DocumentsWriterStallControl ctrl) throws InterruptedException { + int millisToSleep = 100; + while (true) { + if (ctrl.hasBlocked() && ctrl.isHealthy()) { + for (int n = numReleasers + numStallers; n < numReleasers + + numStallers + numWaiters; n++) { + if (ctrl.isThreadQueued(threads[n])) { + if (millisToSleep < 60000) { + Thread.sleep(millisToSleep); + millisToSleep *=2; + break; + } else { + fail("control claims no stalled threads but waiter seems to be blocked "); + } + } + } + break; + } else { + break; + } + } + + } + public static class Waiter extends Thread { - private CountDownLatch[] latches; + private Synchonizer sync; private DocumentsWriterStallControl ctrl; private AtomicBoolean checkPoint; private AtomicBoolean stop; private List exceptions; public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint, - DocumentsWriterStallControl ctrl, CountDownLatch[] latches, + DocumentsWriterStallControl ctrl, Synchonizer sync, List exceptions) { + super("waiter"); this.stop = stop; this.checkPoint = checkPoint; this.ctrl = ctrl; - this.latches = latches; + this.sync = sync; this.exceptions = exceptions; } @@ -218,13 +251,10 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { while (!stop.get()) { ctrl.waitIfStalled(); if (checkPoint.get()) { - CountDownLatch join = latches[2]; - CountDownLatch wait = latches[1]; - join.countDown(); try { - assertTrue(wait.await(10, TimeUnit.SECONDS)); + assertTrue(sync.await()); } catch (InterruptedException e) { - System.out.println("[Waiter] got interrupted - wait count: " + wait.getCount()); + System.out.println("[Waiter] got interrupted - wait count: " + sync.waiter.getCount()); throw new ThreadInterruptedException(e); } } @@ -238,7 +268,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { public static class Updater extends Thread { - private CountDownLatch[] latches; + private Synchonizer sync; private DocumentsWriterStallControl ctrl; private AtomicBoolean checkPoint; private AtomicBoolean stop; @@ -246,12 +276,13 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { private List exceptions; public Updater(AtomicBoolean stop, AtomicBoolean checkPoint, - DocumentsWriterStallControl ctrl, CountDownLatch[] latches, + DocumentsWriterStallControl ctrl, Synchonizer sync, boolean release, List exceptions) { + super("updater"); this.stop = stop; this.checkPoint = checkPoint; this.ctrl = ctrl; - this.latches = latches; + this.sync = sync; this.release = release; this.exceptions = exceptions; } @@ -268,22 +299,24 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { ctrl.updateStalled(memCtrl); } if (checkPoint.get()) { - CountDownLatch join = latches[0]; - CountDownLatch wait = latches[1]; - join.countDown(); + sync.updateJoin.countDown(); try { - assertTrue(wait.await(10, TimeUnit.SECONDS)); + assertTrue(sync.await()); } catch (InterruptedException e) { - System.out.println("[Updater] got interrupted - wait count: " + wait.getCount()); + System.out.println("[Updater] got interrupted - wait count: " + sync.waiter.getCount()); throw new ThreadInterruptedException(e); } + sync.leftCheckpoint.countDown(); + } + if (random().nextBoolean()) { + Thread.yield(); } - Thread.yield(); } } catch (Throwable e) { e.printStackTrace(); exceptions.add(e); } + sync.updateJoin.countDown(); } } @@ -366,4 +399,25 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { } } + + private static final class Synchonizer { + volatile CountDownLatch waiter; + volatile CountDownLatch updateJoin; + volatile CountDownLatch leftCheckpoint; + + public Synchonizer(int numUpdater, int numThreads) { + reset(numUpdater, numThreads); + } + + public void reset(int numUpdaters, int numThreads) { + this.waiter = new CountDownLatch(1); + this.updateJoin = new CountDownLatch(numUpdaters); + this.leftCheckpoint = new CountDownLatch(numUpdaters); + } + + public boolean await() throws InterruptedException { + return waiter.await(10, TimeUnit.SECONDS); + } + + } }