diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index 0c7cf74..bb57bb0 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController; import org.apache.lucene.util.ThreadInterruptedException; /** @@ -40,7 +41,7 @@ import org.apache.lucene.util.ThreadInterruptedException; * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address * space exhaustion. */ -final class DocumentsWriterFlushControl { +final class DocumentsWriterFlushControl implements MemoryController { private final long hardMaxBytesPerDWPT; private long activeBytes = 0; @@ -88,7 +89,7 @@ final class DocumentsWriterFlushControl { return flushBytes + activeBytes; } - long stallLimitBytes() { + public long stallLimitBytes() { final double maxRamMB = config.getRAMBufferSizeMB(); return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE; } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java index d61cc6d..40c2cf3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java @@ -1,6 +1,6 @@ package org.apache.lucene.index; -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -19,6 +19,7 @@ package org.apache.lucene.index; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.util.ThreadInterruptedException; /** * Controls the health status of a {@link DocumentsWriter} sessions. This class @@ -55,11 +56,11 @@ final class DocumentsWriterStallControl { boolean tryReset() { final int oldState = getState(); - if (oldState == 0) + if (oldState == 0) { return true; + } if (compareAndSetState(oldState, 0)) { - releaseShared(0); - return true; + return releaseShared(0); } return false; } @@ -97,11 +98,11 @@ final class DocumentsWriterStallControl { * {@link DocumentsWriterStallControl} to healthy and release all threads waiting on * {@link #waitIfStalled()} */ - void updateStalled(DocumentsWriterFlushControl flushControl) { + void updateStalled(MemoryController controller) { do { // if we have more flushing / blocked DWPT than numActiveDWPT we stall! // don't stall if we have queued flushes - threads should be hijacked instead - while (flushControl.netBytes() > flushControl.stallLimitBytes()) { + while (controller.netBytes() > controller.stallLimitBytes()) { if (sync.trySetStalled()) { assert wasStalled = true; return; @@ -111,10 +112,19 @@ final class DocumentsWriterStallControl { } void waitIfStalled() { - sync.acquireShared(0); + try { + sync.acquireSharedInterruptibly(0); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } } boolean hasBlocked() { // for tests return sync.hasBlockedThreads; } + + static interface MemoryController { + long netBytes(); + long stallLimitBytes(); + } } \ No newline at end of file