Index: lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (revision 1590385) +++ lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (working copy) @@ -865,9 +865,7 @@ } else if (rarely(r)) { int maxThreadCount = TestUtil.nextInt(random(), 1, 4); int maxMergeCount = TestUtil.nextInt(random(), maxThreadCount, maxThreadCount + 4); - ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); - cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); - c.setMergeScheduler(cms); + c.setMergeScheduler(new ConcurrentMergeScheduler(maxMergeCount, maxThreadCount)); } if (r.nextBoolean()) { if (rarely(r)) { Index: lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java (revision 1590385) +++ lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java (working copy) @@ -767,11 +767,6 @@ tmp.setSegmentsPerTier(Math.min(5, tmp.getSegmentsPerTier())); tmp.setNoCFSRatio(1.0); } - MergeScheduler ms = w.getConfig().getMergeScheduler(); - if (ms instanceof ConcurrentMergeScheduler) { - // wtf... shouldnt it be even lower since its 1 by default?!?! - ((ConcurrentMergeScheduler) ms).setMaxMergesAndThreads(3, 2); - } } /** Checks some basic behaviour of an AttributeImpl Index: lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java =================================================================== --- lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (revision 1590385) +++ lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (working copy) @@ -110,19 +110,15 @@ "org.apache.lucene.index.ConcurrentMergeScheduler"); if (mergeScheduler.equals(NoMergeScheduler.class.getName())) { iwConf.setMergeScheduler(NoMergeScheduler.INSTANCE); + } else if (mergeScheduler.equals(ConcurrentMergeScheduler.class.getName())) { + iwConf.setMergeScheduler(new ConcurrentMergeScheduler(config.get("concurrent.merge.scheduler.max.merge.count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT), + config.get("concurrent.merge.scheduler.max.thread.count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT))); } else { try { iwConf.setMergeScheduler(Class.forName(mergeScheduler).asSubclass(MergeScheduler.class).newInstance()); } catch (Exception e) { throw new RuntimeException("unable to instantiate class '" + mergeScheduler + "' as merge scheduler", e); } - - if (mergeScheduler.equals("org.apache.lucene.index.ConcurrentMergeScheduler")) { - ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) iwConf.getMergeScheduler(); - int maxThreadCount = config.get("concurrent.merge.scheduler.max.thread.count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT); - int maxMergeCount = config.get("concurrent.merge.scheduler.max.merge.count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT); - cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); - } } final String defaultCodec = config.get("default.codec", null); Index: lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java (revision 1590385) +++ lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java (working copy) @@ -54,7 +54,7 @@ // context, including ones from Object. So just filter out Object. If in // the future MergeScheduler will extend a different class than Object, // this will need to change. - if (m.getDeclaringClass() != Object.class) { + if (m.getDeclaringClass() != Object.class && (Modifier.isFinal(m.getModifiers()) == false)) { assertTrue(m + " is not overridden !", m.getDeclaringClass() == NoMergeScheduler.class); } } Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java (revision 1590385) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -2479,7 +2479,7 @@ iwc.setMergeScheduler(new ConcurrentMergeScheduler() { @Override - public void doMerge(MergePolicy.OneMerge merge) throws IOException { + public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { mergeStarted.countDown(); try { closeStarted.await(); @@ -2487,7 +2487,7 @@ Thread.currentThread().interrupt(); throw new RuntimeException(ie); } - super.doMerge(merge); + super.doMerge(writer, merge); } @Override Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java (revision 1590385) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java (working copy) @@ -16,7 +16,6 @@ */ import java.io.IOException; -import java.util.ArrayList; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; @@ -312,6 +311,10 @@ // Just intercepts all merges & verifies that we are never // merging a segment with >= 20 (maxMergeDocs) docs private class MyMergeScheduler extends MergeScheduler { + public MyMergeScheduler() { + super(1); + } + @Override synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { Index: lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java (revision 0) +++ lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java (working copy) @@ -0,0 +1,106 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.codecs.lucene41.Lucene41PostingsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.TestUtil; + +public class TestSerialMergeScheduler extends LuceneTestCase { + + // Just counts total and in-flight merges, asserting that + // at most 1 merge runs at once: + private class MergeCountingIndexWriter extends IndexWriter { + + public final AtomicInteger totalMergeCount = new AtomicInteger(); + + private final AtomicInteger runningMergeCount = new AtomicInteger(); + + public MergeCountingIndexWriter(Directory dir, IndexWriterConfig iwc) throws IOException { + super(dir, iwc); + } + + @Override + public void merge(MergePolicy.OneMerge merge) throws IOException { + totalMergeCount.incrementAndGet(); + int count = runningMergeCount.incrementAndGet(); + assertTrue(count + " merges running", count <= 1); + try { + super.merge(merge); + } finally { + runningMergeCount.decrementAndGet(); + } + } + } + + public void testOnlyOneMergeAtOnce() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + iwc.setMaxBufferedDocs(2); + iwc.setMergeScheduler(new SerialMergeScheduler()); + TieredMergePolicy tmp = new TieredMergePolicy(); + tmp.setSegmentsPerTier(2.0); + iwc.setMergePolicy(tmp); + + final MergeCountingIndexWriter writer = new MergeCountingIndexWriter(dir, iwc); + + final CountDownLatch startingGun = new CountDownLatch(1); + + Thread[] threads = new Thread[4]; + for(int i=0;i Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) + public void setMergeThreadPriority(int pri) { + if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) { throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive"); + } mergeThreadPriority = pri; updateMergeThreads(); } - /** Sorts {@link MergeThread}s; larger merges come first. */ - protected static final Comparator compareByMergeDocCount = new Comparator() { - @Override - public int compare(MergeThread t1, MergeThread t2) { - final MergePolicy.OneMerge m1 = t1.getCurrentMerge(); - final MergePolicy.OneMerge m2 = t2.getCurrentMerge(); - - final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount; - final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount; + private void finishMergeThread(MergeThread thread) { + mergeFinished(); + mergeThreads.remove(thread); + updateMergeThreads(); + } - return c2 - c1; - } - }; - /** * Called whenever the running merges have changed, to pause & unpause * threads. This method sorts the merge threads by their merge size in @@ -173,45 +152,39 @@ // process of stopping (ie have an active merge): final List activeMerges = new ArrayList<>(); - int threadIdx = 0; - while (threadIdx < mergeThreads.size()) { - final MergeThread mergeThread = mergeThreads.get(threadIdx); - if (!mergeThread.isAlive()) { - // Prune any dead threads - mergeThreads.remove(threadIdx); - continue; + synchronized (mergeThreads) { + Iterator it = mergeThreads.iterator(); + while (it.hasNext()) { + final MergeThread mergeThread = it.next(); + if (mergeThread.isAlive() == false) { + // Prune any dead threads + it.remove(); + } else { + activeMerges.add(mergeThread); + } } - if (mergeThread.getCurrentMerge() != null) { - activeMerges.add(mergeThread); - } - threadIdx++; } - // Sort the merge threads in descending order. - CollectionUtil.timSort(activeMerges, compareByMergeDocCount); + // Sort the merge threads in descending order by doc count: + CollectionUtil.timSort(activeMerges); int pri = mergeThreadPriority; final int activeMergeCount = activeMerges.size(); - for (threadIdx=0;threadIdx - * if (verbose()) { - * message("your message"); - * } - * - */ - protected boolean verbose() { - return writer != null && writer.infoStream.isEnabled("CMS"); - } - - /** - * Outputs the given message - this method assumes {@link #verbose()} was - * called and returned true. - */ - protected void message(String message) { - writer.infoStream.message("CMS", message); - } - - private synchronized void initMergeThreadPriority() { + private void initMergeThreadPriority() { if (mergeThreadPriority == -1) { // Default to slightly higher priority than our // calling thread mergeThreadPriority = 1+Thread.currentThread().getPriority(); - if (mergeThreadPriority > Thread.MAX_PRIORITY) + if (mergeThreadPriority > Thread.MAX_PRIORITY) { mergeThreadPriority = Thread.MAX_PRIORITY; + } } } @@ -268,12 +220,10 @@ try { while (true) { MergeThread toSync = null; - synchronized (this) { - for (MergeThread t : mergeThreads) { - if (t.isAlive()) { - toSync = t; - break; - } + for (MergeThread t : mergeThreads) { + if (t.isAlive()) { + toSync = t; + break; } } if (toSync != null) { @@ -289,124 +239,77 @@ } } finally { // finally, restore interrupt status: - if (interrupted) Thread.currentThread().interrupt(); - } - } - - /** - * Returns the number of merge threads that are alive. Note that this number - * is ≤ {@link #mergeThreads} size. - */ - protected synchronized int mergeThreadCount() { - int count = 0; - for (MergeThread mt : mergeThreads) { - if (mt.isAlive() && mt.getCurrentMerge() != null) { - count++; + if (interrupted) { + Thread.currentThread().interrupt(); } } - return count; } @Override - public synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { assert !Thread.holdsLock(writer); - this.writer = writer; - initMergeThreadPriority(); - dir = writer.getDirectory(); - - // First, quickly run through the newly proposed merges - // and add any orthogonal merges (ie a merge not - // involving segments already pending to be merged) to - // the queue. If we are way behind on merging, many of - // these newly proposed merges will likely already be - // registered. - if (verbose()) { message("now merge"); - message(" index: " + writer.segString()); + message("index: " + writer.segString()); } // Iterate, pulling from the IndexWriter's queue of // pending merges, until it's empty: while (true) { - long startStallTime = 0; - while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } - } - MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { - message(" no more merges pending; now return"); + message("no more merges pending; now return"); } return; } + maybeStall(writer); + boolean success = false; try { if (verbose()) { - message(" consider merge " + writer.segString(merge.segments)); + message("run merge " + writer.segString(merge.segments)); } // OK to spawn a new merge thread to handle this // merge: - final MergeThread merger = getMergeThread(writer, merge); + MergeThread merger = getMergeThread(writer, merge); mergeThreads.add(merger); if (verbose()) { - message(" launch new thread [" + merger.getName() + "]"); + message("launch new thread [" + merger.getName() + "]"); } merger.start(); - // Must call this after starting the thread else - // the new thread is removed from mergeThreads - // (since it's not alive yet): - updateMergeThreads(); + success = true; - success = true; } finally { if (!success) { + mergeFinished(); writer.mergeFinish(merge); } } + + // Must call this after starting the thread else + // the new thread is removed from mergeThreads + // (since it's not alive yet): + updateMergeThreads(); } } /** Does the actual merge, by calling {@link IndexWriter#merge} */ - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { writer.merge(merge); } /** Create and return a new MergeThread */ - protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { + protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { final MergeThread thread = new MergeThread(writer, merge); thread.setThreadPriority(mergeThreadPriority); thread.setDaemon(true); @@ -416,41 +319,17 @@ /** Runs a merge thread, which may run one or more merges * in sequence. */ - protected class MergeThread extends Thread { + protected class MergeThread extends Thread implements Comparable { - IndexWriter tWriter; - MergePolicy.OneMerge startMerge; - MergePolicy.OneMerge runningMerge; - private volatile boolean done; + final MergePolicy.OneMerge merge; + final IndexWriter writer; /** Sole constructor. */ - public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) { - this.tWriter = writer; - this.startMerge = startMerge; + public MergeThread(IndexWriter writer, MergePolicy.OneMerge merge) { + this.writer = writer; + this.merge = merge; } - /** Record the currently running merge. */ - public synchronized void setRunningMerge(MergePolicy.OneMerge merge) { - runningMerge = merge; - } - - /** Return the currently running merge. */ - public synchronized MergePolicy.OneMerge getRunningMerge() { - return runningMerge; - } - - /** Return the current merge, or null if this {@code - * MergeThread} is done. */ - public synchronized MergePolicy.OneMerge getCurrentMerge() { - if (done) { - return null; - } else if (runningMerge != null) { - return runningMerge; - } else { - return startMerge; - } - } - /** Set the priority of this thread. */ public void setThreadPriority(int pri) { try { @@ -464,65 +343,47 @@ } } + /** Sorts by descending doc count */ @Override + public int compareTo(MergeThread other) { + return other.merge.totalDocCount - merge.totalDocCount; + } + + @Override public void run() { - - // First time through the while loop we do the merge - // that we were started with: - MergePolicy.OneMerge merge = this.startMerge; - - try { - if (verbose()) { - message(" merge thread: start"); - } + boolean aborted = false; - while(true) { - setRunningMerge(merge); - doMerge(merge); - - // Subsequent times through the loop we do any new - // merge that writer says is necessary: - merge = tWriter.getNextMerge(); - - // Notify here in case any threads were stalled; - // they will notice that the pending merge has - // been pulled and possibly resume: - synchronized(ConcurrentMergeScheduler.this) { - ConcurrentMergeScheduler.this.notifyAll(); - } - - if (merge != null) { - updateMergeThreads(); - if (verbose()) { - message(" merge thread: do another merge " + tWriter.segString(merge.segments)); - } - } else { - break; - } - } - + try { if (verbose()) { - message(" merge thread: done"); + message("merge thread: start"); } - + doMerge(writer, merge); } catch (Throwable exc) { - // Ignore the exception if it was due to abort: - if (!(exc instanceof MergePolicy.MergeAbortedException)) { - //System.out.println(Thread.currentThread().getName() + ": CMS: exc"); - //exc.printStackTrace(System.out); - if (!suppressExceptions) { - // suppressExceptions is normally only set during - // testing. - handleMergeException(exc); - } + if (exc instanceof MergePolicy.MergeAbortedException) { + aborted = true; + } else if (suppressExceptions == false) { + // suppressExceptions is normally only set during + // testing. + handleMergeException(merge, exc); } } finally { - done = true; - synchronized(ConcurrentMergeScheduler.this) { - updateMergeThreads(); - ConcurrentMergeScheduler.this.notifyAll(); + finishMergeThread(this); + } + + if (verbose()) { + message("merge thread: done"); + } + + // Let CMS run new merges if necessary: + if (aborted == false) { + try { + merge(writer, MergeTrigger.MERGE_FINISHED, true); + } catch (AlreadyClosedException ace) { + // OK + } catch (IOException ioe) { + throw new RuntimeException(ioe); } } } @@ -530,7 +391,7 @@ /** Called when an exception is hit in a background merge * thread */ - protected void handleMergeException(Throwable exc) { + protected void handleMergeException(MergePolicy.OneMerge merge, Throwable exc) { try { // When an exception is hit during merge, IndexWriter // removes any partial files and then allows another @@ -542,7 +403,7 @@ } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } - throw new MergePolicy.MergeException(exc, dir); + throw new MergePolicy.MergeException(exc, merge.info.info.dir); } private boolean suppressExceptions; @@ -561,7 +422,7 @@ public String toString() { StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); sb.append("maxThreadCount=").append(maxThreadCount).append(", "); - sb.append("maxMergeCount=").append(maxMergeCount).append(", "); + sb.append("maxMergeCount=").append(maxMergeCount).append(", "); sb.append("mergeThreadPriority=").append(mergeThreadPriority); return sb.toString(); } @@ -569,9 +430,8 @@ @Override public MergeScheduler clone() { ConcurrentMergeScheduler clone = (ConcurrentMergeScheduler) super.clone(); - clone.writer = null; - clone.dir = null; clone.mergeThreads = new ArrayList<>(); + clone.mergeThreadPriority = -1; return clone; } } Index: lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (revision 1590385) +++ lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (working copy) @@ -32,8 +32,9 @@ /** The single instance of {@link NoMergeScheduler} */ public static final MergeScheduler INSTANCE = new NoMergeScheduler(); + // prevent instantiation private NoMergeScheduler() { - // prevent instantiation + super(0); } @Override @@ -46,5 +47,4 @@ public MergeScheduler clone() { return this; } - } Index: lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 1590385) +++ lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (working copy) @@ -19,25 +19,47 @@ import java.io.IOException; -/** A {@link MergeScheduler} that simply does each merge - * sequentially, using the current thread. */ +/** A {@link MergeScheduler} that does each merge + * using the current (indexing) thread. + * + *

This merge scheduler allows only one merge to run at a + * time, and if a new merge needs to kick off while + * one is already running, the thread will block by default + * until the first merge completes. */ + public class SerialMergeScheduler extends MergeScheduler { /** Sole constructor. */ public SerialMergeScheduler() { + super(1); } - /** Just do the merges in sequence. We do this - * "synchronized" so that even if the application is using - * multiple threads, only one merge may run at a time. */ + /** Just do the merges in sequence, only allowing one + * incoming indexing thread to be merging at once. */ @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { - while(true) { + while (true) { + MergePolicy.OneMerge merge = writer.getNextMerge(); - if (merge == null) + if (merge == null) { + if (verbose()) { + message("no more merges pending; now return"); + } break; - writer.merge(merge); + } + + maybeStall(writer); + + if (verbose()) { + message("run merge=" + writer.segString(merge.segments)); + } + + try { + writer.merge(merge); + } finally { + mergeFinished(); + } } } Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1590385) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -704,6 +704,7 @@ mergePolicy = config.getMergePolicy(); mergePolicy.setIndexWriter(this); mergeScheduler = config.getMergeScheduler(); + mergeScheduler.setInfoStream(infoStream); codec = config.getCodec(); bufferedUpdatesStream = new BufferedUpdatesStream(infoStream); @@ -1951,13 +1952,9 @@ } } - /** - * Expert: returns true if there are merges waiting to be scheduled. - * - * @lucene.experimental - */ - public synchronized boolean hasPendingMerges() { - return pendingMerges.size() != 0; + /** Returns number of merges currently running. */ + public synchronized int getRunningMergeCount() { + return runningMerges.size(); } /** @@ -3600,9 +3597,11 @@ } } } + } catch (OutOfMemoryError oom) { handleOOM(oom, "merge"); } + if (merge.info != null && !merge.isAborted()) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs"); Index: lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (revision 1590385) +++ lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (working copy) @@ -19,7 +19,11 @@ import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.Semaphore; +import org.apache.lucene.util.ThreadInterruptedException; +import org.apache.lucene.util.InfoStream; + /**

Expert: {@link IndexWriter} uses an instance * implementing this interface to execute the merges * selected by a {@link MergePolicy}. The default @@ -31,9 +35,15 @@ */ public abstract class MergeScheduler implements Closeable, Cloneable { + private Semaphore permits; + private InfoStream infoStream; + protected final int maxMergeCount; + /** Sole constructor. (For invocation by subclass * constructors, typically implicit.) */ - protected MergeScheduler() { + protected MergeScheduler(int maxMergeCount) { + this.maxMergeCount = maxMergeCount; + this.permits = new Semaphore(maxMergeCount, true); } /** Run the merges provided by {@link IndexWriter#getNextMerge()}. @@ -47,10 +57,81 @@ @Override public abstract void close() throws IOException; + /** The maximum count of current merges allowed to run at + * once before indexing threads that are producing + * segments are stalled by {@link #maybeStall}. */ + public final int getMaxMergeCount() { + return maxMergeCount; + } + + /** IndexWriter calls this on init. */ + final void setInfoStream(InfoStream infoStream) { + this.infoStream = infoStream; + } + + /** + * Returns true if verbosing is enabled. This method is usually used in + * conjunction with {@link #message(String)}, like that: + * + *

+   * if (verbose()) {
+   *   message("your message");
+   * }
+   * 
+ */ + protected boolean verbose() { + return infoStream != null && infoStream.isEnabled("MS"); + } + + /** + * Outputs the given message - this method assumes {@link #verbose()} was + * called and returned true. + */ + protected void message(String message) { + infoStream.message("MS", message); + } + + + /** Subclass calls this after getting a new merge from + * IndexWriter and before merging. Be sure to also call + * {@link #mergeFinished} once the merge is done. This default + * implementation checks if there are too many merges + * running, and if so, it stalls the incoming thread + * until the merges catch up. This is a simple but + * effective denial-of-service/adversary protection, to + * ensure the threads creating new segments (commit, + * getReader, flushing due to triggers) can't get ahead + * of merging. Applications can override this to perform + * alternative throttling solutions. */ + protected void maybeStall(IndexWriter writer) { + if (permits.tryAcquire() == false) { + if (verbose()) { + message("too many merges (" + writer.getRunningMergeCount() + " vs max=" + getMaxMergeCount() + "); stalling current thread..."); + } + long start = System.currentTimeMillis(); + try { + permits.acquire(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + if (verbose()) { + message("stalled for " + (System.currentTimeMillis()-start) + " msec"); + } + } + } + + /** Subclass must call this after finishing each merge. */ + protected void mergeFinished() { + permits.release(); + } + @Override public MergeScheduler clone() { try { - return (MergeScheduler) super.clone(); + MergeScheduler clone = (MergeScheduler) super.clone(); + clone.infoStream = null; + clone.permits = new Semaphore(clone.maxMergeCount, true); + return clone; } catch (CloneNotSupportedException e) { throw new Error(e); }