Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 569112) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -34,15 +34,15 @@ private static final Random RANDOM = new Random(); private static Searcher SEARCHER; - private static int RUN_TIME_SEC = 15; + private static int RUN_TIME_SEC = 7; private static class IndexerThread extends Thread { - IndexModifier modifier; + IndexWriter modifier; int nextID; public int count; boolean failed; - public IndexerThread(IndexModifier modifier) { + public IndexerThread(IndexWriter modifier) { this.modifier = modifier; } @@ -65,7 +65,7 @@ } // Delete 5 docs: - int deleteID = nextID; + int deleteID = nextID-1; for(int j=0; j<5; j++) { modifier.deleteDocuments(new Term("id", ""+deleteID)); deleteID -= 2; @@ -115,9 +115,14 @@ Run one indexer and 2 searchers against single index as stress test. */ - public void runStressTest(Directory directory) throws Exception { - IndexModifier modifier = new IndexModifier(directory, ANALYZER, true); + public void runStressTest(Directory directory, boolean autoCommit, MergePolicy mergePolicy) throws Exception { + IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true); + modifier.setMaxBufferedDocs(10); + + if (mergePolicy != null) + modifier.setMergePolicy(mergePolicy); + // One modifier that writes 10 docs then removes 5, over // and over: IndexerThread indexerThread = new IndexerThread(modifier); @@ -126,7 +131,8 @@ IndexerThread indexerThread2 = new IndexerThread(modifier); indexerThread2.start(); - // Two searchers that constantly just re-instantiate the searcher: + // Two searchers that constantly just re-instantiate the + // searcher: SearcherThread searcherThread1 = new SearcherThread(directory); searcherThread1.start(); @@ -157,16 +163,26 @@ // First in a RAM directory: Directory directory = new MockRAMDirectory(); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); // Second in an FSDirectory: String tempDir = System.getProperty("java.io.tmpdir"); File dirPath = new File(tempDir, "lucene.test.stress"); directory = FSDirectory.getDirectory(dirPath); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); rmDir(dirPath); + + // Third with ConcurrentMergePolicy + directory = new MockRAMDirectory(); + runStressTest(directory, true, new ConcurrentMergePolicyWrapper(new LogDocMergePolicy())); + directory.close(); + + // Finally with ConcurrentMergePolicy and autoCommit=false + directory = new MockRAMDirectory(); + runStressTest(directory, false, new ConcurrentMergePolicyWrapper(new LogDocMergePolicy())); + directory.close(); } private void rmDir(File dir) { Index: src/java/org/apache/lucene/index/ConcurrentMergePolicyWrapper.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergePolicyWrapper.java (revision 0) +++ src/java/org/apache/lucene/index/ConcurrentMergePolicyWrapper.java (revision 0) @@ -0,0 +1,267 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.Directory; + +import java.util.List; +import java.util.ArrayList; +import java.util.HashSet; +import java.io.IOException; + +public class ConcurrentMergePolicyWrapper implements MergePolicy { + + MergePolicy policy; + + private static boolean VERBOSE = false; + + private int maxThreadCount = 3; + List runningMerges = new ArrayList(); + + public ConcurrentMergePolicyWrapper(MergePolicy policy) { + this.policy = policy; + } + + public boolean useCompoundDocStore(SegmentInfos segments) { + return policy.useCompoundDocStore(segments); + } + + public boolean useCompoundFile(SegmentInfos segments, SegmentInfo info) { + return policy.useCompoundFile(segments, info); + } + + /** Sets the max # simultaneous threads that may be + * running. If a merge is necessary yet we already have + * this many threads running, the merge is returned back + * to IndexWriter so that it runs in the "foreground". */ + public void setMaxNumThreads(int count) { + maxThreadCount = count; + } + + /** Get the max # simultaneous threads that may be + * running. @see #setMaxNumThreads. */ + public int getMaxNumThreads() { + return maxThreadCount; + } + + public void close() { + policy.close(); + finishThreads(); + } + + private synchronized void finishThreads() { + while(runningMerges.size() > 0) { + if (VERBOSE) { + System.out.println("CMP: now wait for threads; currently " + runningMerges.size()); + for(int i=0;i= maxThreadCount) { + if (VERBOSE) + System.out.println(" too many merges running; send this one to caller to run in foreground"); + + // We are already running as many threads as + // allowed, so, return this merge back to writer + // and have it do it "in the foreground". + i++; + } else { + if (VERBOSE) + System.out.println(" launch new thread"); + MergeThread merger = new MergeThread(writer, merge, infos); + setPriorities(); + merger.setDaemon(true); + merger.start(); + spec.merges.remove(i); + } + } + + if (0 == spec.merges.size()) + spec = null; + + } // else + if (VERBOSE) + System.out.println(" no merges needed now"); + + // Return any "overflow" (when # merges requested + // exceeds our allowed concucrency) back to + // IndexWriter + return spec; + } + + private synchronized void setPriorities() { + // nocommit -- not quite right -- we should make + // "bigger" merges run w/ lower priority + int pri = Thread.currentThread().getPriority(); + + final int numThreads = runningMerges.size(); + for(int i=0;i