Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1406743) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -1891,6 +1891,15 @@ } /** + * Expert: returns true if there are merges waiting to be scheduled. + * + * @lucene.experimental + */ + public synchronized boolean hasPendingMerges() { + return pendingMerges.size() != 0; + } + + /** * Close the IndexWriter without committing * any changes that have occurred since the last commit * (or since it was opened, if commit hasn't been called). @@ -2073,7 +2082,7 @@ // they are aborted. while(runningMerges.size() > 0) { if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge to abort"); + infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort"); } doWait(); } Index: lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java (revision 1406743) +++ lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java (working copy) @@ -555,7 +555,7 @@ sb.append("commit=").append(commit == null ? "null" : commit).append("\n"); sb.append("openMode=").append(getOpenMode()).append("\n"); sb.append("similarity=").append(getSimilarity().getClass().getName()).append("\n"); - sb.append("mergeScheduler=").append(getMergeScheduler().getClass().getName()).append("\n"); + sb.append("mergeScheduler=").append(getMergeScheduler()).append("\n"); sb.append("default WRITE_LOCK_TIMEOUT=").append(IndexWriterConfig.WRITE_LOCK_TIMEOUT).append("\n"); sb.append("writeLockTimeout=").append(getWriteLockTimeout()).append("\n"); sb.append("codec=").append(getCodec()).append("\n"); Index: lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 1406743) +++ lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -302,7 +302,7 @@ } @Override - public void merge(IndexWriter writer) throws IOException { + public synchronized void merge(IndexWriter writer) throws IOException { assert !Thread.holdsLock(writer); @@ -328,31 +328,34 @@ // pending merges, until it's empty: while (true) { - synchronized(this) { - long startStallTime = 0; - while (mergeThreadCount() >= 1+maxMergeCount) { - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } + long startStallTime = 0; + while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + startStallTime = System.currentTimeMillis(); + if (verbose()) { + message(" too many merges; stalling..."); } + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } + if (verbose()) { + if (startStallTime != 0) { + message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); } } - - // TODO: we could be careful about which merges to do in - // the BG (eg maybe the "biggest" ones) vs FG, which - // merges to do first (the easiest ones?), etc. MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { @@ -361,34 +364,28 @@ return; } - // We do this w/ the primary thread to keep - // deterministic assignment of segment names - writer.mergeInit(merge); - boolean success = false; try { - synchronized(this) { - if (verbose()) { - message(" consider merge " + writer.segString(merge.segments)); - } + if (verbose()) { + message(" consider merge " + writer.segString(merge.segments)); + } - // OK to spawn a new merge thread to handle this - // merge: - final MergeThread merger = getMergeThread(writer, merge); - mergeThreads.add(merger); - if (verbose()) { - message(" launch new thread [" + merger.getName() + "]"); - } + // OK to spawn a new merge thread to handle this + // merge: + final MergeThread merger = getMergeThread(writer, merge); + mergeThreads.add(merger); + if (verbose()) { + message(" launch new thread [" + merger.getName() + "]"); + } - merger.start(); + merger.start(); - // Must call this after starting the thread else - // the new thread is removed from mergeThreads - // (since it's not alive yet): - updateMergeThreads(); + // Must call this after starting the thread else + // the new thread is removed from mergeThreads + // (since it's not alive yet): + updateMergeThreads(); - success = true; - } + success = true; } finally { if (!success) { writer.mergeFinish(merge); @@ -482,7 +479,6 @@ // merge that writer says is necessary: merge = tWriter.getNextMerge(); if (merge != null) { - tWriter.mergeInit(merge); updateMergeThreads(); if (verbose()) { message(" merge thread: do another merge " + tWriter.segString(merge.segments)); @@ -546,4 +542,13 @@ void clearSuppressExceptions() { suppressExceptions = false; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); + sb.append("maxThreadCount=").append(maxThreadCount).append(", "); + sb.append("maxMergeCount=").append(maxMergeCount).append(", "); + sb.append("mergeThreadPriority=").append(mergeThreadPriority); + return sb.toString(); + } } Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (revision 1406743) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (working copy) @@ -424,7 +424,10 @@ public void testExceptionOnMergeInit() throws IOException { Directory dir = newDirectory(); IndexWriterConfig conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random())) - .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy()); + .setMaxBufferedDocs(2).setMergePolicy(newLogMergePolicy()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + cms.setSuppressExceptions(); + conf.setMergeScheduler(cms); ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2); MockIndexWriter3 w = new MockIndexWriter3(dir, conf); w.doFail = true; Index: lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (revision 1406743) +++ lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (working copy) @@ -18,14 +18,20 @@ */ import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; public class TestConcurrentMergeScheduler extends LuceneTestCase { @@ -245,4 +251,74 @@ directory.close(); } + + // LUCENE-4544 + public void testMaxMergeCount() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + + final int maxMergeCount = _TestUtil.nextInt(random(), 1, 5); + final int maxMergeThreads = _TestUtil.nextInt(random(), 1, maxMergeCount); + final CountDownLatch enoughMergesWaiting = new CountDownLatch(maxMergeCount); + final AtomicInteger runningMergeCount = new AtomicInteger(0); + final AtomicBoolean failed = new AtomicBoolean(); + + if (VERBOSE) { + System.out.println("TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads); + } + + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { + + @Override + protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + try { + // Stall all incoming merges until we see + // maxMergeCount: + int count = runningMergeCount.incrementAndGet(); + try { + assertTrue("count=" + count + " vs maxMergeCount=" + maxMergeCount, count <= maxMergeCount); + enoughMergesWaiting.countDown(); + + // Stall this merge until we see exactly + // maxMergeCount merges waiting + while (true) { + if (enoughMergesWaiting.await(10, TimeUnit.MILLISECONDS) || failed.get()) { + break; + } + } + // Then sleep a bit to give a chance for the bug + // (too many pending merges) to appear: + Thread.sleep(20); + super.doMerge(merge); + } finally { + runningMergeCount.decrementAndGet(); + } + } catch (Throwable t) { + failed.set(true); + writer.mergeFinish(merge); + throw new RuntimeException(t); + } + } + }; + cms.setMaxThreadCount(maxMergeThreads); + cms.setMaxMergeCount(maxMergeCount); + iwc.setMergeScheduler(cms); + iwc.setMaxBufferedDocs(2); + + TieredMergePolicy tmp = new TieredMergePolicy(); + iwc.setMergePolicy(tmp); + tmp.setMaxMergeAtOnce(2); + tmp.setSegmentsPerTier(2); + + IndexWriter w = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(newField("field", "field", TextField.TYPE_NOT_STORED)); + while(enoughMergesWaiting.getCount() != 0 && !failed.get()) { + for(int i=0;i<10;i++) { + w.addDocument(doc); + } + } + w.close(false); + dir.close(); + } }