diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index 0c7cf74..bb57bb0 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController; import org.apache.lucene.util.ThreadInterruptedException; /** @@ -40,7 +41,7 @@ import org.apache.lucene.util.ThreadInterruptedException; * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address * space exhaustion. */ -final class DocumentsWriterFlushControl { +final class DocumentsWriterFlushControl implements MemoryController { private final long hardMaxBytesPerDWPT; private long activeBytes = 0; @@ -88,7 +89,7 @@ final class DocumentsWriterFlushControl { return flushBytes + activeBytes; } - long stallLimitBytes() { + public long stallLimitBytes() { final double maxRamMB = config.getRAMBufferSizeMB(); return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE; } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java index d61cc6d..40c2cf3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java @@ -1,6 +1,6 @@ package org.apache.lucene.index; -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -19,6 +19,7 @@ package org.apache.lucene.index; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.util.ThreadInterruptedException; /** * Controls the health status of a {@link DocumentsWriter} sessions. This class @@ -55,11 +56,11 @@ final class DocumentsWriterStallControl { boolean tryReset() { final int oldState = getState(); - if (oldState == 0) + if (oldState == 0) { return true; + } if (compareAndSetState(oldState, 0)) { - releaseShared(0); - return true; + return releaseShared(0); } return false; } @@ -97,11 +98,11 @@ final class DocumentsWriterStallControl { * {@link DocumentsWriterStallControl} to healthy and release all threads waiting on * {@link #waitIfStalled()} */ - void updateStalled(DocumentsWriterFlushControl flushControl) { + void updateStalled(MemoryController controller) { do { // if we have more flushing / blocked DWPT than numActiveDWPT we stall! // don't stall if we have queued flushes - threads should be hijacked instead - while (flushControl.netBytes() > flushControl.stallLimitBytes()) { + while (controller.netBytes() > controller.stallLimitBytes()) { if (sync.trySetStalled()) { assert wasStalled = true; return; @@ -111,10 +112,19 @@ final class DocumentsWriterStallControl { } void waitIfStalled() { - sync.acquireShared(0); + try { + sync.acquireSharedInterruptibly(0); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } } boolean hasBlocked() { // for tests return sync.hasBlockedThreads; } + + static interface MemoryController { + long netBytes(); + long stallLimitBytes(); + } } \ No newline at end of file diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java new file mode 100644 index 0000000..a658234 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java @@ -0,0 +1,355 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController; +import org.apache.lucene.util.LuceneTestCase; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeaks; + +/** + * Tests for {@link DocumentsWriterStallControl} + */ +@ThreadLeaks(failTestIfLeaking = true) +public class TestDocumentsWriterStallControl extends LuceneTestCase { + + public void testSimpleStall() throws InterruptedException { + DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(); + SimpleMemCtrl memCtrl = new SimpleMemCtrl(); + memCtrl.limit = 1000; + memCtrl.netBytes = 1000; + ctrl.updateStalled(memCtrl); + Thread[] waitThreads = waitThreads(atLeast(1), ctrl); + start(waitThreads); + assertFalse(ctrl.hasBlocked()); + assertFalse(ctrl.anyStalledThreads()); + join(waitThreads, 10); + + // now stall threads and wake them up again + memCtrl.netBytes = 1001; + ctrl.updateStalled(memCtrl); + waitThreads = waitThreads(atLeast(1), ctrl); + start(waitThreads); + awaitState(100, Thread.State.WAITING, waitThreads); + assertTrue(ctrl.hasBlocked()); + assertTrue(ctrl.anyStalledThreads()); + memCtrl.netBytes = 50; + ctrl.updateStalled(memCtrl); + assertFalse(ctrl.anyStalledThreads()); + join(waitThreads, 100); + } + + public void testRandom() throws InterruptedException { + final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(); + SimpleMemCtrl memCtrl = new SimpleMemCtrl(); + memCtrl.limit = 1000; + memCtrl.netBytes = 1; + ctrl.updateStalled(memCtrl); + Thread[] stallThreads = new Thread[atLeast(3)]; + for (int i = 0; i < stallThreads.length; i++) { + final int threadId = i; + stallThreads[i] = new Thread() { + public void run() { + int baseBytes = threadId % 2 == 0 ? 500 : 700; + SimpleMemCtrl memCtrl = new SimpleMemCtrl(); + memCtrl.limit = 1000; + memCtrl.netBytes = 1; + int iters = atLeast(1000); + for (int j = 0; j < iters; j++) { + memCtrl.netBytes = baseBytes + random().nextInt(1000); + ctrl.updateStalled(memCtrl); + if (random().nextInt(5) == 0) { // thread 0 only updates + ctrl.waitIfStalled(); + } + } + } + }; + } + start(stallThreads); + long time = System.currentTimeMillis(); + /* + * use a 100 sec timeout to make sure we not hang forever. join will fail in + * that case + */ + while ((System.currentTimeMillis() - time) < 100 * 1000 + && !terminated(stallThreads)) { + ctrl.updateStalled(memCtrl); + if (random().nextBoolean()) { + Thread.yield(); + } else { + Thread.sleep(1); + } + + } + join(stallThreads, 100); + + } + + public void testAccquireReleaseRace() throws InterruptedException { + final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(); + SimpleMemCtrl memCtrl = new SimpleMemCtrl(); + memCtrl.limit = 1000; + memCtrl.netBytes = 1; + ctrl.updateStalled(memCtrl); + final AtomicBoolean stop = new AtomicBoolean(false); + final AtomicBoolean checkPoint = new AtomicBoolean(true); + + int numStallers = atLeast(1); + int numReleasers = atLeast(1); + int numWaiters = atLeast(1); + + final CountDownLatch[] latches = new CountDownLatch[] { + new CountDownLatch(numStallers + numReleasers), new CountDownLatch(1), + new CountDownLatch(numWaiters)}; + Thread[] threads = new Thread[numReleasers + numStallers + numWaiters]; + List exceptions = Collections.synchronizedList(new ArrayList()); + for (int i = 0; i < numReleasers; i++) { + threads[i] = new Updater(stop, checkPoint, ctrl, latches, true, exceptions); + } + for (int i = numReleasers; i < numReleasers + numStallers; i++) { + threads[i] = new Updater(stop, checkPoint, ctrl, latches, false, exceptions); + + } + for (int i = numReleasers + numStallers; i < numReleasers + numStallers + + numWaiters; i++) { + threads[i] = new Waiter(stop, checkPoint, ctrl, latches, exceptions); + + } + + start(threads); + int iters = atLeast(20000); + for (int i = 0; i < iters; i++) { + if (checkPoint.get()) { + + latches[0].await(5, TimeUnit.SECONDS); + if (!exceptions.isEmpty()) { + for (Throwable throwable : exceptions) { + throwable.printStackTrace(); + } + fail("got exceptions in threads"); + } + + if (!ctrl.anyStalledThreads()) { + assertTrue( + "control claims no stalled threads but waiter seems to be blocked", + latches[2].await(3, TimeUnit.SECONDS)); + } + checkPoint.set(false); + + latches[1].countDown(); + } + assertFalse(checkPoint.get()); + if (random().nextInt(2) == 0) { + latches[0] = new CountDownLatch(numStallers + numReleasers); + latches[1] = new CountDownLatch(1); + latches[2] = new CountDownLatch(numWaiters); + checkPoint.set(true); + } + + } + + stop.set(true); + memCtrl.limit = 1000; + memCtrl.netBytes = 1; + ctrl.updateStalled(memCtrl); + if (checkPoint.get()) { + latches[1].countDown(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(2000); + if (threads[i].isAlive() && threads[i] instanceof Waiter) { + if (threads[i].getState() == Thread.State.WAITING) { + fail("waiter is not released - anyThreadsStalled: " + + ctrl.anyStalledThreads()); + } + } + } + } + + public static class Waiter extends Thread { + private CountDownLatch[] latches; + private DocumentsWriterStallControl ctrl; + private AtomicBoolean checkPoint; + private AtomicBoolean stop; + private List exceptions; + + public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint, + DocumentsWriterStallControl ctrl, CountDownLatch[] latches, + List exceptions) { + this.stop = stop; + this.checkPoint = checkPoint; + this.ctrl = ctrl; + this.latches = latches; + this.exceptions = exceptions; + } + + public void run() { + try { + while (!stop.get()) { + ctrl.waitIfStalled(); + if (checkPoint.get()) { + CountDownLatch join = latches[2]; + CountDownLatch wait = latches[1]; + join.countDown(); + try { + wait.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } catch (Throwable e) { + e.printStackTrace(); + exceptions.add(e); + } + } + } + + public static class Updater extends Thread { + + private CountDownLatch[] latches; + private DocumentsWriterStallControl ctrl; + private AtomicBoolean checkPoint; + private AtomicBoolean stop; + private boolean release; + private List exceptions; + + public Updater(AtomicBoolean stop, AtomicBoolean checkPoint, + DocumentsWriterStallControl ctrl, CountDownLatch[] latches, + boolean release, List exceptions) { + this.stop = stop; + this.checkPoint = checkPoint; + this.ctrl = ctrl; + this.latches = latches; + this.release = release; + this.exceptions = exceptions; + } + + public void run() { + try { + SimpleMemCtrl memCtrl = new SimpleMemCtrl(); + memCtrl.limit = 1000; + memCtrl.netBytes = release ? 1 : 2000; + while (!stop.get()) { + int internalIters = release && random().nextBoolean() ? atLeast(5) : 1; + for (int i = 0; i < internalIters; i++) { + ctrl.updateStalled(memCtrl); + } + if (checkPoint.get()) { + CountDownLatch join = latches[0]; + CountDownLatch wait = latches[1]; + join.countDown(); + try { + wait.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Thread.yield(); + } + } catch (Throwable e) { + e.printStackTrace(); + exceptions.add(e); + } + } + + } + + public static boolean terminated(Thread[] threads) { + for (Thread thread : threads) { + if (Thread.State.TERMINATED != thread.getState()) return false; + } + return true; + } + + public static void start(Thread[] tostart) throws InterruptedException { + for (Thread thread : tostart) { + thread.start(); + } + Thread.sleep(1); // let them start + } + + public static void join(Thread[] toJoin, long timeout) + throws InterruptedException { + for (Thread thread : toJoin) { + thread.join(timeout); + assertEquals(thread.getState().toString(), Thread.State.TERMINATED, + thread.getState()); + } + } + + public static Thread[] waitThreads(int num, + final DocumentsWriterStallControl ctrl) { + Thread[] array = new Thread[num]; + for (int i = 0; i < array.length; i++) { + array[i] = new Thread() { + public void run() { + ctrl.waitIfStalled(); + } + }; + } + return array; + } + + public static void awaitState(long timeout, Thread.State state, + Thread... threads) throws InterruptedException { + long t = System.currentTimeMillis(); + while (System.currentTimeMillis() - t <= timeout) { + boolean done = true; + for (Thread thread : threads) { + if (thread.getState() != state) { + done = false; + } + } + if (done) { + return; + } + if (random().nextBoolean()) { + Thread.yield(); + } else { + Thread.sleep(1); + } + } + fail("timed out waiting for state: " + state + " timeout: " + timeout + + " ms"); + } + + private static class SimpleMemCtrl implements MemoryController { + long netBytes; + long limit; + + @Override + public long netBytes() { + return netBytes; + } + + @Override + public long stallLimitBytes() { + return limit; + } + + } +}