Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (revision 1406157) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (working copy) @@ -424,7 +424,10 @@ public void testExceptionOnMergeInit() throws IOException { Directory dir = newDirectory(); IndexWriterConfig conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random())) - .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy()); + .setMaxBufferedDocs(2).setMergePolicy(newLogMergePolicy()); + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + cms.setSuppressExceptions(); + conf.setMergeScheduler(cms); ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2); MockIndexWriter3 w = new MockIndexWriter3(dir, conf); w.doFail = true; Index: lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java (revision 1406157) +++ lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java (working copy) @@ -555,7 +555,7 @@ sb.append("commit=").append(commit == null ? "null" : commit).append("\n"); sb.append("openMode=").append(getOpenMode()).append("\n"); sb.append("similarity=").append(getSimilarity().getClass().getName()).append("\n"); - sb.append("mergeScheduler=").append(getMergeScheduler().getClass().getName()).append("\n"); + sb.append("mergeScheduler=").append(getMergeScheduler()).append("\n"); sb.append("default WRITE_LOCK_TIMEOUT=").append(IndexWriterConfig.WRITE_LOCK_TIMEOUT).append("\n"); sb.append("writeLockTimeout=").append(getWriteLockTimeout()).append("\n"); sb.append("codec=").append(getCodec()).append("\n"); Index: lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 1406157) +++ lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -302,7 +302,7 @@ } @Override - public void merge(IndexWriter writer) throws IOException { + public synchronized void merge(IndexWriter writer) throws IOException { assert !Thread.holdsLock(writer); @@ -328,31 +328,34 @@ // pending merges, until it's empty: while (true) { - synchronized(this) { - long startStallTime = 0; - while (mergeThreadCount() >= 1+maxMergeCount) { - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } + long startStallTime = 0; + while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + startStallTime = System.currentTimeMillis(); + if (verbose()) { + message(" too many merges; stalling..."); } + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } + if (verbose()) { + if (startStallTime != 0) { + message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); } } - - // TODO: we could be careful about which merges to do in - // the BG (eg maybe the "biggest" ones) vs FG, which - // merges to do first (the easiest ones?), etc. MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { @@ -361,34 +364,28 @@ return; } - // We do this w/ the primary thread to keep - // deterministic assignment of segment names - writer.mergeInit(merge); - boolean success = false; try { - synchronized(this) { - if (verbose()) { - message(" consider merge " + writer.segString(merge.segments)); - } + if (verbose()) { + message(" consider merge " + writer.segString(merge.segments)); + } - // OK to spawn a new merge thread to handle this - // merge: - final MergeThread merger = getMergeThread(writer, merge); - mergeThreads.add(merger); - if (verbose()) { - message(" launch new thread [" + merger.getName() + "]"); - } + // OK to spawn a new merge thread to handle this + // merge: + final MergeThread merger = getMergeThread(writer, merge); + mergeThreads.add(merger); + if (verbose()) { + message(" launch new thread [" + merger.getName() + "]"); + } - merger.start(); + merger.start(); - // Must call this after starting the thread else - // the new thread is removed from mergeThreads - // (since it's not alive yet): - updateMergeThreads(); + // Must call this after starting the thread else + // the new thread is removed from mergeThreads + // (since it's not alive yet): + updateMergeThreads(); - success = true; - } + success = true; } finally { if (!success) { writer.mergeFinish(merge); @@ -482,7 +479,6 @@ // merge that writer says is necessary: merge = tWriter.getNextMerge(); if (merge != null) { - tWriter.mergeInit(merge); updateMergeThreads(); if (verbose()) { message(" merge thread: do another merge " + tWriter.segString(merge.segments)); @@ -546,4 +542,13 @@ void clearSuppressExceptions() { suppressExceptions = false; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); + sb.append("maxThreadCount=").append(maxThreadCount).append(", "); + sb.append("maxMergeCount=").append(maxMergeCount).append(", "); + sb.append("mergeThreadPriority=").append(mergeThreadPriority); + return sb.toString(); + } } Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (revision 1406157) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -1891,6 +1891,15 @@ } /** + * Expert: returns true if there are merges waiting to be scheduled. + * + * @lucene.experimental + */ + public synchronized boolean hasPendingMerges() { + return pendingMerges.size() != 0; + } + + /** * Close the IndexWriter without committing * any changes that have occurred since the last commit * (or since it was opened, if commit hasn't been called).