Index: lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java =================================================================== --- lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java (revision 1589197) +++ lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMerging.java (working copy) @@ -15,11 +15,6 @@ * limitations under the License. */ -import java.io.IOException; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -31,7 +26,11 @@ import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LuceneTestCase; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + public class TestIndexWriterMerging extends LuceneTestCase { @@ -313,7 +312,7 @@ // merging a segment with >= 20 (maxMergeDocs) docs private class MyMergeScheduler extends MergeScheduler { @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { while(true) { MergePolicy.OneMerge merge = writer.getNextMerge(); Index: lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 1589197) +++ lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -42,7 +42,7 @@ * incoming threads by pausing until one more more merges * complete.

*/ -public class ConcurrentMergeScheduler extends MergeScheduler { +public class ConcurrentMergeScheduler extends AbstractLockedMergeScheduler { private int mergeThreadPriority = -1; @@ -308,7 +308,7 @@ } @Override - public synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public synchronized void mergeInternal(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { assert !Thread.holdsLock(writer); @@ -567,7 +567,7 @@ } @Override - public MergeScheduler clone() { + public ConcurrentMergeScheduler clone() { ConcurrentMergeScheduler clone = (ConcurrentMergeScheduler) super.clone(); clone.writer = null; clone.dir = null; Index: lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 1589197) +++ lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java (working copy) @@ -21,7 +21,7 @@ /** A {@link MergeScheduler} that simply does each merge * sequentially, using the current thread. */ -public class SerialMergeScheduler extends MergeScheduler { +public class SerialMergeScheduler extends AbstractLockedMergeScheduler { /** Sole constructor. */ public SerialMergeScheduler() { @@ -31,7 +31,7 @@ * "synchronized" so that even if the application is using * multiple threads, only one merge may run at a time. */ @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void mergeInternal(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { while(true) { MergePolicy.OneMerge merge = writer.getNextMerge(); Index: lucene/core/src/java/org/apache/lucene/index/AbstractLockedMergeScheduler.java =================================================================== --- lucene/core/src/java/org/apache/lucene/index/AbstractLockedMergeScheduler.java (revision 0) +++ lucene/core/src/java/org/apache/lucene/index/AbstractLockedMergeScheduler.java (working copy) @@ -0,0 +1,87 @@ +package org.apache.lucene.index; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A MergeScheduler that ensures that only one thread a time in entering + * the scheduling logic. The default implementation deploys a safety mechanism + * that prevents the index writer to produce too many segments if merges can't keep up. + */ +public abstract class AbstractLockedMergeScheduler extends MergeScheduler { + private Lock lock = new ReentrantLock(); + + /** Run the merges provided by {@link IndexWriter#getNextMerge()}. + * @param writer the {@link IndexWriter} to obtain the merges from. + * @param trigger the {@link MergeTrigger} that caused this merge to happen + * @param newMergesFound true iff any new merges were found by the caller otherwise false + * */ + protected abstract void mergeInternal(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException; + + + /** Run the merges provided by {@link IndexWriter#getNextMerge()}. + * The calling thread will try to obtain a lock before it calls {@link #mergeInternal(IndexWriter, MergeTrigger, boolean)}. If + * the lock can not be obtained {@link #onMergeLocked(IndexWriter, MergeTrigger, boolean)} will be called before + * the caller returns. + * @param writer the {@link IndexWriter} to obtain the merges from. + * @param trigger the {@link MergeTrigger} that caused this merge to happen + * @param newMergesFound true iff any new merges were found by the caller otherwise false + * */ + public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + final Lock lock = this.lock; + if (lock.tryLock()) { + try { + mergeInternal(writer, trigger, newMergesFound); + } finally { + lock.unlock(); + } + } else { + onMergeLocked(writer, trigger, newMergesFound); + } + } + + /** + * This method is called by {@link #merge(IndexWriter, MergeTrigger, boolean)} when it can not acquire exclusive access to + * the merge scheduler. The default implementation will reattempt to get exclusive access on the scheduler by blocking + * on the internal lock. This will prevent creation of too many segments if merges are falling behind. + * + * @param writer the {@link IndexWriter} to obtain the merges from. + * @param trigger the {@link MergeTrigger} that caused this merge to happen + * @param newMergesFound true iff any new merges were found by the caller otherwise false + */ + protected void onMergeLocked(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + final Lock lock = this.lock; + lock.lock(); // default impl is to block if merges stall + try { + mergeInternal(writer, trigger, newMergesFound); + } finally { + lock.unlock(); + } + } + + @Override + public AbstractLockedMergeScheduler clone() { + final AbstractLockedMergeScheduler clone = (AbstractLockedMergeScheduler) super.clone(); + clone.lock = new ReentrantLock(); + return clone; + } + +} Property changes on: lucene/core/src/java/org/apache/lucene/index/AbstractLockedMergeScheduler.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java =================================================================== --- lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java (revision 1589197) +++ lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java (working copy) @@ -39,11 +39,11 @@ final AtomicBoolean mayMerge = new AtomicBoolean(true); final MergeScheduler mergeScheduler = new SerialMergeScheduler() { @Override - synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { + public void mergeInternal(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { if (!mayMerge.get() && writer.getNextMerge() != null) { throw new AssertionError(); } - super.merge(writer, trigger, newMergesFound); + super.mergeInternal(writer, trigger, newMergesFound); } }; IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())).setMergeScheduler(mergeScheduler).setMergePolicy(mergePolicy()));