Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 573633) +++ CHANGES.txt (working copy) @@ -28,6 +28,20 @@ termText instead of String. This gives faster tokenization performance (~10-15%). (Mike McCandless) + 5. LUCENE-847: Factor MergePolicy and MergeScheduler out of + IndexWriter. This enables users to change how & when segments are + selected for merging (using MergePolicy) as well as how & when + those merges are actually performed (using MergeScheduler). + (Steven Parkes via Mike McCandless). + + 6. LUCENE-870: Add a ConcurrentMergeScheduler which will execute + merges using background threads. (Steven Parkes via Mike McCandless) + + 7. LUCENE-845: Added a LogByteSizeMergePolicy which selects merges by + roughly equal size (in bytes) of the segments in the Directory. + This is a better match when you flush by RAM usage instead of + document count in IndexWriter (Mike McCandless). + Bug fixes 1. LUCENE-933: QueryParser fixed to not produce empty sub Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 573633) +++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy) @@ -55,7 +55,7 @@ writeBytes(singleByte, 0, 1); } - public void writeBytes(byte[] b, int offset, int len) throws IOException { + public void writeBytes(byte[] b, int offset, int len) throws IOException { long freeSpace = dir.maxSize - dir.sizeInBytes(); long realUsage = 0; Index: src/test/org/apache/lucene/store/MockRAMDirectory.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 573633) +++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy) @@ -195,7 +195,7 @@ * RAMOutputStream.BUFFER_SIZE (now 1024) bytes. */ - final long getRecomputedActualSizeInBytes() { + final synchronized long getRecomputedActualSizeInBytes() { long size = 0; Iterator it = fileMap.values().iterator(); while (it.hasNext()) Index: src/test/org/apache/lucene/index/TestThreadedOptimize.java =================================================================== --- src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0) +++ src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0) @@ -0,0 +1,144 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.lucene.analysis.SimpleAnalyzer; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.util._TestUtil; +import org.apache.lucene.util.English; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.io.File; + +public class TestThreadedOptimize extends TestCase { + + private static final Analyzer ANALYZER = new SimpleAnalyzer(); + + private final static int NUM_THREADS = 3; + //private final static int NUM_THREADS = 5; + + private final static int NUM_ITER = 2; + //private final static int NUM_ITER = 10; + + private final static int NUM_ITER2 = 2; + //private final static int NUM_ITER2 = 5; + + private boolean failed; + + private void setFailed() { + failed = true; + } + + public void runTest(Directory directory, boolean autoCommit, MergeScheduler merger) throws Exception { + + IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true); + writer.setMaxBufferedDocs(2); + if (merger != null) + writer.setMergeScheduler(merger); + + for(int iter=0;iter 0) { Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 573633) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -32,105 +32,119 @@ public class TestStressIndexing extends TestCase { private static final Analyzer ANALYZER = new SimpleAnalyzer(); private static final Random RANDOM = new Random(); - private static Searcher SEARCHER; - private static int RUN_TIME_SEC = 15; - - private static class IndexerThread extends Thread { - IndexWriter modifier; - int nextID; - public int count; + private static abstract class TimedThread extends Thread { boolean failed; + int count; + private static int RUN_TIME_SEC = 6; + private TimedThread[] allThreads; - public IndexerThread(IndexWriter modifier) { - this.modifier = modifier; + abstract public void doWork() throws Throwable; + + TimedThread(TimedThread[] threads) { + this.allThreads = threads; } public void run() { - long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC; - try { - while(true) { + final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC; - if (System.currentTimeMillis() > stopTime) { - break; - } + count = 0; - // Add 10 docs: - for(int j=0; j<10; j++) { - Document d = new Document(); - int n = RANDOM.nextInt(); - d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED)); - d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED)); - modifier.addDocument(d); - } - - // Delete 5 docs: - int deleteID = nextID; - for(int j=0; j<5; j++) { - modifier.deleteDocuments(new Term("id", ""+deleteID)); - deleteID -= 2; - } - + try { + while(System.currentTimeMillis() < stopTime && !anyErrors()) { + doWork(); count++; } - - } catch (Exception e) { - System.out.println(e.toString()); - e.printStackTrace(); + } catch (Throwable e) { + e.printStackTrace(System.out); failed = true; } } + + private boolean anyErrors() { + for(int i=0;i stopTime) { - break; - } - } - } catch (Exception e) { - System.out.println(e.toString()); - e.printStackTrace(); - failed = true; + public void doWork() throws Exception { + // Add 10 docs: + for(int 
j=0; j<10; j++) { + Document d = new Document(); + int n = RANDOM.nextInt(); + d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED)); + d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED)); + writer.addDocument(d); } + + // Delete 5 docs: + int deleteID = nextID-1; + for(int j=0; j<5; j++) { + writer.deleteDocuments(new Term("id", ""+deleteID)); + deleteID -= 2; + } } } + private static class SearcherThread extends TimedThread { + private Directory directory; + + public SearcherThread(Directory directory, TimedThread[] threads) { + super(threads); + this.directory = directory; + } + + public void doWork() throws Throwable { + for (int i=0; i<100; i++) + (new IndexSearcher(directory)).close(); + count += 100; + } + } + /* Run one indexer and 2 searchers against single index as stress test. */ - public void runStressTest(Directory directory) throws Exception { - IndexWriter modifier = new IndexWriter(directory, ANALYZER, true); + public void runStressTest(Directory directory, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception { + IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true); + modifier.setMaxBufferedDocs(10); + + TimedThread[] threads = new TimedThread[4]; + + if (mergeScheduler != null) + modifier.setMergeScheduler(mergeScheduler); + // One modifier that writes 10 docs then removes 5, over // and over: - IndexerThread indexerThread = new IndexerThread(modifier); + IndexerThread indexerThread = new IndexerThread(modifier, threads); + threads[0] = indexerThread; indexerThread.start(); - IndexerThread indexerThread2 = new IndexerThread(modifier); + IndexerThread indexerThread2 = new IndexerThread(modifier, threads); + threads[2] = indexerThread2; indexerThread2.start(); - // Two searchers that constantly just re-instantiate the searcher: - SearcherThread searcherThread1 = new SearcherThread(directory); + // Two searchers that constantly just re-instantiate the + // searcher: + SearcherThread searcherThread1 = new SearcherThread(directory, threads); + threads[3] = searcherThread1; searcherThread1.start(); - SearcherThread searcherThread2 = new SearcherThread(directory); + SearcherThread searcherThread2 = new SearcherThread(directory, threads); + threads[3] = searcherThread2; searcherThread2.start(); indexerThread.join(); @@ -144,6 +158,7 @@ assertTrue("hit unexpected exception in indexer2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); + //System.out.println(" Writer: " + indexerThread.count + " iterations"); //System.out.println("Searcher 1: " + searcherThread1.count + " searchers created"); //System.out.println("Searcher 2: " + searcherThread2.count + " searchers created"); @@ -155,25 +170,38 @@ */ public void testStressIndexAndSearching() throws Exception { - // First in a RAM directory: + // RAMDir Directory directory = new MockRAMDirectory(); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); - // Second in an FSDirectory: + // FSDir String tempDir = System.getProperty("java.io.tmpdir"); File dirPath = new File(tempDir, "lucene.test.stress"); directory = FSDirectory.getDirectory(dirPath); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); - rmDir(dirPath); - } - private void rmDir(File dir) { - File[] files = dir.listFiles(); - for (int i = 0; i < files.length; 
i++) { - files[i].delete(); - } - dir.delete(); + // With ConcurrentMergeScheduler, in RAMDir + directory = new MockRAMDirectory(); + runStressTest(directory, true, new ConcurrentMergeScheduler()); + directory.close(); + + // With ConcurrentMergeScheduler, in FSDir + directory = FSDirectory.getDirectory(dirPath); + runStressTest(directory, true, new ConcurrentMergeScheduler()); + directory.close(); + + // With ConcurrentMergeScheduler and autoCommit=false, in RAMDir + directory = new MockRAMDirectory(); + runStressTest(directory, false, new ConcurrentMergeScheduler()); + directory.close(); + + // With ConcurrentMergeScheduler and autoCommit=false, in FSDir + directory = FSDirectory.getDirectory(dirPath); + runStressTest(directory, false, new ConcurrentMergeScheduler()); + directory.close(); + + _TestUtil.rmDir(dirPath); } } Index: src/test/org/apache/lucene/index/TestDocumentWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestDocumentWriter.java (revision 573633) +++ src/test/org/apache/lucene/index/TestDocumentWriter.java (working copy) @@ -62,7 +62,7 @@ IndexWriter writer = new IndexWriter(dir, analyzer, true); writer.addDocument(testDoc); writer.flush(); - SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1); + SegmentInfo info = writer.newestSegment(); writer.close(); //After adding the document, we should be able to read it back in SegmentReader reader = SegmentReader.get(info); @@ -123,7 +123,7 @@ writer.addDocument(doc); writer.flush(); - SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1); + SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); @@ -156,7 +156,7 @@ writer.addDocument(doc); writer.flush(); - SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1); + SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); Index: src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java =================================================================== --- src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (revision 573633) +++ src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (working copy) @@ -272,7 +272,6 @@ writer.addIndexesNoOptimize(new Directory[] { aux, aux }); assertEquals(1020, writer.docCount()); - assertEquals(2, writer.getSegmentCount()); assertEquals(1000, writer.getDocCount(0)); writer.close(); @@ -373,7 +372,7 @@ writer = newWriter(dir, true); writer.setMaxBufferedDocs(1000); - // add 1000 documents + // add 1000 documents in 1 segment addDocs(writer, 1000); assertEquals(1000, writer.docCount()); assertEquals(1, writer.getSegmentCount()); Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0) +++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0) @@ -0,0 +1,309 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.Directory; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +/** A {@link MergeScheduler} that does each merge using a + * separate thread. This is a simple way to use + * concurrency in the indexing process without having to + * create external threads. */ + +public class ConcurrentMergeScheduler implements MergeScheduler { + + public static boolean VERBOSE = false; + + private int mergeThreadPriority = -1; + private boolean aborted; + private List serialMerges = new ArrayList(); + + private List runningMerges = new ArrayList(); + private int maxThreadCount = 3; + + private List exceptions = new ArrayList(); + + /** Sets the max # simultaneous threads that may be + * running. If a merge is necessary yet we already have + * this many threads running, the merge is returned back + * to IndexWriter so that it runs in the "foreground". */ + public void setMaxNumThreads(int count) { + maxThreadCount = count; + } + + /** Get the max # simultaneous threads that may be + * running. @see #setMaxNumThreads. */ + public int getMaxNumThreads() { + return maxThreadCount; + } + + /** Return the priority that merge threads run at. By + * default the priority is 1 plus the priority of first + * thread that calls merge. */ + public synchronized int getMergeThreadPriority() { + initMergeThreadPriority(); + return mergeThreadPriority; + } + + /** Returns any exceptions that were caught in the merge + * threads. */ + public List getExceptions() { + return exceptions; + } + + /** Return the priority that merge threads run at. */ + public synchronized void setMergeThreadPriority(int pri) { + mergeThreadPriority = pri; + + final int numThreads = runningMerges.size(); + for(int i=0;i 0) { + if (VERBOSE) { + message("now wait for threads; currentnly " + runningMerges.size() + " still running"); + for(int i=0;i= maxThreadCount) { + + if (VERBOSE) + message(" too many background merges running (" + numMergeThreads + "); will run in foreground"); + + // We are already running as many threads as + // allowed, so, do this one in the foreground + i++; + } else if (writer.registerMerge(merge)) { + MergeThread merger = new MergeThread(writer, merge); + if (VERBOSE) + message(" launch new thread: merge " + merge.segString(dir)); + merger.setDaemon(true); + merger.start(); + try { + merger.setPriority(mergeThreadPriority); + } catch (NullPointerException npe) { + // Strangely, Sun's JDK 1.5 on Linux sometimes + // throws NPE out of here... + } + spec.merges.remove(i); + } else { + if (VERBOSE) + message(" already running"); + spec.merges.remove(i); + } + } + } + } + + // Any merges that remain must now be run in the + // foreground of this thread + final int numMergesLeft = spec.merges.size(); + for(int j=0;jExpert: a MergePolicy determines the sequence of + * primitive merge operations to be used for overall merge + * and optimize operations.

+ *
+ * <p>Whenever the segments in an index have been altered by
+ * {@link IndexWriter}, either the addition of a newly
+ * flushed segment, addition of many segments due to
+ * addIndexes* calls, or a previous merge that may now need
+ * to cascade, {@link IndexWriter} will invoke {@link
+ * #findMerges} to give the MergePolicy a chance to merge
+ * segments.  This method returns a {@link
+ * MergeSpecification} instance describing the set of merges
+ * that should be done, or null if no merges are
+ * necessary.</p>
+ *
+ * <p>Note that this means the policy can return more than
+ * one merge as being necessary.  In this case, if the
+ * writer is using {@link SerialMergeScheduler}, the merges
+ * will be run sequentially.</p>
+ *
+ * <p>The default MergePolicy is {@link
+ * LogByteSizeMergePolicy}.</p>
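+ *
+ * <p>For example, a policy might ask for the first
+ * <code>mergeFactor</code> segments to be merged into one
+ * compound-file segment (a sketch only; <code>infos</code> and
+ * <code>mergeFactor</code> stand in for the policy's own state):</p>
+ *
+ * <pre>
+ *   // assumes the index currently has at least mergeFactor segments
+ *   MergeSpecification spec = new MergeSpecification();
+ *   spec.add(new OneMerge(infos.range(0, mergeFactor), true));
+ *   return spec;
+ * </pre>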

+ */ + +public interface MergePolicy { + + /** + * A OneMerge instance provides the information necessary + * to perform an individual primitive merge operation, + * resulting in a single new segment. The merge spec + * includes the subset of segments to be merged as well as + * whether the new segment should use the compound file + * format. + */ + + public static class OneMerge { + + SegmentInfo info; // used by IndexWriter + boolean mergeDocStores; // used by IndexWriter + SegmentInfos segmentsClone; // used by IndexWriter + boolean increfDone; // used by IndexWriter + boolean registerDone; // used by IndexWriter + int startFullRefreshCount; // used by IndexWriter + + final SegmentInfos segments; + final boolean useCompoundFile; + boolean aborted; + + public OneMerge(SegmentInfos segments, boolean useCompoundFile) { + this.segments = segments; + this.useCompoundFile = useCompoundFile; + } + + /** Mark this merge as aborted. If this is called + * before the merge is committed then the merge will + * not be committed. */ + public synchronized void abort() { + aborted = true; + } + /** Returns true if this merge was aborted. */ + public synchronized boolean isAborted() { + return aborted; + } + + public String segString(Directory dir) { + StringBuffer b = new StringBuffer(); + final int numSegments = segments.size(); + for(int i=0;i 0) b.append(" "); + b.append(segments.info(i).segString(dir)); + } + if (info != null) + b.append(" into " + info.name); + return b.toString(); + } + } + + /** + * A MergeSpecification instance provides the information + * necessary to perform multiple merges. It simply + * contains a list of {@link OneMerge} instances. + */ + + public static class MergeSpecification implements Cloneable { + + /** + * The subset of segments to be included in the primitive merge. 
+ */ + + public List merges = new ArrayList(); + + public void add(OneMerge merge) { + merges.add(merge); + } + + public String segString(Directory dir) { + StringBuffer b = new StringBuffer(); + b.append("MergeSpec:\n"); + final int count = merges.size(); + for(int i=0;i maxNumSegments || + (infos.size() == 1 && + (infos.info(0).hasDeletions() || + infos.info(0).hasSeparateNorms() || + infos.info(0).dir != writer.getDirectory() || + infos.info(0).getUseCompoundFile() != useCompoundFile))); + } + + public MergeSpecification findMergesForOptimize(SegmentInfos infos, IndexWriter writer, int maxNumSegments) throws IOException { + final MergeSpecification spec; + if (!isOptimized(infos, writer, maxNumSegments)) { + spec = new MergeSpecification(); + final int numSegments = infos.size(); + final int first; + if (numSegments > mergeFactor) + first = numSegments-mergeFactor; + else + first = 0; + spec.add(new OneMerge(infos.range(first, numSegments), useCompoundFile)); + } else + spec = null; + return spec; + } + + public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException { + + final int numSegments = infos.size(); + + // Compute levels, which is just log (base mergeFactor) + // of the size of each segment + float[] levels = new float[numSegments]; + final float norm = (float) Math.log(mergeFactor); + + float levelFloor = (float) (Math.log(minMergeSize)/norm); + final Directory directory = writer.getDirectory(); + + for(int i=0;i= maxMergeSize && info.dir != directory) + throw new IllegalArgumentException("Segment is too large (" + size + " vs max size " + maxMergeSize + ")"); + + // Floor tiny segments @ mergeFactor + if (size < mergeFactor) + size = mergeFactor; + levels[i] = (float) Math.log(size)/norm; + } + + // Now, we quantize the log values into levels. The + // first level is any segment whose log size is within + // LEVEL_LOG_SPAN of the max size, or, who has such as + // segment "to the right". Then, we find the max of all + // other segments and use that to define the next level + // segment, etc. + + MergeSpecification spec = null; + + int start = 0; + while(start < numSegments) { + + // Find max level of all segments not already + // quantized. 
+ float maxLevel = levels[start]; + for(int i=1+start;i maxLevel) + maxLevel = level; + } + + // Now search backwards for the rightmost segment that + // falls into this level: + float levelBottom; + if (maxLevel < levelFloor) + // All remaining segments fall into the min level + levelBottom = -1.0F; + else { + levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN); + + // Force a boundary at the level floor + if (levelBottom < levelFloor && maxLevel >= levelFloor) + levelBottom = levelFloor; + } + + int upto = numSegments-1; + while(upto >= start) { + if (levels[upto] >= levelBottom) + break; + upto--; + } + + // Finally, record all merges that are viable at this level: + int end = start + mergeFactor; + while(end <= 1+upto) { + boolean anyTooLarge = false; + for(int i=start;i= maxMergeSize; + + if (!anyTooLarge) { + if (spec == null) + spec = new MergeSpecification(); + spec.add(new OneMerge(infos.range(start, end), useCompoundFile)); + } + start = end; + end = start + mergeFactor; + } + + start = 1+upto; + } + + return spec; + } +} Property changes on: src/java/org/apache/lucene/index/LogMergePolicy.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/index/SerialMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 0) +++ src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 0) @@ -0,0 +1,39 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +/** A {@link MergeScheduler} that simply does each merge + * sequentially, using the current thread. This is the + * default for {@link IndexWriter}. */ +public class SerialMergeScheduler implements MergeScheduler { + + public void merge(IndexWriter writer, MergePolicy.MergeSpecification spec) + throws CorruptIndexException, IOException { + + final int numMerge = spec.merges.size(); + for(int i=0;i + also trigger one or more segment merges which by default + run (blocking) with the current thread (see below for changing the {@link + MergeScheduler}).

  The optional autoCommit argument to the
@@ -135,8 +141,20 @@
  filesystems like NFS that do not support "delete on last
  close" semantics, which Lucene's "point in time" search
  normally relies on.
- */
+
+ <p>Expert: IndexWriter allows you to
+ separately change the {@link MergePolicy} and the {@link
+ MergeScheduler}.  The {@link MergePolicy} is invoked
+ whenever there are changes to the segments in the index.
+ Its role is to select which merges to do, if any, and
+ return a {@link MergePolicy.MergeSpecification} describing
+ the merges.  Default is {@link LogDocMergePolicy}.  Then,
+ the {@link MergeScheduler} is invoked with the requested
+ merges and it decides when and how to run the merges.
+ Default is {@link SerialMergeScheduler}.</p>
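+
+ <p>For example, a writer could keep the default merge selection but
+ run the merges with background threads (a usage sketch; the
+ directory and analyzer variables are placeholders):</p>
+
+ <pre>
+   IndexWriter writer = new IndexWriter(directory, analyzer, true);
+
+   // run merges concurrently, using at most 2 background threads:
+   ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+   cms.setMaxNumThreads(2);
+   writer.setMergeScheduler(cms);
+
+   // the merge policy can be changed independently, e.g.:
+   writer.setMergePolicy(new LogDocMergePolicy());
+ </pre>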

+*/ + /* * Clarification: Check Points (and commits) * Being able to set autoCommit=false allows IndexWriter to flush and @@ -177,9 +195,10 @@ public static final String WRITE_LOCK_NAME = "write.lock"; /** - * Default value is 10. Change using {@link #setMergeFactor(int)}. + * @deprecated + * @see LogMergePolicy#DEFAULT_MERGE_FACTOR */ - public final static int DEFAULT_MERGE_FACTOR = 10; + public final static int DEFAULT_MERGE_FACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR; /** * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}. @@ -205,9 +224,10 @@ public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; /** - * Default value is {@link Integer#MAX_VALUE}. Change using {@link #setMaxMergeDocs(int)}. + * @deprecated + * @see: LogMergePolicy.DEFAULT_MAX_MERGE_DOCS */ - public final static int DEFAULT_MAX_MERGE_DOCS = Integer.MAX_VALUE; + public final static int DEFAULT_MAX_MERGE_DOCS = LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS; /** * Default value is 10,000. Change using {@link #setMaxFieldLength(int)}. @@ -239,7 +259,7 @@ private boolean localAutoCommit; // saved autoCommit during local transaction private boolean autoCommit = true; // false if we should commit only on close - SegmentInfos segmentInfos = new SegmentInfos(); // the segments + private SegmentInfos segmentInfos = new SegmentInfos(); // the segments private DocumentsWriter docWriter; private IndexFileDeleter deleter; @@ -257,15 +277,14 @@ private HashMap bufferedDeleteTerms = new HashMap(); private int numBufferedDeleteTerms = 0; - /** Use compound file setting. Defaults to true, minimizing the number of - * files used. Setting this to false may improve indexing performance, but - * may also cause file handle problems. - */ - private boolean useCompoundFile = true; - private boolean closeDir; private boolean closed; + private boolean closing; + private MergePolicy mergePolicy = new LogDocMergePolicy(); + private MergeScheduler mergeScheduler = new SerialMergeScheduler(); + + /** * Used internally to throw an {@link * AlreadyClosedException} if this IndexWriter has been @@ -278,23 +297,53 @@ } } - /** Get the current setting of whether to use the compound file format. - * Note that this just returns the value you set with setUseCompoundFile(boolean) - * or the default. You cannot use this to query the status of an existing index. + /** + * Casts current mergePolicy to LogMergePolicy, and throws + * an exception if the mergePolicy is not a LogMergePolicy. + */ + private LogMergePolicy getLogMergePolicy() { + if (mergePolicy instanceof LogMergePolicy) + return (LogMergePolicy) mergePolicy; + else + throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy"); + } + + private LogDocMergePolicy getLogDocMergePolicy() { + if (mergePolicy instanceof LogDocMergePolicy) + return (LogDocMergePolicy) mergePolicy; + else + throw new IllegalArgumentException("this method can only be called when the merge policy is LogDocMergePolicy"); + } + + /**

+ * <p>Get the current setting of whether newly flushed
+ * segments will use the compound file format.  Note that
+ * this just returns the value previously set with
+ * setUseCompoundFile(boolean), or the default value
+ * (true).  You cannot use this to query the status of
+ * previously flushed segments.</p>
+ *
+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.getUseCompoundFile as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.</p>

+ * * @see #setUseCompoundFile(boolean) */ public boolean getUseCompoundFile() { - ensureOpen(); - return useCompoundFile; + return getLogMergePolicy().getUseCompoundFile(); } - /** Setting to turn on usage of a compound file. When on, multiple files - * for each segment are merged into a single file once the segment creation - * is finished. This is done regardless of what directory is in use. + /**

+ * <p>Setting to turn on usage of a compound file. When on,
+ * multiple files for each segment are merged into a
+ * single file when a new segment is flushed.</p>
+ *
+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.setUseCompoundFile as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.</p>

*/ public void setUseCompoundFile(boolean value) { - ensureOpen(); - useCompoundFile = value; + getLogMergePolicy().setUseCompoundFile(value); } /** Expert: Set the Similarity implementation used by this IndexWriter. @@ -652,26 +701,79 @@ } } + /** + * Set the merge policy used by this IndexWriter + */ + public void setMergePolicy(MergePolicy mp) { + ensureOpen(); + if (mp == null) + throw new NullPointerException("MergePolicy must be non-null"); + + if (mergePolicy != mp) + mergePolicy.close(); + mergePolicy = mp; + } + + /** + * Returns the current MergePolicy in use by this writer. + * @see #setMergePolicy + */ + public MergePolicy getMergePolicy() { + ensureOpen(); + return mergePolicy; + } + + /** + * Set the merger used by this IndexWriter + */ + public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException { + ensureOpen(); + if (mergeScheduler == null) + throw new NullPointerException("MergeScheduler must be non-null"); + + if (this.mergeScheduler != mergeScheduler) + this.mergeScheduler.close(); + this.mergeScheduler = mergeScheduler; + } + + /** + * Returns the current MergePolicy in use by this writer. + * @see #setMergePolicy + */ + public MergeScheduler getMergeScheduler() { + ensureOpen(); + return mergeScheduler; + } + /** Determines the largest number of documents ever merged by addDocument(). * Small values (e.g., less than 10,000) are best for interactive indexing, * as this limits the length of pauses while indexing to a few seconds. * Larger values are best for batched indexing and speedier searches. * *

 * <p>The default value is {@link Integer#MAX_VALUE}.</p>
+ *
+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.setMaxMergeDocs as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.</p>

*/ public void setMaxMergeDocs(int maxMergeDocs) { - ensureOpen(); - this.maxMergeDocs = maxMergeDocs; + getLogDocMergePolicy().setMaxMergeDocs(maxMergeDocs); } - /** + /** * Returns the largest number of documents allowed in a * single segment. + * + *

+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.getMaxMergeDocs as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.</p>

+ * * @see #setMaxMergeDocs */ public int getMaxMergeDocs() { - ensureOpen(); - return maxMergeDocs; + return getLogDocMergePolicy().getMaxMergeDocs(); } /** @@ -796,24 +898,31 @@ * for batch index creation, and smaller values (< 10) for indices that are * interactively maintained. * + *

+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.setMergeFactor as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.</p>
+ *

This must never be less than 2. The default value is 10. */ public void setMergeFactor(int mergeFactor) { - ensureOpen(); - if (mergeFactor < 2) - throw new IllegalArgumentException("mergeFactor cannot be less than 2"); - this.mergeFactor = mergeFactor; + getLogMergePolicy().setMergeFactor(mergeFactor); } /** - * Returns the number of segments that are merged at once - * and also controls the total number of segments allowed - * to accumulate in the index. + *

+ * <p>Returns the number of segments that are merged at
+ * once and also controls the total number of segments
+ * allowed to accumulate in the index.</p>
+ *
+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.getMergeFactor as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.</p>
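+ *
+ * <p>For instance (a sketch; <code>CustomMergePolicy</code> stands
+ * in for any MergePolicy that is not a LogMergePolicy):</p>
+ *
+ * <pre>
+ *   writer.setMergePolicy(new CustomMergePolicy());
+ *   writer.getMergeFactor();   // throws IllegalArgumentException
+ * </pre>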

+ * * @see #setMergeFactor */ public int getMergeFactor() { - ensureOpen(); - return mergeFactor; + return getLogMergePolicy().getMergeFactor(); } /** If non-null, this will be the default infoStream used @@ -922,37 +1031,66 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public synchronized void close() throws CorruptIndexException, IOException { - if (!closed) { - flush(true, true); + public void close() throws CorruptIndexException, IOException { - if (commitPending) { - segmentInfos.write(directory); // now commit changes - if (infoStream != null) - infoStream.println("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); - deleter.checkpoint(segmentInfos, true); - commitPending = false; - rollbackSegmentInfos = null; - } + boolean doClose; + synchronized(this) { + // Ensure that only one thread actually gets to do the closing: + if (!closing) { + doClose = true; + closing = true; + } else + doClose = false; + } - if (writeLock != null) { - writeLock.release(); // release write lock - writeLock = null; + if (doClose) { + try { + flush(true, true); + + mergePolicy.close(); + mergeScheduler.close(); + + if (commitPending) { + segmentInfos.write(directory); // now commit changes + if (infoStream != null) + infoStream.println("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); + synchronized(this) { + deleter.checkpoint(segmentInfos, true); + } + commitPending = false; + rollbackSegmentInfos = null; + } + + if (writeLock != null) { + writeLock.release(); // release write lock + writeLock = null; + } + closed = true; + docWriter = null; + + synchronized(this) { + deleter.close(); + } + + if(closeDir) + directory.close(); + } finally { + if (!closed) + closing = false; } - closed = true; - docWriter = null; - - if(closeDir) - directory.close(); } } /** Tells the docWriter to close its currently open shared - * doc stores (stored fields & vectors files). */ - private void flushDocStores() throws IOException { + * doc stores (stored fields & vectors files). + * Return value specifices whether new doc store files are compound or not. + */ + private synchronized boolean flushDocStores() throws IOException { List files = docWriter.files(); + boolean useCompoundDocStore = false; + if (files.size() > 0) { String docStoreSegment; @@ -965,16 +1103,18 @@ docWriter.abort(); } - if (useCompoundFile && docStoreSegment != null) { + useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos); + + if (useCompoundDocStore && docStoreSegment != null) { // Now build compound doc store file - checkpoint(); success = false; final int numSegments = segmentInfos.size(); + final String compoundFileName = docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION; try { - CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); final int size = files.size(); for(int i=0;i * - *

- * <p>The amount of free space required when a merge is
- * triggered is up to 1X the size of all segments being
- * merged, when no readers/searchers are open against the
- * index, and up to 2X the size of all segments being
- * merged when readers/searchers are open against the
- * index (see {@link #optimize()} for details). Most
- * merges are small (merging the smallest segments
- * together), but whenever a full merge occurs (all
- * segments in the index, which is the worst case for
- * temporary space usage) then the maximum free disk space
- * required is the same as {@link #optimize}.</p>
+ * <p>The amount of free space required when a merge is triggered is
+ * up to 1X the size of all segments being merged, when no
+ * readers/searchers are open against the index, and up to 2X the
+ * size of all segments being merged when readers/searchers are open
+ * against the index (see {@link #optimize()} for details).  The
+ * sequence of primitive merge operations performed is governed by
+ * the merge policy.</p>
 *
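+ * <p>For example, if a merge combines five 200 MB segments, up to
+ * roughly 1 GB of free space may be needed while it runs, or up to
+ * roughly 2 GB when readers/searchers are open against the index
+ * (applying the 1X/2X figures above; illustrative numbers only).</p>
+ *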

Note that each term in the document can be no longer * than 16383 characters, otherwise an @@ -1117,14 +1257,27 @@ */ public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); + boolean doFlush = false; boolean success = false; try { - success = docWriter.addDocument(doc, analyzer); - } catch (IOException ioe) { - deleter.refresh(); - throw ioe; + doFlush = docWriter.addDocument(doc, analyzer); + success = true; + } finally { + if (!success) { + synchronized (this) { + bufferedDeleteTerms.clear(); + numBufferedDeleteTerms = 0; + + // If docWriter has some aborted files, then we + // clean them up here + final List files = docWriter.abortedFiles(); + if (files != null) { + deleter.deleteFiles(files); + } + } + } } - if (success) + if (doFlush) flush(true, false); } @@ -1134,9 +1287,11 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException { + public void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); - bufferDeleteTerm(term); + synchronized(this) { + bufferDeleteTerm(term); + } maybeFlush(); } @@ -1148,10 +1303,12 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public synchronized void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException { + public void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException { ensureOpen(); - for (int i = 0; i < terms.length; i++) { - bufferDeleteTerm(terms[i]); + synchronized(this) { + for (int i = 0; i < terms.length; i++) { + bufferDeleteTerm(terms[i]); + } } maybeFlush(); } @@ -1192,14 +1349,26 @@ synchronized (this) { bufferDeleteTerm(term); } + boolean doFlush = false; boolean success = false; try { - success = docWriter.addDocument(doc, analyzer); - } catch (IOException ioe) { - deleter.refresh(); - throw ioe; + doFlush = docWriter.addDocument(doc, analyzer); + success = true; + } finally { + if (!success) { + synchronized (this) { + bufferedDeleteTerms.clear(); + numBufferedDeleteTerms = 0; + + // If docWriter has some aborted files, then we + // clean them up here + final List files = docWriter.abortedFiles(); + if (files != null) + deleter.deleteFiles(files); + } + } } - if (success) + if (doFlush) flush(true, false); else maybeFlush(); @@ -1228,51 +1397,33 @@ return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX); } - /** Determines how often segment indices are merged by addDocument(). With - * smaller values, less RAM is used while indexing, and searches on - * unoptimized indices are faster, but indexing speed is slower. With larger - * values, more RAM is used during indexing, and while searches on unoptimized - * indices are slower, indexing is faster. Thus larger values (> 10) are best - * for batch index creation, and smaller values (< 10) for indices that are - * interactively maintained. - * - *

This must never be less than 2. The default value is {@link #DEFAULT_MERGE_FACTOR}. - - */ - private int mergeFactor = DEFAULT_MERGE_FACTOR; - /** Determines amount of RAM usage by the buffered docs at * which point we trigger a flush to the index. */ private double ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; - /** Determines the largest number of documents ever merged by addDocument(). - * Small values (e.g., less than 10,000) are best for interactive indexing, - * as this limits the length of pauses while indexing to a few seconds. - * Larger values are best for batched indexing and speedier searches. - * - *

The default value is {@link #DEFAULT_MAX_MERGE_DOCS}. - - */ - private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS; - /** If non-null, information about merges will be printed to this. */ private PrintStream infoStream = null; - private static PrintStream defaultInfoStream = null; - /** Merges all segments together into a single segment, - * optimizing an index for search. + /** + * Requests an "optimize" operation on an index, priming the index + * for the fastest available search. Traditionally this has meant + * merging all segments into a single segment as is done in the + * default merge policy, but individaul merge policies may implement + * optimize in different ways. * + * @see LogDocMergePolicy#optimize(SegmentInfos) + * *

 * <p>It is recommended that this method be called upon completion of indexing.  In
 * environments with frequent updates, optimize is best done during low volume times, if at all.</p>
 *
 * <p>See http://www.gossamer-threads.com/lists/lucene/java-dev/47895 for more discussion.</p>
 *
- * <p>Note that this requires substantial temporary free
+ * <p>Note that this can require substantial temporary free
 * space in the Directory (see LUCENE-764
 * for details):</p>
@@ -1310,7 +1461,7 @@
 *
 * <p>The actual temporary usage could be much less than
 * these figures (it depends on many factors).</p>
 *
- * <p>Once the optimize completes, the total size of the
+ * <p>In general, once the optimize completes, the total size of the
 * index will be less than the size of the starting index.
 * It could be quite a bit smaller (if there were many
 * pending deletes) or just slightly smaller.</p>
 *

@@ -1327,21 +1478,48 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public synchronized void optimize() throws CorruptIndexException, IOException { + public void optimize() throws CorruptIndexException, IOException { ensureOpen(); flush(); - while (segmentInfos.size() > 1 || - (segmentInfos.size() == 1 && - (SegmentReader.hasDeletions(segmentInfos.info(0)) || - SegmentReader.hasSeparateNorms(segmentInfos.info(0)) || - segmentInfos.info(0).dir != directory || - (useCompoundFile && - !segmentInfos.info(0).getUseCompoundFile())))) { - int minSegment = segmentInfos.size() - mergeFactor; - mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size()); + + // Currently hardwired to 1, but once we add method to + // IndexWriter to allow "optimizing to <= N segments" + // then we will change this. + final int maxSegmentCount = 1; + + // Repeat until merge policy stops returning merges: + while(true) { + MergePolicy.MergeSpecification spec; + synchronized(this) { + spec = mergePolicy.findMergesForOptimize(segmentInfos, this, maxSegmentCount); + } + if (spec != null) + mergeScheduler.merge(this, spec); + else + break; } } + /** + * Expert: asks the mergePolicy whether any merges are + * necessary now and if so, runs the requested merges and + * then iterate (test again if merges are needed) until no + * more merges are returned by the mergePolicy. + * + * Explicit calls to maybeMerge() are usually not + * necessary. The most common case is when merge policy + * parameters have changed. + */ + + private final void maybeMerge() throws CorruptIndexException, IOException { + final MergePolicy.MergeSpecification spec; + synchronized(this) { + spec = mergePolicy.findMerges(segmentInfos, this); + } + if (spec != null) + mergeScheduler.merge(this, spec); + } + /* * Begin a transaction. During a transaction, any segment * merges that happen (or ram segments flushed) will not @@ -1455,18 +1633,19 @@ segmentInfos.clear(); segmentInfos.addAll(rollbackSegmentInfos); + // discard any buffered delete terms so they aren't applied in close() + bufferedDeleteTerms.clear(); + numBufferedDeleteTerms = 0; + docWriter.abort(); // Ask deleter to locate unreferenced files & remove // them: deleter.checkpoint(segmentInfos, false); + deleter.refresh(); - bufferedDeleteTerms.clear(); - numBufferedDeleteTerms = 0; - commitPending = false; - docWriter.abort(); close(); } else { @@ -1481,7 +1660,7 @@ * commit the change immediately. Else, we mark * commitPending. 
*/ - private void checkpoint() throws IOException { + private synchronized void checkpoint() throws IOException { if (autoCommit) { segmentInfos.write(directory); if (infoStream != null) @@ -1541,7 +1720,7 @@ throws CorruptIndexException, IOException { ensureOpen(); - optimize(); // start with zero or 1 seg + flush(); int start = segmentInfos.size(); @@ -1558,15 +1737,8 @@ } } - // merge newly added segments in log(n) passes - while (segmentInfos.size() > start+mergeFactor) { - for (int base = start; base < segmentInfos.size(); base++) { - int end = Math.min(segmentInfos.size(), base+mergeFactor); - if (end-base > 1) { - mergeSegments(base, end); - } - } - } + optimize(); + success = true; } finally { if (success) { @@ -1575,8 +1747,6 @@ rollbackTransaction(); } } - - optimize(); // final cleanup } /** @@ -1598,40 +1768,10 @@ */ public synchronized void addIndexesNoOptimize(Directory[] dirs) throws CorruptIndexException, IOException { - // Adding indexes can be viewed as adding a sequence of segments S to - // a sequence of segments T. Segments in T follow the invariants but - // segments in S may not since they could come from multiple indexes. - // Here is the merge algorithm for addIndexesNoOptimize(): - // - // 1 Flush ram. - // 2 Consider a combined sequence with segments from T followed - // by segments from S (same as current addIndexes(Directory[])). - // 3 Assume the highest level for segments in S is h. Call - // maybeMergeSegments(), but instead of starting w/ lowerBound = -1 - // and upperBound = maxBufferedDocs, start w/ lowerBound = -1 and - // upperBound = upperBound of level h. After this, the invariants - // are guaranteed except for the last < M segments whose levels <= h. - // 4 If the invariants hold for the last < M segments whose levels <= h, - // if some of those < M segments are from S (not merged in step 3), - // properly copy them over*, otherwise done. - // Otherwise, simply merge those segments. If the merge results in - // a segment of level <= h, done. Otherwise, it's of level h+1 and call - // maybeMergeSegments() starting w/ upperBound = upperBound of level h+1. - // - // * Ideally, we want to simply copy a segment. However, directory does - // not support copy yet. In addition, source may use compound file or not - // and target may use compound file or not. So we use mergeSegments() to - // copy a segment, which may cause doc count to change because deleted - // docs are garbage collected. 
- // 1 flush ram - ensureOpen(); flush(); - // 2 copy segment infos and find the highest level from dirs - int startUpperBound = docWriter.getMaxBufferedDocs(); - /* new merge policy if (startUpperBound == 0) startUpperBound = 10; @@ -1654,64 +1794,20 @@ for (int j = 0; j < sis.size(); j++) { SegmentInfo info = sis.info(j); segmentInfos.addElement(info); // add each info - - while (startUpperBound < info.docCount) { - startUpperBound *= mergeFactor; // find the highest level from dirs - if (startUpperBound > maxMergeDocs) { - // upper bound cannot exceed maxMergeDocs - throw new IllegalArgumentException("Upper bound cannot exceed maxMergeDocs"); - } - } } } - // 3 maybe merge segments starting from the highest level from dirs - maybeMergeSegments(startUpperBound); + maybeMerge(); - // get the tail segments whose levels <= h - int segmentCount = segmentInfos.size(); - int numTailSegments = 0; - while (numTailSegments < segmentCount - && startUpperBound >= segmentInfos.info(segmentCount - 1 - numTailSegments).docCount) { - numTailSegments++; - } - if (numTailSegments == 0) { - success = true; - return; - } + // If after merging there remain segments in the index + // that are in a different directory, just copy these + // over into our index. This is necessary (before + // finishing the transaction) to avoid leaving the + // index in an unusable (inconsistent) state. + copyExternalSegments(); - // 4 make sure invariants hold for the tail segments whose levels <= h - if (checkNonDecreasingLevels(segmentCount - numTailSegments)) { - // identify the segments from S to be copied (not merged in 3) - int numSegmentsToCopy = 0; - while (numSegmentsToCopy < segmentCount - && directory != segmentInfos.info(segmentCount - 1 - numSegmentsToCopy).dir) { - numSegmentsToCopy++; - } - if (numSegmentsToCopy == 0) { - success = true; - return; - } + success = true; - // copy those segments from S - for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) { - mergeSegments(i, i + 1); - } - if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) { - success = true; - return; - } - } - - // invariants do not hold, simply merge those segments - mergeSegments(segmentCount - numTailSegments, segmentCount); - - // maybe merge segments again if necessary - if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) { - maybeMergeSegments(startUpperBound * mergeFactor); - } - - success = true; } finally { if (success) { commitTransaction(); @@ -1721,6 +1817,30 @@ } } + /* If any of our segments are using a directory != ours + * then copy them over */ + private synchronized void copyExternalSegments() throws CorruptIndexException, IOException { + final int numSegments = segmentInfos.size(); + for(int i=0;iAfter this completes, the index is optimized.

 *
 * <p>The provided IndexReaders are not closed.</p>

@@ -1785,7 +1905,7 @@ } } - if (useCompoundFile) { + if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) { boolean success = false; @@ -1804,40 +1924,6 @@ } } - // Overview of merge policy: - // - // A flush is triggered either by close() or by the number of ram segments - // reaching maxBufferedDocs. After a disk segment is created by the flush, - // further merges may be triggered. - // - // LowerBound and upperBound set the limits on the doc count of a segment - // which may be merged. Initially, lowerBound is set to 0 and upperBound - // to maxBufferedDocs. Starting from the rightmost* segment whose doc count - // > lowerBound and <= upperBound, count the number of consecutive segments - // whose doc count <= upperBound. - // - // Case 1: number of worthy segments < mergeFactor, no merge, done. - // Case 2: number of worthy segments == mergeFactor, merge these segments. - // If the doc count of the merged segment <= upperBound, done. - // Otherwise, set lowerBound to upperBound, and multiply upperBound - // by mergeFactor, go through the process again. - // Case 3: number of worthy segments > mergeFactor (in the case mergeFactor - // M changes), merge the leftmost* M segments. If the doc count of - // the merged segment <= upperBound, consider the merged segment for - // further merges on this same level. Merge the now leftmost* M - // segments, and so on, until number of worthy segments < mergeFactor. - // If the doc count of all the merged segments <= upperBound, done. - // Otherwise, set lowerBound to upperBound, and multiply upperBound - // by mergeFactor, go through the process again. - // Note that case 2 can be considerd as a special case of case 3. - // - // This merge policy guarantees two invariants if M does not change and - // segment doc count is not reaching maxMergeDocs: - // B for maxBufferedDocs, f(n) defined as ceil(log_M(ceil(n/B))) - // 1: If i (left*) and i+1 (right*) are two consecutive segments of doc - // counts x and y, then f(x) >= f(y). - // 2: The number of committed segments on the same level (f(n)) <= M. - // This is called after pending added and deleted // documents have been flushed to the Directory but before // the change is committed (new segments_N file written). @@ -1850,12 +1936,16 @@ * buffered added documents or buffered deleted terms are * large enough. 
*/ - protected final synchronized void maybeFlush() throws CorruptIndexException, IOException { + protected final void maybeFlush() throws CorruptIndexException, IOException { // We only check for flush due to number of buffered // delete terms, because triggering of a flush due to // too many added documents is handled by // DocumentsWriter - if (numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending()) + boolean doFlush; + synchronized(this) { + doFlush = numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending(); + } + if (doFlush) flush(true, false); } @@ -1867,7 +1957,7 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public final synchronized void flush() throws CorruptIndexException, IOException { + public final void flush() throws CorruptIndexException, IOException { flush(true, false); } @@ -1879,9 +1969,14 @@ * @param flushDocStores if false we are allowed to keep * doc stores open to share with the next segment */ - protected final synchronized void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException { + protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException { ensureOpen(); + if (doFlush(flushDocStores) && triggerMerge) + maybeMerge(); + } + private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException { + // Make sure no threads are actively adding a document docWriter.pauseAllThreads(); @@ -1911,10 +2006,14 @@ boolean flushDeletes = bufferedDeleteTerms.size() > 0; if (infoStream != null) - infoStream.println(" flush: flushDocs=" + flushDocs + + infoStream.println(" flush: segment=" + docWriter.getSegment() + + " docStoreSegment=" + docWriter.getDocStoreSegment() + + " docStoreOffset=" + docWriter.getDocStoreOffset() + + " flushDocs=" + flushDocs + " flushDeletes=" + flushDeletes + " flushDocStores=" + flushDocStores + - " numDocs=" + numDocs); + " numDocs=" + numDocs + + " numBufDelTerms=" + numBufferedDeleteTerms); int docStoreOffset = docWriter.getDocStoreOffset(); boolean docStoreIsCompoundFile = false; @@ -1927,13 +2026,15 @@ if (infoStream != null) infoStream.println(" flush shared docStore segment " + docStoreSegment); - flushDocStores(); + docStoreIsCompoundFile = flushDocStores(); flushDocStores = false; - docStoreIsCompoundFile = useCompoundFile; } String segment = docWriter.getSegment(); + // If we are flushing docs, segment must not be null: + assert segment != null || !flushDocs; + if (flushDocs || flushDeletes) { SegmentInfos rollback = null; @@ -1985,7 +2086,19 @@ success = true; } finally { if (!success) { + if (flushDeletes) { + + // Carefully check if any partial .del files + // should be removed: + final int size = rollback.size(); + for(int i=0;i= 0) { - SegmentInfo si = segmentInfos.info(minSegment); + final int numSegments = segmentInfos.size(); + + final int numSegmentsToMerge = merge.segments.size(); + for(int i=0;i lowerBound && si.docCount <= upperBound) { - // start from the rightmost* segment whose doc count is in bounds - maxSegment = minSegment; - } else if (si.docCount > upperBound) { - // until the segment whose doc count exceeds upperBound - break; - } + if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) { + if (segmentInfos.indexOf(info) == -1) + throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index"); + 
else + throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle"); } + } - minSegment++; - maxSegment++; - int numSegments = maxSegment - minSegment; + return first; + } - if (numSegments < mergeFactor) { - break; - } else { - boolean exceedsUpperLimit = false; + /* FIXME if we want to support non-contiguous segment merges */ + synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException { - // number of merge-worthy segments may exceed mergeFactor when - // mergeFactor and/or maxBufferedDocs change(s) - while (numSegments >= mergeFactor) { - // merge the leftmost* mergeFactor segments + // If merge was explicitly aborted, or, if abort() or + // rollbackTransaction() had been called since our merge + // started, we abort this merge + if (merge.isAborted() || deleter.getFullRefreshCount() != merge.startFullRefreshCount) { - int docCount = mergeSegments(minSegment, minSegment + mergeFactor); - numSegments -= mergeFactor; + if (infoStream != null) { + if (merge.isAborted()) + infoStream.println("commitMerge: skipping merge " + merge.segString(directory) + ": it was aborted"); + if (deleter.getFullRefreshCount() != merge.startFullRefreshCount) + infoStream.println("commitMerge: skipping merge " + merge.segString(directory) + ": deleter.refresh() was called during this merge"); + } - if (docCount > upperBound) { - // continue to merge the rest of the worthy segments on this level - minSegment++; - exceedsUpperLimit = true; - } else { - // if the merged segment does not exceed upperBound, consider - // this segment for further merges on this same level - numSegments++; + assert merge.increfDone; + decrefMergeSegments(merge); + deleter.refresh(merge.info.name); + return false; + } + + boolean success = false; + + int start; + + try { + SegmentInfos sourceSegmentsClone = merge.segmentsClone; + SegmentInfos sourceSegments = merge.segments; + final int numSegments = segmentInfos.size(); + + start = ensureContiguousMerge(merge); + if (infoStream != null) + infoStream.println("commitMerge " + merge.segString(directory)); + + // Carefully merge deletes that occurred after we + // started merging: + + BitVector deletes = null; + int docUpto = 0; + + final int numSegmentsToMerge = sourceSegments.size(); + for(int i=0;i minSegment; i--) // remove old infos & add new - segmentInfos.remove(i); + merger = new SegmentMerger(this, mergedName); - segmentInfos.set(minSegment, newSegment); + // This is try/finally to make sure merger's readers are + // closed: - checkpoint(); + boolean success = false; - success = true; + try { + int totDocCount = 0; + for (int i = 0; i < numSegments; i++) { + SegmentInfo si = sourceSegmentsClone.info(i); + if (infoStream != null) + infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); + IndexReader reader = SegmentReader.get(si, MERGE_READ_BUFFER_SIZE, merge.mergeDocStores); // no need to set deleter (yet) + merger.add(reader); + if (infoStream != null) + totDocCount += reader.numDocs(); + } + if (infoStream != null) { + infoStream.println(" into "+mergedName+" ("+totDocCount+" docs)"); + } - } finally { - if (!success) { - if (rollback != null) { - // Rollback the individual SegmentInfo - // instances, but keep original SegmentInfos - // instance (so we don't try to write again the - // same segments_N file -- write once): - segmentInfos.clear(); - segmentInfos.addAll(rollback); - } + mergedDocCount = merge.info.docCount = 
merger.merge(merge.mergeDocStores); - // Delete any partially created and now unreferenced files: - deleter.refresh(); - } - } + if (infoStream != null) + assert mergedDocCount == totDocCount; + + success = true; + } finally { - // close readers before we attempt to delete now-obsolete segments + // close readers before we attempt to delete + // now-obsolete segments if (merger != null) { merger.closeReaders(); } + if (!success) { + synchronized(this) { + deleter.refresh(mergedName); + } + } } - // Give deleter a chance to remove files now. - deleter.checkpoint(segmentInfos, autoCommit); + if (!commitMerge(merge)) + // commitMerge will return false if this merge was aborted + return 0; - if (useCompoundFile) { + if (merge.useCompoundFile) { + + success = false; + boolean skip = false; + final String compoundFileName = mergedName + "." + IndexFileNames.COMPOUND_FILE_EXTENSION; - boolean success = false; - try { + try { + merger.createCompoundFile(compoundFileName); + success = true; + } catch (IOException ioe) { + synchronized(this) { + if (segmentInfos.indexOf(merge.info) == -1) { + // If another merge kicked in and merged our + // new segment away while we were trying to + // build the compound file, we can hit a + // FileNotFoundException and possibly + // IOException over NFS. We can tell this has + // happened because our SegmentInfo is no + // longer in the segments; if this has + // happened it is safe to ignore the exception + // & skip finishing/committing our compound + // file creating. + skip = true; + } else + throw ioe; + } + } + } finally { + if (!success) { + synchronized(this) { + deleter.deleteFile(compoundFileName); + } + } + } - merger.createCompoundFile(mergedName + ".cfs"); - newSegment.setUseCompoundFile(true); - checkpoint(); - success = true; + if (!skip) { - } finally { - if (!success) { - // Must rollback: - newSegment.setUseCompoundFile(false); - deleter.refresh(); + synchronized(this) { + if (skip || segmentInfos.indexOf(merge.info) == -1 || merge.isAborted()) { + // Our segment (committed in non-compound + // format) got merged away while we were + // building the compound format. + deleter.deleteFile(compoundFileName); + } else { + success = false; + try { + merge.info.setUseCompoundFile(true); + checkpoint(); + success = true; + } finally { + if (!success) { + // Must rollback: + merge.info.setUseCompoundFile(false); + deletePartialSegmentsFile(); + deleter.deleteFile(compoundFileName); + } + } + + // Give deleter a chance to remove files now. + deleter.checkpoint(segmentInfos, autoCommit); + } } } - - // Give deleter a chance to remove files now. - deleter.checkpoint(segmentInfos, autoCommit); } return mergedDocCount; } + private void deletePartialSegmentsFile() throws IOException { + if (segmentInfos.getLastGeneration() != segmentInfos.getGeneration()) { + String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, + "", + segmentInfos.getGeneration()); + deleter.deleteFile(segmentFileName); + } + } + // Called during flush to apply any buffered deletes. 
If
  // flushedNewSegment is true then a new segment was just
  // created and flushed from the ram segments, so we will
@@ -2385,29 +2790,6 @@
     }
   }
-  private final boolean checkNonDecreasingLevels(int start) {
-    int lowerBound = -1;
-    int upperBound = docWriter.getMaxBufferedDocs();
-    /* new merge policy
-    if (upperBound == 0)
-      upperBound = 10;
-    */
-    for (int i = segmentInfos.size() - 1; i >= start; i--) {
-      int docCount = segmentInfos.info(i).docCount;
-      if (docCount <= lowerBound) {
-        return false;
-      }
-      while (docCount > upperBound) {
-        lowerBound = upperBound;
-        upperBound *= mergeFactor;
-      }
-    }
-    return true;
-  }
-
   // For test purposes.
   final synchronized int getBufferedDeleteTermsSize() {
     return bufferedDeleteTerms.size();
   }
@@ -2484,7 +2866,24 @@
     Iterator iter = deleteTerms.entrySet().iterator();
     while (iter.hasNext()) {
       Entry entry = (Entry) iter.next();
-      reader.deleteDocuments((Term) entry.getKey());
+      final int count = reader.deleteDocuments((Term) entry.getKey());
     }
   }
+
+  // utility routines for tests
+  SegmentInfo newestSegment() {
+    return segmentInfos.info(segmentInfos.size()-1);
+  }
+
+  public synchronized String segString() {
+    StringBuffer buffer = new StringBuffer();
+    for(int i = 0; i < segmentInfos.size(); i++) {
+      if (i > 0) {
+        buffer.append(' ');
+      }
+      buffer.append(segmentInfos.info(i).segString(directory));
+    }
+
+    return buffer.toString();
+  }
 }
Index: src/java/org/apache/lucene/index/IndexFileDeleter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexFileDeleter.java	(revision 573633)
+++ src/java/org/apache/lucene/index/IndexFileDeleter.java	(working copy)
@@ -100,12 +100,18 @@
   private IndexDeletionPolicy policy;
   private DocumentsWriter docWriter;
+  private int fullRefreshCount; // tracks how many times refresh(null) has been called
+
+  int getFullRefreshCount() {
+    return fullRefreshCount;
+  }
+
   void setInfoStream(PrintStream infoStream) {
     this.infoStream = infoStream;
   }
   private void message(String message) {
-    infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message);
+    infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message);
   }
   /**
@@ -275,25 +281,61 @@
    * Writer calls this when it has hit an error and had to
    * roll back, to tell us that there may now be
    * unreferenced files in the filesystem.  So we re-list
-   * the filesystem and delete such files:
+   * the filesystem and delete such files.  If segmentName
+   * is non-null, we will only delete files corresponding to
+   * that segment.
   */
-  public void refresh() throws IOException {
+  public void refresh(String segmentName) throws IOException {
+    if (segmentName == null)
+      fullRefreshCount++;
     String[] files = directory.list();
     if (files == null)
       throw new IOException("cannot read directory " + directory + ": list() returned null");
     IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
+    String segmentPrefix1;
+    String segmentPrefix2;
+    if (segmentName != null) {
+      segmentPrefix1 = segmentName + ".";
+      segmentPrefix2 = segmentName + "_";
+    } else {
+      segmentPrefix1 = null;
+      segmentPrefix2 = null;
+    }
+    for(int i=0;i 0; return --count;
    }
  }
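The per-segment cleanup that the new refresh(String segmentName) performs relies on Lucene's file-naming convention: every file belonging to a segment starts either with the segment name followed by "." (e.g. _8.cfs, _8.fnm) or followed by "_" (e.g. _8_1.del). A minimal sketch of that prefix test follows; the class and method names are illustrative only, and the real loop additionally runs candidates through IndexFileNameFilter and skips files that are still referenced:

    // Sketch only: mirrors the segmentPrefix1/segmentPrefix2 check built inside
    // refresh(String segmentName); SegmentFilePrefixSketch is not part of the patch.
    class SegmentFilePrefixSketch {
      static boolean belongsToSegment(String fileName, String segmentName) {
        return fileName.startsWith(segmentName + ".")   // e.g. _8.cfs, _8.fnm
            || fileName.startsWith(segmentName + "_");  // e.g. _8_1.del
      }
      // belongsToSegment("_8_1.del", "_8") -> true
      // belongsToSegment("_9.cfs", "_8")   -> false
    }

Matching, unreferenced files are then handed to deleteFile(), which is what lets a failed merge clean up its partial output (see the deleter.refresh(mergedName) calls in the merge path above) without disturbing unrelated segments.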
Index: src/java/org/apache/lucene/index/MergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/MergeScheduler.java	(revision 0)
+++ src/java/org/apache/lucene/index/MergeScheduler.java	(revision 0)
@@ -0,0 +1,36 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/** Expert: {@link IndexWriter} uses an instance
+ *  implementing this interface to execute the merges
+ *  selected by a {@link MergePolicy}.  The default
+ *  MergeScheduler is {@link SerialMergeScheduler}. */
+
+public interface MergeScheduler {
+
+  /** Run the requested merges from spec. */
+  void merge(IndexWriter writer, MergePolicy.MergeSpecification spec)
+    throws CorruptIndexException, IOException;
+
+  /** Close this MergeScheduler. */
+  void close()
+    throws CorruptIndexException, IOException;
+}

Property changes on: src/java/org/apache/lucene/index/MergeScheduler.java
___________________________________________________________________
Name: svn:eol-style
   + native

Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java	(revision 573633)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java	(working copy)
@@ -21,6 +21,7 @@
 import org.apache.lucene.benchmark.byTask.PerfRunData;
 import org.apache.lucene.benchmark.byTask.utils.Config;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.store.Directory;
 import java.io.IOException;
@@ -53,6 +54,7 @@
     boolean autoCommit = config.get("autocommit", OpenIndexTask.DEFAULT_AUTO_COMMIT);
     IndexWriter iw = new IndexWriter(dir, autoCommit, analyzer, true);
+    iw.setMergeScheduler(new ConcurrentMergeScheduler());
     iw.setUseCompoundFile(cmpnd);
     iw.setMergeFactor(mrgf);

Index: contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java
===================================================================
--- contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java	(revision 573633)
+++ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/LineDocMaker.java	(working copy)
@@ -24,6 +24,7 @@
 import org.apache.lucene.document.Field;
 import java.io.BufferedReader;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.FileInputStream;
 import java.io.InputStreamReader;
@@ -81,9 +82,9 @@
     public Document setFields(String line) {
       // title date body
-      int spot = line.indexOf(SEP);
+      final int spot = line.indexOf(SEP);
       titleField.setValue(line.substring(0, spot));
-      int spot2 = line.indexOf(SEP, 1+spot);
+      final int spot2 = line.indexOf(SEP, 1+spot);
       dateField.setValue(line.substring(1+spot, spot2));
       bodyField.setValue(line.substring(1+spot2, line.length()));
       return doc;
@@ -111,12 +112,14 @@
     return ds;
   }
+  private int lineCount = 0;
   public Document makeDocument() throws Exception {
     String line;
     synchronized(this) {
       while(true) {
         line = fileIn.readLine();
+        lineCount++;
         if (line == null) {
           if (!forever)
             throw new NoMoreDataException();
@@ -149,8 +152,10 @@
     try {
       if (fileIn != null)
         fileIn.close();
-      fileIS = new FileInputStream(fileName);
-      fileIn = new BufferedReader(new InputStreamReader(fileIS,"UTF-8"), READER_BUFFER_BYTES);
+      // nocommit
+      //fileIS = new FileInputStream(fileName);
+      //fileIn = new BufferedReader(new InputStreamReader(fileIS,"UTF-8"), READER_BUFFER_BYTES);
+      fileIn = new BufferedReader(new FileReader(fileName), READER_BUFFER_BYTES);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
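The CreateIndexTask change above is also the whole client-side story for the new merge scheduling: build the IndexWriter as before, then install a scheduler before feeding documents. A self-contained sketch of that wiring follows; the class name, RAMDirectory, field names, document count and tuning values are illustrative placeholders, and only setMergeScheduler/ConcurrentMergeScheduler come from this change:

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class MergeSchedulerExample {
      public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        // autoCommit=true, create=true, mirroring CreateIndexTask
        IndexWriter writer = new IndexWriter(dir, true, new SimpleAnalyzer(), true);
        // Hand merge execution to background threads instead of the indexing thread
        writer.setMergeScheduler(new ConcurrentMergeScheduler());
        writer.setMaxBufferedDocs(2);   // force frequent flushes so merges actually happen
        writer.setMergeFactor(10);
        for(int i=0;i<50;i++) {
          Document doc = new Document();
          doc.add(new Field("id", Integer.toString(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
          doc.add(new Field("contents", "document number " + i, Field.Store.NO, Field.Index.TOKENIZED));
          writer.addDocument(doc);
        }
        // commit buffered docs and close the writer
        writer.close();
      }
    }

The default, per the MergeScheduler javadoc, is SerialMergeScheduler, in which case the addDocument() calls themselves pay for the merges; ConcurrentMergeScheduler instead runs the selected merges on background threads, which is why CreateIndexTask enables it for the benchmark.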