Index: lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java (revision 1590432) +++ lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java (working copy) @@ -40,7 +40,7 @@ final MergeScheduler mergeScheduler = new SerialMergeScheduler() { @Override synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { - if (!mayMerge.get() && writer.getNextMerge() != null) { + if (!mayMerge.get() && writer.getAndPromoteNextPendingMerge() != null) { throw new AssertionError(); } super.merge(writer, trigger, newMergesFound); Index: lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java (revision 1590432) +++ lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java (working copy) @@ -767,11 +767,6 @@ tmp.setSegmentsPerTier(Math.min(5, tmp.getSegmentsPerTier())); tmp.setNoCFSRatio(1.0); } - MergeScheduler ms = w.getConfig().getMergeScheduler(); - if (ms instanceof ConcurrentMergeScheduler) { - // wtf... shouldnt it be even lower since its 1 by default?!?! 
- ((ConcurrentMergeScheduler) ms).setMaxMergesAndThreads(3, 2); - } } /** Checks some basic behaviour of an AttributeImpl Index: lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (revision 1590432) +++ lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (working copy) @@ -865,9 +865,7 @@ } else if (rarely(r)) { int maxThreadCount = TestUtil.nextInt(random(), 1, 4); int maxMergeCount = TestUtil.nextInt(random(), maxThreadCount, maxThreadCount + 4); - ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); - cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); - c.setMergeScheduler(cms); + c.setMergeScheduler(new ConcurrentMergeScheduler(maxMergeCount, maxThreadCount)); } if (r.nextBoolean()) { if (rarely(r)) { Index: lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (revision 1590432) +++ lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (working copy) @@ -275,10 +275,10 @@ System.out.println("TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads); } - ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { + ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(maxMergeCount, maxMergeThreads) { @Override - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { try { // Stall all incoming merges until we see // maxMergeCount: @@ -297,7 +297,7 @@ // Then sleep a bit to give a chance for the bug // (too many pending merges) to appear: Thread.sleep(20); - super.doMerge(merge); + super.doMerge(writer, merge); } 
finally { runningMergeCount.decrementAndGet(); } @@ -308,7 +308,6 @@ } } }; - cms.setMaxMergesAndThreads(maxMergeCount, maxMergeThreads); iwc.setMergeScheduler(cms); iwc.setMaxBufferedDocs(2); @@ -334,13 +333,13 @@ long totMergedBytes; public TrackingCMS() { - setMaxMergesAndThreads(5, 5); + super(5, 5); } @Override - public void doMerge(MergePolicy.OneMerge merge) throws IOException { + public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { totMergedBytes += merge.totalBytesSize(); - super.doMerge(merge); + super.doMerge(writer, merge); } } Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (revision 1590432) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (working copy) @@ -1758,17 +1758,16 @@ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); final MergeScheduler ms = iwc.getMergeScheduler(); if (ms instanceof ConcurrentMergeScheduler) { - final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler() { + final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms; + final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler(cms.getMaxMergeCount(), cms.getMaxThreadCount()) { @Override - protected void handleMergeException(Throwable exc) { + protected void handleMergeException(MergePolicy.OneMerge merge, Throwable exc) { // suppress only FakeIOException: if (!(exc instanceof FakeIOException)) { - super.handleMergeException(exc); + super.handleMergeException(merge, exc); } } }; - final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms; - suppressFakeIOE.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount()); suppressFakeIOE.setMergeThreadPriority(cms.getMergeThreadPriority()); 
iwc.setMergeScheduler(suppressFakeIOE); } Index: lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java (revision 1590432) +++ lucene/core/src/test/org/apache/lucene/index/TestNoMergeScheduler.java (working copy) @@ -54,7 +54,7 @@ // context, including ones from Object. So just filter out Object. If in // the future MergeScheduler will extend a different class than Object, // this will need to change. - if (m.getDeclaringClass() != Object.class) { + if (m.getDeclaringClass() != Object.class && (Modifier.isFinal(m.getModifiers()) == false)) { assertTrue(m + " is not overridden !", m.getDeclaringClass() == NoMergeScheduler.class); } } Index: lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java (revision 0) +++ lucene/core/src/test/org/apache/lucene/index/TestSerialMergeScheduler.java (working copy) @@ -0,0 +1,106 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.codecs.lucene41.Lucene41PostingsFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.TestUtil; + +public class TestSerialMergeScheduler extends LuceneTestCase { + + // Just counts total and in-flight merges, asserting that + // at most 1 merge runs at once: + private class MergeCountingIndexWriter extends IndexWriter { + + public final AtomicInteger totalMergeCount = new AtomicInteger(); + + private final AtomicInteger runningMergeCount = new AtomicInteger(); + + public MergeCountingIndexWriter(Directory dir, IndexWriterConfig iwc) throws IOException { + super(dir, iwc); + } + + @Override + public void merge(MergePolicy.OneMerge merge) throws IOException { + totalMergeCount.incrementAndGet(); + int count = runningMergeCount.incrementAndGet(); + assertTrue(count + " merges running", count <= 1); + try { + super.merge(merge); + } finally { + runningMergeCount.decrementAndGet(); + } + } + } + + public void testOnlyOneMergeAtOnce() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + iwc.setMaxBufferedDocs(2); + iwc.setMergeScheduler(new SerialMergeScheduler()); + TieredMergePolicy tmp = new TieredMergePolicy(); + tmp.setSegmentsPerTier(2.0); + iwc.setMergePolicy(tmp); + + final MergeCountingIndexWriter writer = new MergeCountingIndexWriter(dir, iwc); + + final CountDownLatch startingGun = new 
CountDownLatch(1); + + Thread[] threads = new Thread[4]; + for(int i=0;i= 20 (maxMergeDocs) docs private class MyMergeScheduler extends MergeScheduler { + public MyMergeScheduler() { + super(1); + } + @Override synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { while(true) { - MergePolicy.OneMerge merge = writer.getNextMerge(); + MergePolicy.OneMerge merge = writer.getAndPromoteNextPendingMerge(); if (merge == null) { break; } @@ -448,7 +451,7 @@ reader.close(); // Reopen - writer = new IndexWriter(directory, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random())).setOpenMode(OpenMode.APPEND).setMergePolicy(newLogMergePolicy())); + writer = new IndexWriter(directory, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())).setOpenMode(OpenMode.APPEND).setMergePolicy(newLogMergePolicy())); } writer.shutdown(); } Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java (revision 1590432) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java (working copy) @@ -2479,7 +2479,7 @@ iwc.setMergeScheduler(new ConcurrentMergeScheduler() { @Override - public void doMerge(MergePolicy.OneMerge merge) throws IOException { + public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { mergeStarted.countDown(); try { closeStarted.await(); @@ -2487,7 +2487,7 @@ Thread.currentThread().interrupt(); throw new RuntimeException(ie); } - super.doMerge(merge); + super.doMerge(writer, merge); } @Override Index: lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java =================================================================== --- lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java (revision 1590432) +++ 
lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java (working copy) @@ -63,14 +63,14 @@ } @Override - protected void handleMergeException(Throwable t) { + protected void handleMergeException(MergePolicy.OneMerge merge, Throwable t) { excCalled = true; } @Override - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { mergeCalled = true; - super.doMerge(merge); + super.doMerge(writer, merge); } } @@ -112,11 +112,14 @@ } private static class ReportingMergeScheduler extends MergeScheduler { + public ReportingMergeScheduler() { + super(1); + } @Override public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { OneMerge merge = null; - while ((merge = writer.getNextMerge()) != null) { + while ((merge = writer.getAndPromoteNextPendingMerge()) != null) { if (VERBOSE) { System.out.println("executing merge " + merge.segString(writer.getDirectory())); } @@ -126,7 +129,6 @@ @Override public void close() throws IOException {} - } public void testCustomMergeScheduler() throws Exception { Index: lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 1590432) +++ lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -17,21 +17,23 @@ * limitations under the License. 
*/ +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.ThreadInterruptedException; -import org.apache.lucene.util.CollectionUtil; -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; -import java.util.Comparator; - /** A {@link MergeScheduler} that runs each merge using a * separate thread. * *

Specify the max number of threads that may run at - * once, and the maximum number of simultaneous merges - * with {@link #setMaxMergesAndThreads}.

+ * once and the maximum backlog of merges + * using {@link #ConcurrentMergeScheduler(int,int)}.

* *

If the number of merges exceeds the max number of threads * then the largest merges are paused until one of the smaller @@ -44,10 +46,10 @@ */ public class ConcurrentMergeScheduler extends MergeScheduler { - private int mergeThreadPriority = -1; + private volatile int mergeThreadPriority = -1; /** List of currently active {@link MergeThread}s. */ - protected List mergeThreads = new ArrayList<>(); + protected List mergeThreads = Collections.synchronizedList(new ArrayList()); /** * Default {@code maxThreadCount}. @@ -66,40 +68,30 @@ // ones run, up until maxMergeCount merges at which point // we forcefully pause incoming threads (that presumably // are the ones causing so much merging). - private int maxThreadCount = DEFAULT_MAX_THREAD_COUNT; + private final int maxThreadCount; - // Max number of merges we accept before forcefully - // throttling the incoming threads - private int maxMergeCount = DEFAULT_MAX_MERGE_COUNT; - /** {@link Directory} that holds the index. */ protected Directory dir; - /** {@link IndexWriter} that owns this instance. */ - protected IndexWriter writer; - /** How many {@link MergeThread}s have kicked off (this is use * to name them). */ protected int mergeThreadCount; - /** Sole constructor, with all settings set to default - * values. */ + /** Creates this with default maxMergeCount and + * maxMergeThreads. */ public ConcurrentMergeScheduler() { + this(DEFAULT_MAX_MERGE_COUNT, DEFAULT_MAX_THREAD_COUNT); } - /** - * Sets the maximum number of merge threads and simultaneous merges allowed. - * - * @param maxMergeCount the max # simultaneous merges that are allowed. - * If a merge is necessary yet we already have this many - * threads running, the incoming thread (that is calling - * add/updateDocument) will block until a merge thread - * has completed. Note that we will only run the - * smallest maxThreadCount merges at a time. - * @param maxThreadCount the max # simultaneous merge threads that should - * be running at once. 
This must be <= maxMergeCount - */ - public void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount) { + /** Creates this. + * + * @param maxMergeCount Maximum number of merges before + * incoming segment-producing threads are forcefully stalled. + * @param maxThreadCount Maximum merge threads that can + * run concurrently; this must be <= + * maxMergeCount. */ + public ConcurrentMergeScheduler(int maxMergeCount, int maxThreadCount) { + super(maxMergeCount); if (maxThreadCount < 1) { throw new IllegalArgumentException("maxThreadCount should be at least 1"); } @@ -110,26 +102,18 @@ throw new IllegalArgumentException("maxThreadCount should be <= maxMergeCount (= " + maxMergeCount + ")"); } this.maxThreadCount = maxThreadCount; - this.maxMergeCount = maxMergeCount; } - /** Returns {@code maxThreadCount}. - * - * @see #setMaxMergesAndThreads(int, int) */ + /** Returns {@code maxThreadCount}. */ public int getMaxThreadCount() { return maxThreadCount; } - /** See {@link #setMaxMergesAndThreads}. */ - public int getMaxMergeCount() { - return maxMergeCount; - } - /** Return the priority that merge threads run at. By * default the priority is 1 plus the priority of (ie, * slightly higher priority than) the first thread that * calls merge. */ - public synchronized int getMergeThreadPriority() { + public int getMergeThreadPriority() { initMergeThreadPriority(); return mergeThreadPriority; } @@ -140,27 +124,21 @@ * set this any higher than * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has * room to set relative priority among threads. */ - public synchronized void setMergeThreadPriority(int pri) { - if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) + public void setMergeThreadPriority(int pri) { + if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) { throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. 
" + Thread.MAX_PRIORITY + " inclusive"); + } mergeThreadPriority = pri; updateMergeThreads(); } - /** Sorts {@link MergeThread}s; larger merges come first. */ - protected static final Comparator compareByMergeDocCount = new Comparator() { - @Override - public int compare(MergeThread t1, MergeThread t2) { - final MergePolicy.OneMerge m1 = t1.getCurrentMerge(); - final MergePolicy.OneMerge m2 = t2.getCurrentMerge(); - - final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount; - final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount; + private void finishMergeThread(MergeThread thread) { + mergeFinished(); + boolean removed = mergeThreads.remove(thread); + assert removed: "list=" + mergeThreads; + updateMergeThreads(); + } - return c2 - c1; - } - }; - /** * Called whenever the running merges have changed, to pause & unpause * threads. This method sorts the merge threads by their merge size in @@ -173,45 +151,39 @@ // process of stopping (ie have an active merge): final List activeMerges = new ArrayList<>(); - int threadIdx = 0; - while (threadIdx < mergeThreads.size()) { - final MergeThread mergeThread = mergeThreads.get(threadIdx); - if (!mergeThread.isAlive()) { - // Prune any dead threads - mergeThreads.remove(threadIdx); - continue; + synchronized (mergeThreads) { + Iterator it = mergeThreads.iterator(); + while (it.hasNext()) { + final MergeThread mergeThread = it.next(); + if (mergeThread.isAlive() == false) { + // Prune any dead threads + it.remove(); + } else { + activeMerges.add(mergeThread); + } } - if (mergeThread.getCurrentMerge() != null) { - activeMerges.add(mergeThread); - } - threadIdx++; } - // Sort the merge threads in descending order. 
- CollectionUtil.timSort(activeMerges, compareByMergeDocCount); + // Sort the merge threads in descending order by doc count: + CollectionUtil.timSort(activeMerges); int pri = mergeThreadPriority; final int activeMergeCount = activeMerges.size(); - for (threadIdx=0;threadIdx - * if (verbose()) { - * message("your message"); - * } - * - */ - protected boolean verbose() { - return writer != null && writer.infoStream.isEnabled("CMS"); - } - - /** - * Outputs the given message - this method assumes {@link #verbose()} was - * called and returned true. - */ - protected void message(String message) { - writer.infoStream.message("CMS", message); - } - - private synchronized void initMergeThreadPriority() { + private void initMergeThreadPriority() { if (mergeThreadPriority == -1) { // Default to slightly higher priority than our // calling thread mergeThreadPriority = 1+Thread.currentThread().getPriority(); - if (mergeThreadPriority > Thread.MAX_PRIORITY) + if (mergeThreadPriority > Thread.MAX_PRIORITY) { mergeThreadPriority = Thread.MAX_PRIORITY; + } } } @@ -268,7 +219,7 @@ try { while (true) { MergeThread toSync = null; - synchronized (this) { + synchronized (mergeThreads) { for (MergeThread t : mergeThreads) { if (t.isAlive()) { toSync = t; @@ -289,83 +240,32 @@ } } finally { // finally, restore interrupt status: - if (interrupted) Thread.currentThread().interrupt(); - } - } - - /** - * Returns the number of merge threads that are alive. Note that this number - * is ≤ {@link #mergeThreads} size. 
- */ - protected synchronized int mergeThreadCount() { - int count = 0; - for (MergeThread mt : mergeThreads) { - if (mt.isAlive() && mt.getCurrentMerge() != null) { - count++; + if (interrupted) { + Thread.currentThread().interrupt(); } } - return count; } @Override - public synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { assert !Thread.holdsLock(writer); - this.writer = writer; - initMergeThreadPriority(); - dir = writer.getDirectory(); - - // First, quickly run through the newly proposed merges - // and add any orthogonal merges (ie a merge not - // involving segments already pending to be merged) to - // the queue. If we are way behind on merging, many of - // these newly proposed merges will likely already be - // registered. - if (verbose()) { message("now merge"); - message(" index: " + writer.segString()); + message("index: " + writer.segString()); } // Iterate, pulling from the IndexWriter's queue of // pending merges, until it's empty: while (true) { - long startStallTime = 0; - while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). 
We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } - } - - MergePolicy.OneMerge merge = writer.getNextMerge(); + MergePolicy.OneMerge merge = getNextMerge(writer); if (merge == null) { if (verbose()) { - message(" no more merges pending; now return"); + message("no more merges pending; now return"); } return; } @@ -373,40 +273,41 @@ boolean success = false; try { if (verbose()) { - message(" consider merge " + writer.segString(merge.segments)); + message("run merge " + writer.segString(merge.segments)); } // OK to spawn a new merge thread to handle this // merge: - final MergeThread merger = getMergeThread(writer, merge); - mergeThreads.add(merger); + MergeThread merger = getMergeThread(writer, merge); + + merger.start(); if (verbose()) { - message(" launch new thread [" + merger.getName() + "]"); + message("launch new thread [" + merger.getName() + "]"); } - merger.start(); + success = true; - // Must call this after starting the thread else - // the new thread is removed from mergeThreads - // (since it's not alive yet): - updateMergeThreads(); - - success = true; } finally { - if (!success) { + if (success == false) { + mergeFinished(); writer.mergeFinish(merge); } } + + // Must call this after starting the thread else + // the new thread is removed from mergeThreads + // (since it's not alive yet): + updateMergeThreads(); } } /** Does the actual merge, by calling {@link IndexWriter#merge} */ - protected void doMerge(MergePolicy.OneMerge merge) throws IOException { + protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException 
{ writer.merge(merge); } /** Create and return a new MergeThread */ - protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { + protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { final MergeThread thread = new MergeThread(writer, merge); thread.setThreadPriority(mergeThreadPriority); thread.setDaemon(true); @@ -416,41 +317,17 @@ /** Runs a merge thread, which may run one or more merges * in sequence. */ - protected class MergeThread extends Thread { + protected class MergeThread extends Thread implements Comparable { - IndexWriter tWriter; - MergePolicy.OneMerge startMerge; - MergePolicy.OneMerge runningMerge; - private volatile boolean done; + final MergePolicy.OneMerge merge; + final IndexWriter writer; /** Sole constructor. */ - public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) { - this.tWriter = writer; - this.startMerge = startMerge; + public MergeThread(IndexWriter writer, MergePolicy.OneMerge merge) { + this.writer = writer; + this.merge = merge; } - /** Record the currently running merge. */ - public synchronized void setRunningMerge(MergePolicy.OneMerge merge) { - runningMerge = merge; - } - - /** Return the currently running merge. */ - public synchronized MergePolicy.OneMerge getRunningMerge() { - return runningMerge; - } - - /** Return the current merge, or null if this {@code - * MergeThread} is done. */ - public synchronized MergePolicy.OneMerge getCurrentMerge() { - if (done) { - return null; - } else if (runningMerge != null) { - return runningMerge; - } else { - return startMerge; - } - } - /** Set the priority of this thread. 
*/ public void setThreadPriority(int pri) { try { @@ -464,65 +341,50 @@ } } + /** Sorts by descending doc count */ @Override + public int compareTo(MergeThread other) { + return other.merge.totalDocCount - merge.totalDocCount; + } + + @Override public void run() { - - // First time through the while loop we do the merge - // that we were started with: - MergePolicy.OneMerge merge = this.startMerge; - - try { - if (verbose()) { - message(" merge thread: start"); - } + // Add ourself to CMS's list to record that we are now alive: + mergeThreads.add(this); - while(true) { - setRunningMerge(merge); - doMerge(merge); + boolean aborted = false; - // Subsequent times through the loop we do any new - // merge that writer says is necessary: - merge = tWriter.getNextMerge(); - - // Notify here in case any threads were stalled; - // they will notice that the pending merge has - // been pulled and possibly resume: - synchronized(ConcurrentMergeScheduler.this) { - ConcurrentMergeScheduler.this.notifyAll(); - } - - if (merge != null) { - updateMergeThreads(); - if (verbose()) { - message(" merge thread: do another merge " + tWriter.segString(merge.segments)); - } - } else { - break; - } - } - + try { if (verbose()) { - message(" merge thread: done"); + message("merge thread: start"); } - + doMerge(writer, merge); } catch (Throwable exc) { - // Ignore the exception if it was due to abort: - if (!(exc instanceof MergePolicy.MergeAbortedException)) { - //System.out.println(Thread.currentThread().getName() + ": CMS: exc"); - //exc.printStackTrace(System.out); - if (!suppressExceptions) { - // suppressExceptions is normally only set during - // testing. - handleMergeException(exc); - } + if (exc instanceof MergePolicy.MergeAbortedException) { + aborted = true; + } else if (suppressExceptions == false) { + // suppressExceptions is normally only set during + // testing. 
+ handleMergeException(merge, exc); } } finally { - done = true; - synchronized(ConcurrentMergeScheduler.this) { - updateMergeThreads(); - ConcurrentMergeScheduler.this.notifyAll(); + finishMergeThread(this); + } + + if (verbose()) { + message("merge thread: done"); + } + + // Let CMS run new merges if necessary: + if (aborted == false) { + try { + merge(writer, MergeTrigger.MERGE_FINISHED, true); + } catch (AlreadyClosedException ace) { + // OK + } catch (IOException ioe) { + throw new RuntimeException(ioe); } } } @@ -530,7 +392,7 @@ /** Called when an exception is hit in a background merge * thread */ - protected void handleMergeException(Throwable exc) { + protected void handleMergeException(MergePolicy.OneMerge merge, Throwable exc) { try { // When an exception is hit during merge, IndexWriter // removes any partial files and then allows another @@ -542,7 +404,7 @@ } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } - throw new MergePolicy.MergeException(exc, dir); + throw new MergePolicy.MergeException(exc, merge.info.info.dir); } private boolean suppressExceptions; @@ -561,7 +423,7 @@ public String toString() { StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); sb.append("maxThreadCount=").append(maxThreadCount).append(", "); - sb.append("maxMergeCount=").append(maxMergeCount).append(", "); + sb.append("maxMergeCount=").append(maxMergeCount).append(", "); sb.append("mergeThreadPriority=").append(mergeThreadPriority); return sb.toString(); } @@ -569,9 +431,8 @@ @Override public MergeScheduler clone() { ConcurrentMergeScheduler clone = (ConcurrentMergeScheduler) super.clone(); - clone.writer = null; - clone.dir = null; clone.mergeThreads = new ArrayList<>(); + clone.mergeThreadPriority = -1; return clone; } } Index: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/IndexWriter.java 
(revision 1590432) +++ lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (working copy) @@ -704,6 +704,7 @@ mergePolicy = config.getMergePolicy(); mergePolicy.setIndexWriter(this); mergeScheduler = config.getMergeScheduler(); + mergeScheduler.setInfoStream(infoStream); codec = config.getCodec(); bufferedUpdatesStream = new BufferedUpdatesStream(infoStream); @@ -1940,7 +1941,7 @@ * * @lucene.experimental */ - public synchronized MergePolicy.OneMerge getNextMerge() { + public synchronized MergePolicy.OneMerge getAndPromoteNextPendingMerge() { if (pendingMerges.size() == 0) { return null; } else { @@ -1960,6 +1961,11 @@ return pendingMerges.size() != 0; } + /** Returns number of merges currently running. */ + public synchronized int getRunningMergeCount() { + return runningMerges.size(); + } + /** * Close the IndexWriter without committing * any changes that have occurred since the last commit @@ -3600,9 +3606,11 @@ } } } + } catch (OutOfMemoryError oom) { handleOOM(oom, "merge"); } + if (merge.info != null && !merge.isAborted()) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs"); Index: lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 1590432) +++ lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (working copy) @@ -19,25 +19,45 @@ import java.io.IOException; -/** A {@link MergeScheduler} that simply does each merge - * sequentially, using the current thread. */ +/** A {@link MergeScheduler} that does each merge + * using the current (indexing) thread. + * + *

This merge scheduler allows only one merge to run at a + * time, and if a new merge needs to kick off while + * one is already running, the thread will block by default + * until the first merge completes. */ + public class SerialMergeScheduler extends MergeScheduler { /** Sole constructor. */ public SerialMergeScheduler() { + super(1); } - /** Just do the merges in sequence. We do this - * "synchronized" so that even if the application is using - * multiple threads, only one merge may run at a time. */ + /** Just do the merges in sequence, only allowing one + * incoming indexing thread to be merging at once. */ @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { - while(true) { - MergePolicy.OneMerge merge = writer.getNextMerge(); - if (merge == null) + while (true) { + + MergePolicy.OneMerge merge = getNextMerge(writer); + if (merge == null) { + if (verbose()) { + message("no more merges pending; now return"); + } break; - writer.merge(merge); + } + + if (verbose()) { + message("run merge=" + writer.segString(merge.segments)); + } + + try { + writer.merge(merge); + } finally { + mergeFinished(); + } } } Index: lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (revision 1590432) +++ lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java (working copy) @@ -32,8 +32,9 @@ /** The single instance of {@link NoMergeScheduler} */ public static final MergeScheduler INSTANCE = new NoMergeScheduler(); + // prevent instantiation private NoMergeScheduler() { - // prevent instantiation + super(0); } @Override @@ -46,5 +47,4 @@ public MergeScheduler clone() { return this; } - } Index: 
lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (revision 1590432) +++ lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (working copy) @@ -19,21 +19,32 @@ import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.Semaphore; +import org.apache.lucene.util.ThreadInterruptedException; +import org.apache.lucene.util.InfoStream; + /**

Expert: {@link IndexWriter} uses an instance - * implementing this interface to execute the merges + * of this to execute the merges * selected by a {@link MergePolicy}. The default * MergeScheduler is {@link ConcurrentMergeScheduler}.

*

Implementers of sub-classes should make sure that {@link #clone()} * returns an independent instance able to work with any {@link IndexWriter} * instance.

- * @lucene.experimental -*/ + * + * @lucene.experimental */ + public abstract class MergeScheduler implements Closeable, Cloneable { + private Semaphore permits; + private InfoStream infoStream; + protected final int maxMergeCount; + /** Sole constructor. (For invocation by subclass * constructors, typically implicit.) */ - protected MergeScheduler() { + protected MergeScheduler(int maxMergeCount) { + this.maxMergeCount = maxMergeCount; + this.permits = new Semaphore(maxMergeCount, true); } /** Run the merges provided by {@link IndexWriter#getNextMerge()}. @@ -47,10 +58,108 @@ @Override public abstract void close() throws IOException; + /** The maximum count of current merges allowed to run at + * once before indexing threads that are producing + * segments are stalled by {@link #maybeStall}. */ + public final int getMaxMergeCount() { + return maxMergeCount; + } + + /** IndexWriter calls this on init. */ + final void setInfoStream(InfoStream infoStream) { + this.infoStream = infoStream; + } + + /** + * Returns true if verbose output is enabled. This method is usually used in + * conjunction with {@link #message(String)}, like this: + * + *
+   * if (verbose()) {
+   *   message("your message");
+   * }
+   * 
+ */ + protected boolean verbose() { + return infoStream != null && infoStream.isEnabled("MS"); + } + + /** + * Outputs the given message - this method assumes {@link #verbose()} was + * called and returned true. + */ + protected void message(String message) { + infoStream.message("MS", message); + } + + /** Subclass calls this to get the next merge. If there + * are more than {@code maxMergeCount} merges running then this + * method will call {@link #maybeStall} to stall (by + * default) until merges catch up. Be sure + * to call {@link #mergeFinished} once the merge is + * done. */ + protected MergePolicy.OneMerge getNextMerge(IndexWriter writer) { + if (permits.tryAcquire() || (writer.hasPendingMerges() && maybeStall(writer))) { + MergePolicy.OneMerge merge = null; + try { + merge = writer.getAndPromoteNextPendingMerge(); + } finally { + if (merge == null) { + permits.release(); + } + } + + return merge; + } else { + return null; + } + } + + /** Called from {@link #getNextMerge} when there are too + * many merges. The default implementation + * stalls the incoming (segment-creating) thread as a + * simple but effective denial-of-service protection. + * Return true if the thread may now execute a + * merge, or false if the thread should just return + * without merging. 
*/ + protected boolean maybeStall(IndexWriter writer) { + if (verbose()) { + message("too many merges (" + writer.getRunningMergeCount() + " vs max=" + getMaxMergeCount() + "); stalling current thread..."); + } + long start = System.currentTimeMillis(); + boolean acquired = false; + boolean success = false; + try { + permits.acquire(); + acquired = true; + if (verbose()) { + message("stalled for " + (System.currentTimeMillis()-start) + " msec"); + } + success = true; + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } finally { + if (acquired && success == false) { + // Hit an exception in verbose() or message(): + permits.release(); + } + } + + return true; + } + + /** Subclass must call this after finishing each merge. */ + protected void mergeFinished() { + permits.release(); + } + @Override public MergeScheduler clone() { try { - return (MergeScheduler) super.clone(); + MergeScheduler clone = (MergeScheduler) super.clone(); + clone.infoStream = null; + clone.permits = new Semaphore(clone.maxMergeCount, true); + return clone; } catch (CloneNotSupportedException e) { throw new Error(e); } Index: lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java =================================================================== --- lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (revision 1590432) +++ lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java (working copy) @@ -110,19 +110,15 @@ "org.apache.lucene.index.ConcurrentMergeScheduler"); if (mergeScheduler.equals(NoMergeScheduler.class.getName())) { iwConf.setMergeScheduler(NoMergeScheduler.INSTANCE); + } else if (mergeScheduler.equals(ConcurrentMergeScheduler.class.getName())) { + iwConf.setMergeScheduler(new ConcurrentMergeScheduler(config.get("concurrent.merge.scheduler.max.merge.count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT), + 
config.get("concurrent.merge.scheduler.max.thread.count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT))); } else { try { iwConf.setMergeScheduler(Class.forName(mergeScheduler).asSubclass(MergeScheduler.class).newInstance()); } catch (Exception e) { throw new RuntimeException("unable to instantiate class '" + mergeScheduler + "' as merge scheduler", e); } - - if (mergeScheduler.equals("org.apache.lucene.index.ConcurrentMergeScheduler")) { - ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) iwConf.getMergeScheduler(); - int maxThreadCount = config.get("concurrent.merge.scheduler.max.thread.count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT); - int maxMergeCount = config.get("concurrent.merge.scheduler.max.merge.count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT); - cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); - } } final String defaultCodec = config.get("default.codec", null); Index: solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java =================================================================== --- solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java (revision 1590432) +++ solr/core/src/java/org/apache/solr/update/SolrIndexConfig.java (working copy) @@ -288,28 +288,27 @@ private MergeScheduler buildMergeScheduler(IndexSchema schema) { String msClassName = mergeSchedulerInfo == null ? SolrIndexConfig.DEFAULT_MERGE_SCHEDULER_CLASSNAME : mergeSchedulerInfo.className; - MergeScheduler scheduler = schema.getResourceLoader().newInstance(msClassName, MergeScheduler.class); - if (mergeSchedulerInfo != null) { - // LUCENE-5080: these two setters are removed, so we have to invoke setMaxMergesAndThreads - // if someone has them configured. 
- if (scheduler instanceof ConcurrentMergeScheduler) { - NamedList args = mergeSchedulerInfo.initArgs.clone(); - Integer maxMergeCount = (Integer) args.remove("maxMergeCount"); - if (maxMergeCount == null) { - maxMergeCount = ((ConcurrentMergeScheduler) scheduler).getMaxMergeCount(); - } - Integer maxThreadCount = (Integer) args.remove("maxThreadCount"); - if (maxThreadCount == null) { - maxThreadCount = ((ConcurrentMergeScheduler) scheduler).getMaxThreadCount(); - } - ((ConcurrentMergeScheduler)scheduler).setMaxMergesAndThreads(maxMergeCount, maxThreadCount); - SolrPluginUtils.invokeSetters(scheduler, args); - } else { - SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs); + MergeScheduler scheduler; + if (msClassName.equals("org.apache.lucene.index.ConcurrentMergeScheduler") && mergeSchedulerInfo != null) { + NamedList args = mergeSchedulerInfo.initArgs; + Integer maxMergeCount = (Integer) args.remove("maxMergeCount"); + if (maxMergeCount == null) { + maxMergeCount = ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT; } + Integer maxThreadCount = (Integer) args.remove("maxThreadCount"); + if (maxThreadCount == null) { + maxThreadCount = ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT; + } + scheduler = new ConcurrentMergeScheduler(maxMergeCount, maxThreadCount); + } else { + scheduler = schema.getResourceLoader().newInstance(msClassName, MergeScheduler.class); } + if (mergeSchedulerInfo != null) { + SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs); + } + return scheduler; }