Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 570240) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -34,15 +34,15 @@ private static final Random RANDOM = new Random(); private static Searcher SEARCHER; - private static int RUN_TIME_SEC = 15; + private static int RUN_TIME_SEC = 6; private static class IndexerThread extends Thread { - IndexModifier modifier; + IndexWriter modifier; int nextID; public int count; boolean failed; - public IndexerThread(IndexModifier modifier) { + public IndexerThread(IndexWriter modifier) { this.modifier = modifier; } @@ -65,8 +65,11 @@ } // Delete 5 docs: - int deleteID = nextID; + int deleteID = nextID-1; for(int j=0; j<5; j++) { + // nocommit -- the two indexing threads are + // stomping on each othere here -- need threadID + // as part of this doc id: modifier.deleteDocuments(new Term("id", ""+deleteID)); deleteID -= 2; } @@ -105,7 +108,8 @@ } } catch (Exception e) { System.out.println(e.toString()); - e.printStackTrace(); + System.out.print(Thread.currentThread().getName() + ": "); + e.printStackTrace(System.out); failed = true; } } @@ -115,9 +119,14 @@ Run one indexer and 2 searchers against single index as stress test. */ - public void runStressTest(Directory directory) throws Exception { - IndexModifier modifier = new IndexModifier(directory, ANALYZER, true); + public void runStressTest(Directory directory, boolean autoCommit, MergePolicy mergePolicy) throws Exception { + IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true); + modifier.setMaxBufferedDocs(10); + + if (mergePolicy != null) + modifier.setMergePolicy(mergePolicy); + // One modifier that writes 10 docs then removes 5, over // and over: IndexerThread indexerThread = new IndexerThread(modifier); @@ -126,7 +135,8 @@ IndexerThread indexerThread2 = new IndexerThread(modifier); indexerThread2.start(); - // Two searchers that constantly just re-instantiate the searcher: + // Two searchers that constantly just re-instantiate the + // searcher: SearcherThread searcherThread1 = new SearcherThread(directory); searcherThread1.start(); @@ -144,6 +154,7 @@ assertTrue("hit unexpected exception in indexer 2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); + //System.out.println(" Writer: " + indexerThread.count + " iterations"); //System.out.println("Searcher 1: " + searcherThread1.count + " searchers created"); //System.out.println("Searcher 2: " + searcherThread2.count + " searchers created"); @@ -155,18 +166,38 @@ */ public void testStressIndexAndSearching() throws Exception { - // First in a RAM directory: + // RAMDir Directory directory = new MockRAMDirectory(); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); - // Second in an FSDirectory: + // FSDir String tempDir = System.getProperty("java.io.tmpdir"); File dirPath = new File(tempDir, "lucene.test.stress"); directory = FSDirectory.getDirectory(dirPath); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); rmDir(dirPath); + + // With ConcurrentMergePolicy, in RAMDir + directory = new MockRAMDirectory(); + runStressTest(directory, true, new ConcurrentMergePolicyWrapper(new LogDocMergePolicy())); + directory.close(); + + // With ConcurrentMergePolicy, in FSDir + directory = FSDirectory.getDirectory(dirPath); + runStressTest(directory, true, new ConcurrentMergePolicyWrapper(new LogDocMergePolicy())); + directory.close(); + + // With ConcurrentMergePolicy and autoCommit=false, in RAMDir + directory = new MockRAMDirectory(); + runStressTest(directory, false, new ConcurrentMergePolicyWrapper(new LogDocMergePolicy())); + directory.close(); + + // With ConcurrentMergePolicy and autoCommit=false, in FSDir + directory = FSDirectory.getDirectory(dirPath); + runStressTest(directory, false, new ConcurrentMergePolicyWrapper(new LogDocMergePolicy())); + directory.close(); } private void rmDir(File dir) { Index: src/java/org/apache/lucene/index/ConcurrentMergePolicyWrapper.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergePolicyWrapper.java (revision 0) +++ src/java/org/apache/lucene/index/ConcurrentMergePolicyWrapper.java (revision 0) @@ -0,0 +1,266 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.Directory; + +import java.util.List; +import java.util.ArrayList; +import java.util.HashSet; +import java.io.IOException; + +public class ConcurrentMergePolicyWrapper implements MergePolicy { + + MergePolicy policy; + + private static boolean VERBOSE = false; + + private int maxThreadCount = 3; + List runningMerges = new ArrayList(); + + public ConcurrentMergePolicyWrapper(MergePolicy policy) { + this.policy = policy; + } + + public boolean useCompoundDocStore(SegmentInfos segments) { + return policy.useCompoundDocStore(segments); + } + + public boolean useCompoundFile(SegmentInfos segments, SegmentInfo info) { + return policy.useCompoundFile(segments, info); + } + + /** Sets the max # simultaneous threads that may be + * running. If a merge is necessary yet we already have + * this many threads running, the merge is returned back + * to IndexWriter so that it runs in the "foreground". */ + public void setMaxNumThreads(int count) { + maxThreadCount = count; + } + + /** Get the max # simultaneous threads that may be + * running. @see #setMaxNumThreads. */ + public int getMaxNumThreads() { + return maxThreadCount; + } + + public void close() { + policy.close(); + finishThreads(); + } + + private synchronized void finishThreads() { + while(runningMerges.size() > 0) { + if (VERBOSE) { + System.out.println("CMP: now wait for threads; currently " + runningMerges.size()); + for(int i=0;i= maxThreadCount) { + if (VERBOSE) + System.out.println(" too many merges running; send this one to caller to run in foreground"); + + // We are already running as many threads as + // allowed, so, return this merge back to writer + // and have it do it "in the foreground". + i++; + } else { + if (VERBOSE) + System.out.println(" launch new thread"); + MergeThread merger = new MergeThread(writer, merge, infos); + setPriorities(); + merger.setDaemon(true); + merger.start(); + spec.merges.remove(i); + } + } + + if (0 == spec.merges.size()) + spec = null; + + } // else + if (VERBOSE) + System.out.println(" no merges needed now"); + + // Return any "overflow" (when # merges requested + // exceeds our allowed concurrency) back to IndexWriter + return spec; + } + + protected synchronized void setPriorities() { + // nocommit -- not quite right -- we should make + // "bigger" merges run w/ lower priority + int pri = Thread.currentThread().getPriority(); + + final int numThreads = runningMerges.size(); + for(int i=0;i date body - int spot = line.indexOf(SEP); + final int spot = line.indexOf(SEP); titleField.setValue(line.substring(0, spot)); - int spot2 = line.indexOf(SEP, 1+spot); + final int spot2 = line.indexOf(SEP, 1+spot); dateField.setValue(line.substring(1+spot, spot2)); bodyField.setValue(line.substring(1+spot2, line.length())); return doc; @@ -111,12 +112,14 @@ return ds; } + private int lineCount = 0; public Document makeDocument() throws Exception { String line; synchronized(this) { while(true) { line = fileIn.readLine(); + lineCount++; if (line == null) { if (!forever) throw new NoMoreDataException(); @@ -149,8 +152,10 @@ try { if (fileIn != null) fileIn.close(); - fileIS = new FileInputStream(fileName); - fileIn = new BufferedReader(new InputStreamReader(fileIS,"UTF-8"), READER_BUFFER_BYTES); + // nocommit + //fileIS = new FileInputStream(fileName); + //fileIn = new BufferedReader(new InputStreamReader(fileIS,"UTF-8"), READER_BUFFER_BYTES); + fileIn = new BufferedReader(new FileReader(fileName), READER_BUFFER_BYTES); } catch (IOException e) { throw new RuntimeException(e); }