diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index bb57bb0..39d2359 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -190,10 +190,13 @@ final class DocumentsWriterFlushControl implements MemoryController { Long bytes = flushingWriters.remove(dwpt); flushBytes -= bytes.longValue(); perThreadPool.recycle(dwpt); - stallControl.updateStalled(this); assert assertMemory(); } finally { - notifyAll(); + try { + stallControl.updateStalled(this); + } finally { + notifyAll(); + } } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java index 80dbf9b..93b22e4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java @@ -100,9 +100,19 @@ final class DocumentsWriterStallControl { */ void updateStalled(MemoryController controller) { do { - // if we have more flushing / blocked DWPT than numActiveDWPT we stall! - // don't stall if we have queued flushes - threads should be hijacked instead - while (controller.netBytes() > controller.stallLimitBytes()) { + final long netBytes = controller.netBytes(); + final long flushBytes = controller.flushBytes(); + final long limit = controller.stallLimitBytes(); + assert netBytes >= flushBytes; + assert limit > 0; + /* + * we block indexing threads if net byte grows due to slow flushes + * yet, for small ram buffers and large documents we can easily + * reach the limit without any ongoing flushes. we need to ensure + * that we don't stall/block if an ongoing or pending flush can + * not free up enough memory to release the stall lock. + */ + while (netBytes > limit && (netBytes - flushBytes) < limit) { if (sync.trySetStalled()) { assert wasStalled = true; return; @@ -125,6 +135,7 @@ final class DocumentsWriterStallControl { static interface MemoryController { long netBytes(); + long flushBytes(); long stallLimitBytes(); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java index 4605213..bb9d3f4 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java @@ -40,6 +40,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = 1000; + memCtrl.flushBytes = 20; ctrl.updateStalled(memCtrl); Thread[] waitThreads = waitThreads(atLeast(1), ctrl); start(waitThreads); @@ -49,6 +50,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { // now stall threads and wake them up again memCtrl.netBytes = 1001; + memCtrl.flushBytes = 100; ctrl.updateStalled(memCtrl); waitThreads = waitThreads(atLeast(1), ctrl); start(waitThreads); @@ -56,6 +58,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { assertTrue(ctrl.hasBlocked()); assertTrue(ctrl.anyStalledThreads()); memCtrl.netBytes = 50; + memCtrl.flushBytes = 0; ctrl.updateStalled(memCtrl); assertFalse(ctrl.anyStalledThreads()); join(waitThreads, 500); @@ -76,9 +79,12 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = 1; + memCtrl.flushBytes = 0; + int iters = atLeast(1000); for (int j = 0; j < iters; j++) { memCtrl.netBytes = baseBytes + random().nextInt(1000); + memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes); ctrl.updateStalled(memCtrl); if (random().nextInt(5) == 0) { // thread 0 only updates ctrl.waitIfStalled(); @@ -112,6 +118,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = 1; + memCtrl.flushBytes = 0; ctrl.updateStalled(memCtrl); final AtomicBoolean stop = new AtomicBoolean(false); final AtomicBoolean checkPoint = new AtomicBoolean(true); @@ -173,6 +180,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { stop.set(true); memCtrl.limit = 1000; memCtrl.netBytes = 1; + memCtrl.flushBytes = 0; ctrl.updateStalled(memCtrl); if (checkPoint.get()) { latches[1].countDown(); @@ -253,6 +261,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { SimpleMemCtrl memCtrl = new SimpleMemCtrl(); memCtrl.limit = 1000; memCtrl.netBytes = release ? 1 : 2000; + memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes); while (!stop.get()) { int internalIters = release && random().nextBoolean() ? atLeast(5) : 1; for (int i = 0; i < internalIters; i++) { @@ -338,6 +347,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { private static class SimpleMemCtrl implements MemoryController { long netBytes; long limit; + long flushBytes; @Override public long netBytes() { @@ -348,6 +358,11 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase { public long stallLimitBytes() { return limit; } + + @Override + public long flushBytes() { + return flushBytes; + } } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java index e5b74f1..e9af7cb 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestNRTThreads.java @@ -36,7 +36,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { boolean anyOpenDelFiles = false; - DirectoryReader r = IndexReader.open(writer, true); + DirectoryReader r = DirectoryReader.open(writer, true); while (System.currentTimeMillis() < stopTime && !failed.get()) { if (random().nextBoolean()) { @@ -63,7 +63,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { if (VERBOSE) { System.out.println("TEST: now open"); } - r = IndexReader.open(writer, true); + r = DirectoryReader.open(writer, true); } if (VERBOSE) { System.out.println("TEST: got new reader=" + r); @@ -110,7 +110,7 @@ public class TestNRTThreads extends ThreadedIndexingAndSearchingTestCase { r2 = writer.getReader(); } else { writer.commit(); - r2 = IndexReader.open(dir); + r2 = DirectoryReader.open(dir); } return newSearcher(r2); }