Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java (revision 1590316) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java (working copy) @@ -16,7 +16,6 @@ */ import java.io.IOException; -import java.util.ArrayList; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; @@ -329,6 +328,11 @@ @Override public void close() {} + + @Override + public int getMaxMergeCount() { + return 1; + } } // LUCENE-1013 Index: lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java =================================================================== --- lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java (revision 1590316) +++ lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java (working copy) @@ -125,8 +125,12 @@ } @Override + public int getMaxMergeCount() { + return 1; + } + + @Override public void close() throws IOException {} - } public void testCustomMergeScheduler() throws Exception { Index: lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 1590316) +++ lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (working copy) @@ -19,25 +19,44 @@ import java.io.IOException; -/** A {@link MergeScheduler} that simply does each merge - * sequentially, using the current thread. */ +/** A {@link MergeScheduler} that does each merge + * using the current (indexing) thread. + * + *

This merge scheduler allows only one merge to run at a + * time, and if a new merge needs to kick off while + * one is already running, the thread will block by default + * until the first merge completes. */ + public class SerialMergeScheduler extends MergeScheduler { /** Sole constructor. */ public SerialMergeScheduler() { } - /** Just do the merges in sequence. We do this - * "synchronized" so that even if the application is using - * multiple threads, only one merge may run at a time. */ @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public int getMaxMergeCount() { + return 1; + } - while(true) { + /** Just do the merges in sequence, only allowing one + * incoming indexing thread to be merging at once. */ + @Override + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + + while (true) { + + maybeStall(writer); + MergePolicy.OneMerge merge = writer.getNextMerge(); - if (merge == null) + if (merge == null) { break; - writer.merge(merge); + } + + try { + writer.merge(merge); + } finally { + mergeFinished(); + } } } Index: lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (revision 1590316) +++ lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (working copy) @@ -20,6 +20,8 @@ import java.io.Closeable; import java.io.IOException; +import org.apache.lucene.util.ThreadInterruptedException; + /**

Expert: {@link IndexWriter} uses an instance * implementing this interface to execute the merges * selected by a {@link MergePolicy}. The default @@ -47,6 +49,53 @@ @Override public abstract void close() throws IOException; + /** The maximum count of current merges allowed to run at + * once before indexing threads that are producing + * segments are stalled by {@link #maybeStall}. */ + public abstract int getMaxMergeCount(); + + /** Subclass calls this before merging. This default + * implementation checks if there are too many merges + * running, and if so, it stalls the incoming thread + * until the merges catch up. This is a simple but + * effective denial-of-service/adversary protection, to + * ensure the threads creating new segments (commit, + * getReader, flushing due to triggers) can't get ahead + * of merging. Applications can override this to perform + * alternative throttling solutions. */ + protected synchronized void maybeStall(IndexWriter writer) { + + long startStallTime = 0; + + while (writer.getRunningMergeCount() >= getMaxMergeCount() && writer.hasPendingMerges()) { + // The number of running merges is too high, and at + // least one new merge is pending, so we stall the + // current (segment-creating) thread: + if (startStallTime == 0) { + startStallTime = System.currentTimeMillis(); + if (writer.infoStream != null && writer.infoStream.isEnabled("MS")) { + writer.infoStream.message("MS", "too many merges (" + writer.getRunningMergeCount() + " vs max=" + getMaxMergeCount() + "); stalling current thread..."); + } + } + + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + if (startStallTime != 0 && writer.infoStream != null && writer.infoStream.isEnabled("MS")) { + writer.infoStream.message("MS", "stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); + } + } + + /** Subclass must call this after finishing each merge. */ + protected synchronized void mergeFinished() { + // Wakes up any threads that stalled + notifyAll(); + } + @Override public MergeScheduler clone() { try { Index: lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 1590316) +++ lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -121,6 +121,7 @@ } /** See {@link #setMaxMergesAndThreads}. */ + @Override public int getMaxMergeCount() { return maxMergeCount; } @@ -141,8 +142,9 @@ * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has * room to set relative priority among threads. */ public synchronized void setMergeThreadPriority(int pri) { - if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) + if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) { throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive"); + } mergeThreadPriority = pri; updateMergeThreads(); } @@ -202,16 +204,14 @@ // pause the thread if maxThreadCount is smaller than the number of merge threads. final boolean doPause = threadIdx < activeMergeCount - maxThreadCount; - if (verbose()) { - if (doPause != merge.getPause()) { + if (doPause != merge.getPause()) { + if (verbose()) { if (doPause) { message("pause thread " + mergeThread.getName()); } else { message("unpause thread " + mergeThread.getName()); } } - } - if (doPause != merge.getPause()) { merge.setPause(doPause); } @@ -252,8 +252,9 @@ // Default to slightly higher priority than our // calling thread mergeThreadPriority = 1+Thread.currentThread().getPriority(); - if (mergeThreadPriority > Thread.MAX_PRIORITY) + if (mergeThreadPriority > Thread.MAX_PRIORITY) { mergeThreadPriority = Thread.MAX_PRIORITY; + } } } @@ -289,7 +290,9 @@ } } finally { // finally, restore interrupt status: - if (interrupted) Thread.currentThread().interrupt(); + if (interrupted) { + Thread.currentThread().interrupt(); + } } } @@ -334,34 +337,8 @@ // pending merges, until it's empty: while (true) { - long startStallTime = 0; - while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } + maybeStall(writer); - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } - } - MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { @@ -369,7 +346,7 @@ } return; } - + boolean success = false; try { if (verbose()) { @@ -481,17 +458,18 @@ setRunningMerge(merge); doMerge(merge); + // Notify here in case any threads were stalled by + // tooManyMerges; they will notice that the + // pending merge has been pulled and possibly + // resume: + synchronized(ConcurrentMergeScheduler.this) { + ConcurrentMergeScheduler.this.mergeFinished(); + } + // Subsequent times through the loop we do any new // merge that writer says is necessary: merge = tWriter.getNextMerge(); - // Notify here in case any threads were stalled; - // they will notice that the pending merge has - // been pulled and possibly resume: - synchronized(ConcurrentMergeScheduler.this) { - ConcurrentMergeScheduler.this.notifyAll(); - } - if (merge != null) { updateMergeThreads(); if (verbose()) { @@ -509,20 +487,16 @@ } catch (Throwable exc) { // Ignore the exception if it was due to abort: - if (!(exc instanceof MergePolicy.MergeAbortedException)) { - //System.out.println(Thread.currentThread().getName() + ": CMS: exc"); - //exc.printStackTrace(System.out); - if (!suppressExceptions) { - // suppressExceptions is normally only set during - // testing. - handleMergeException(exc); - } + if ((exc instanceof MergePolicy.MergeAbortedException) == false && suppressExceptions == false) { + // suppressExceptions is normally only set during + // testing. + handleMergeException(exc); } } finally { done = true; synchronized(ConcurrentMergeScheduler.this) { updateMergeThreads(); - ConcurrentMergeScheduler.this.notifyAll(); + ConcurrentMergeScheduler.this.mergeFinished(); } } } Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1590316) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -1951,6 +1951,11 @@ } } + /** Returns number of merges currently running. */ + public synchronized int getRunningMergeCount() { + return runningMerges.size(); + } + /** * Expert: returns true if there are merges waiting to be scheduled. * Index: lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (revision 1590316) +++ lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (working copy) @@ -43,8 +43,12 @@ public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) {} @Override + public int getMaxMergeCount() { + return 0; + } + + @Override public MergeScheduler clone() { return this; } - }