Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 574266) +++ CHANGES.txt (working copy) @@ -28,6 +28,30 @@ termText instead of String. This gives faster tokenization performance (~10-15%). (Mike McCandless) + 5. LUCENE-847: Factor MergePolicy and MergeScheduler out of + IndexWriter. This enables users to change how & when segments are + selected for merging or optimizing (by creating a MergePolicy) as + well as how & when the selected merges are actually performed (by + creating a MergeScheduler). Added new public optimize(boolean + doWait), close(boolean doWait) and maybeMerge() methods to + IndexWriter. (Steven Parkes via Mike McCandless). + + 6. LUCENE-870: Add a ConcurrentMergeScheduler which executes merges + using background threads, up until a max number of threads. Once + there are too many merges, they are performed serially. This + change also improves overall concurrency of IndexWriter for + applications that use multiple threads. For example, optimize is + no longer synchronized (and can run in the background), and + multiple threads from the application can be running merges at + once even when not using ConcurrentMergeScheduler (Steven Parkes + via Mike McCandless). + + 7. LUCENE-845: Added a LogByteSizeMergePolicy which selects merges by + roughly equal size (in bytes) of the segments in the Directory. + This is a better match when you are flushing by RAM usage instead + of document count in IndexWriter or when documents vary + substantially in size (Mike McCandless). + Bug fixes 1. LUCENE-933: QueryParser fixed to not produce empty sub Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 574266) +++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy) @@ -55,7 +55,7 @@ writeBytes(singleByte, 0, 1); } - public void writeBytes(byte[] b, int offset, int len) throws IOException { + public void writeBytes(byte[] b, int offset, int len) throws IOException { long freeSpace = dir.maxSize - dir.sizeInBytes(); long realUsage = 0; Index: src/test/org/apache/lucene/store/MockRAMDirectory.java =================================================================== --- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 574266) +++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy) @@ -195,7 +195,7 @@ * RAMOutputStream.BUFFER_SIZE (now 1024) bytes. */ - final long getRecomputedActualSizeInBytes() { + final synchronized long getRecomputedActualSizeInBytes() { long size = 0; Iterator it = fileMap.values().iterator(); while (it.hasNext()) Index: src/test/org/apache/lucene/index/TestThreadedOptimize.java =================================================================== --- src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0) +++ src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0) @@ -0,0 +1,160 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.analysis.SimpleAnalyzer; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.util._TestUtil; +import org.apache.lucene.util.English; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.io.File; + +public class TestThreadedOptimize extends TestCase { + + private static final Analyzer ANALYZER = new SimpleAnalyzer(); + + private final static int NUM_THREADS = 3; + //private final static int NUM_THREADS = 5; + + private final static int NUM_ITER = 2; + //private final static int NUM_ITER = 10; + + private final static int NUM_ITER2 = 2; + //private final static int NUM_ITER2 = 5; + + private boolean failed; + + private void setFailed() { + failed = true; + } + + public void runTest(Directory directory, boolean autoCommit, MergeScheduler merger) throws Exception { + + IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true); + writer.setMaxBufferedDocs(2); + if (merger != null) + writer.setMergeScheduler(merger); + + for(int iter=0;iter 0) { @@ -1107,12 +1151,14 @@ RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); writer.setMaxBufferedDocs(10); + int lastNumFile = dir.list().length; long lastGen = -1; for(int j=1;j<52;j++) { Document doc = new Document(); doc.add(new Field("field", "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED)); writer.addDocument(doc); + _TestUtil.syncConcurrentMerges(writer); long gen = SegmentInfos.generationFromSegmentsFileName(SegmentInfos.getCurrentSegmentFileName(dir.list())); if (j == 1) lastGen = gen; @@ -1153,7 +1199,6 @@ public void testDiverseDocs() throws IOException { RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); - // writer.setInfoStream(System.out); long t0 = System.currentTimeMillis(); writer.setRAMBufferSizeMB(0.5); Random rand = new Random(31415); @@ -1348,6 +1393,48 @@ assertEquals(2, reader.numDocs()); } + // Test calling optimize(false) whereby optimize is kicked + // off but we don't wait for it to finish (but + // writer.close()) does wait + public void testBackgroundOptimize() throws IOException { + + Directory dir = new MockRAMDirectory(); + for(int pass=0;pass<2;pass++) { + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMergeScheduler(new ConcurrentMergeScheduler()); + Document doc = new Document(); + doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS)); + writer.setMaxBufferedDocs(2); + writer.setMergeFactor(101); + for(int i=0;i<200;i++) + writer.addDocument(doc); + writer.optimize(false); + + if (0 == pass) { + writer.close(); + IndexReader reader = IndexReader.open(dir); + assertTrue(reader.isOptimized()); + reader.close(); + } else { + // Get another segment to flush so we can 
verify it is + // NOT included in the optimization + writer.addDocument(doc); + writer.addDocument(doc); + writer.close(); + + IndexReader reader = IndexReader.open(dir); + assertTrue(!reader.isOptimized()); + reader.close(); + + SegmentInfos infos = new SegmentInfos(); + infos.read(dir); + assertEquals(2, infos.size()); + } + } + + dir.close(); + } + private void rmDir(File dir) { File[] files = dir.listFiles(); if (files != null) { Index: src/test/org/apache/lucene/index/TestStressIndexing.java =================================================================== --- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 574266) +++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy) @@ -32,105 +32,119 @@ public class TestStressIndexing extends TestCase { private static final Analyzer ANALYZER = new SimpleAnalyzer(); private static final Random RANDOM = new Random(); - private static Searcher SEARCHER; - private static int RUN_TIME_SEC = 15; - - private static class IndexerThread extends Thread { - IndexWriter modifier; - int nextID; - public int count; + private static abstract class TimedThread extends Thread { boolean failed; + int count; + private static int RUN_TIME_SEC = 6; + private TimedThread[] allThreads; - public IndexerThread(IndexWriter modifier) { - this.modifier = modifier; + abstract public void doWork() throws Throwable; + + TimedThread(TimedThread[] threads) { + this.allThreads = threads; } public void run() { - long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC; - try { - while(true) { + final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC; - if (System.currentTimeMillis() > stopTime) { - break; - } + count = 0; - // Add 10 docs: - for(int j=0; j<10; j++) { - Document d = new Document(); - int n = RANDOM.nextInt(); - d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED)); - d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED)); - modifier.addDocument(d); - } - - // Delete 5 docs: - int deleteID = nextID; - for(int j=0; j<5; j++) { - modifier.deleteDocuments(new Term("id", ""+deleteID)); - deleteID -= 2; - } - + try { + while(System.currentTimeMillis() < stopTime && !anyErrors()) { + doWork(); count++; } - - } catch (Exception e) { - System.out.println(e.toString()); - e.printStackTrace(); + } catch (Throwable e) { + e.printStackTrace(System.out); failed = true; } } + + private boolean anyErrors() { + for(int i=0;i stopTime) { - break; - } - } - } catch (Exception e) { - System.out.println(e.toString()); - e.printStackTrace(); - failed = true; + public void doWork() throws Exception { + // Add 10 docs: + for(int j=0; j<10; j++) { + Document d = new Document(); + int n = RANDOM.nextInt(); + d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED)); + d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED)); + writer.addDocument(d); } + + // Delete 5 docs: + int deleteID = nextID-1; + for(int j=0; j<5; j++) { + writer.deleteDocuments(new Term("id", ""+deleteID)); + deleteID -= 2; + } } } + private static class SearcherThread extends TimedThread { + private Directory directory; + + public SearcherThread(Directory directory, TimedThread[] threads) { + super(threads); + this.directory = directory; + } + + public void doWork() throws Throwable { + for (int i=0; i<100; i++) + (new IndexSearcher(directory)).close(); + count += 100; + } + } + /* Run one indexer and 2 
searchers against single index as stress test. */ - public void runStressTest(Directory directory) throws Exception { - IndexWriter modifier = new IndexWriter(directory, ANALYZER, true); + public void runStressTest(Directory directory, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception { + IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true); + modifier.setMaxBufferedDocs(10); + + TimedThread[] threads = new TimedThread[4]; + + if (mergeScheduler != null) + modifier.setMergeScheduler(mergeScheduler); + // One modifier that writes 10 docs then removes 5, over // and over: - IndexerThread indexerThread = new IndexerThread(modifier); + IndexerThread indexerThread = new IndexerThread(modifier, threads); + threads[0] = indexerThread; indexerThread.start(); - IndexerThread indexerThread2 = new IndexerThread(modifier); + IndexerThread indexerThread2 = new IndexerThread(modifier, threads); + threads[2] = indexerThread2; indexerThread2.start(); - // Two searchers that constantly just re-instantiate the searcher: - SearcherThread searcherThread1 = new SearcherThread(directory); + // Two searchers that constantly just re-instantiate the + // searcher: + SearcherThread searcherThread1 = new SearcherThread(directory, threads); + threads[3] = searcherThread1; searcherThread1.start(); - SearcherThread searcherThread2 = new SearcherThread(directory); + SearcherThread searcherThread2 = new SearcherThread(directory, threads); + threads[3] = searcherThread2; searcherThread2.start(); indexerThread.join(); @@ -144,6 +158,7 @@ assertTrue("hit unexpected exception in indexer2", !indexerThread2.failed); assertTrue("hit unexpected exception in search1", !searcherThread1.failed); assertTrue("hit unexpected exception in search2", !searcherThread2.failed); + //System.out.println(" Writer: " + indexerThread.count + " iterations"); //System.out.println("Searcher 1: " + searcherThread1.count + " searchers created"); //System.out.println("Searcher 2: " + searcherThread2.count + " searchers created"); @@ -155,25 +170,38 @@ */ public void testStressIndexAndSearching() throws Exception { - // First in a RAM directory: + // RAMDir Directory directory = new MockRAMDirectory(); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); - // Second in an FSDirectory: + // FSDir String tempDir = System.getProperty("java.io.tmpdir"); File dirPath = new File(tempDir, "lucene.test.stress"); directory = FSDirectory.getDirectory(dirPath); - runStressTest(directory); + runStressTest(directory, true, null); directory.close(); - rmDir(dirPath); - } - private void rmDir(File dir) { - File[] files = dir.listFiles(); - for (int i = 0; i < files.length; i++) { - files[i].delete(); - } - dir.delete(); + // With ConcurrentMergeScheduler, in RAMDir + directory = new MockRAMDirectory(); + runStressTest(directory, true, new ConcurrentMergeScheduler()); + directory.close(); + + // With ConcurrentMergeScheduler, in FSDir + directory = FSDirectory.getDirectory(dirPath); + runStressTest(directory, true, new ConcurrentMergeScheduler()); + directory.close(); + + // With ConcurrentMergeScheduler and autoCommit=false, in RAMDir + directory = new MockRAMDirectory(); + runStressTest(directory, false, new ConcurrentMergeScheduler()); + directory.close(); + + // With ConcurrentMergeScheduler and autoCommit=false, in FSDir + directory = FSDirectory.getDirectory(dirPath); + runStressTest(directory, false, new ConcurrentMergeScheduler()); + directory.close(); + + _TestUtil.rmDir(dirPath); } 
} Index: src/test/org/apache/lucene/index/TestDocumentWriter.java =================================================================== --- src/test/org/apache/lucene/index/TestDocumentWriter.java (revision 574266) +++ src/test/org/apache/lucene/index/TestDocumentWriter.java (working copy) @@ -62,7 +62,7 @@ IndexWriter writer = new IndexWriter(dir, analyzer, true); writer.addDocument(testDoc); writer.flush(); - SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1); + SegmentInfo info = writer.newestSegment(); writer.close(); //After adding the document, we should be able to read it back in SegmentReader reader = SegmentReader.get(info); @@ -123,7 +123,7 @@ writer.addDocument(doc); writer.flush(); - SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1); + SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); @@ -156,7 +156,7 @@ writer.addDocument(doc); writer.flush(); - SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1); + SegmentInfo info = writer.newestSegment(); writer.close(); SegmentReader reader = SegmentReader.get(info); Index: src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java =================================================================== --- src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (revision 574266) +++ src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (working copy) @@ -272,7 +272,6 @@ writer.addIndexesNoOptimize(new Directory[] { aux, aux }); assertEquals(1020, writer.docCount()); - assertEquals(2, writer.getSegmentCount()); assertEquals(1000, writer.getDocCount(0)); writer.close(); @@ -373,7 +372,7 @@ writer = newWriter(dir, true); writer.setMaxBufferedDocs(1000); - // add 1000 documents + // add 1000 documents in 1 segment addDocs(writer, 1000); assertEquals(1000, writer.docCount()); assertEquals(1, writer.getSegmentCount()); Index: src/test/org/apache/lucene/util/_TestUtil.java =================================================================== --- src/test/org/apache/lucene/util/_TestUtil.java (revision 574266) +++ src/test/org/apache/lucene/util/_TestUtil.java (working copy) @@ -19,6 +19,9 @@ import java.io.File; import java.io.IOException; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergeScheduler; +import org.apache.lucene.index.ConcurrentMergeScheduler; public class _TestUtil { @@ -37,4 +40,15 @@ public static void rmDir(String dir) throws IOException { rmDir(new File(dir)); } + + public static void syncConcurrentMerges(IndexWriter writer) { + MergeScheduler ms = writer.getMergeScheduler(); + if (ms instanceof ConcurrentMergeScheduler) + ((ConcurrentMergeScheduler) ms).sync(); + } + + public static void syncConcurrentMerges(MergeScheduler ms) { + if (ms instanceof ConcurrentMergeScheduler) + ((ConcurrentMergeScheduler) ms).sync(); + } } Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0) +++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0) @@ -0,0 +1,273 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.lucene.store.Directory; + +import java.io.IOException; +import java.util.List; +import java.util.LinkedList; +import java.util.ArrayList; + +/** A {@link MergeScheduler} that runs each merge using a + * separate thread, up until a maximum number of threads + * ({@link #setMaxThreadCount}) at which points merges are + * run in the foreground, serially. This is a simple way + * to use concurrency in the indexing process without + * having to create and manage application level + * threads. */ + +public class ConcurrentMergeScheduler implements MergeScheduler { + + public static boolean VERBOSE = false; + + private int mergeThreadPriority = -1; + + private List mergeThreads = new ArrayList(); + private int maxThreadCount = 3; + + private List exceptions = new ArrayList(); + private Directory dir; + + /** Sets the max # simultaneous threads that may be + * running. If a merge is necessary yet we already have + * this many threads running, the merge is returned back + * to IndexWriter so that it runs in the "foreground". */ + public void setMaxThreadCount(int count) { + if (count < 1) + throw new IllegalArgumentException("count should be at least 1"); + maxThreadCount = count; + } + + /** Get the max # simultaneous threads that may be + * running. @see #setMaxThreadCount. */ + public int getMaxThreadCount() { + return maxThreadCount; + } + + /** Return the priority that merge threads run at. By + * default the priority is 1 plus the priority of (ie, + * slightly higher priority than) the first thread that + * calls merge. */ + public synchronized int getMergeThreadPriority() { + initMergeThreadPriority(); + return mergeThreadPriority; + } + + /** Return the priority that merge threads run at. */ + public synchronized void setMergeThreadPriority(int pri) { + mergeThreadPriority = pri; + + final int numThreads = mergeThreads.size(); + for(int i=0;i 0) { + if (VERBOSE) { + message("now wait for threads; currently " + mergeThreads.size() + " still running"); + for(int i=0;iExpert: a MergePolicy determines the sequence of + * primitive merge operations to be used for overall merge + * and optimize operations.

+ * + *

Whenever the segments in an index have been altered by + * {@link IndexWriter}, either the addition of a newly + * flushed segment, addition of many segments from + * addIndexes* calls, or a previous merge that may now need + * to cascade, {@link IndexWriter} invokes {@link + * #findMerges} to give the MergePolicy a chance to pick + * merges that are now required. This method returns a + * {@link MergeSpecification} instance describing the set of + * merges that should be done, or null if no merges are + * necessary. When IndexWriter.optimize is called, it calls + * {@link #findMergesForOptimize} and the MergePolicy should + * then return the necessary merges.

+ * + *

Note that the policy can return more than one merge at + * a time. In this case, if the writer is using {@link + * SerialMergeScheduler}, the merges will be run + * sequentially, but if it is using {@link + * ConcurrentMergeScheduler} they will be run concurrently.

+ * + *

The default MergePolicy is {@link + * LogDocMergePolicy}.
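A minimal usage sketch of the split described above (not part of the patch itself; the RAMDirectory, SimpleAnalyzer, and thread count are illustrative, while the setters and no-argument constructors used are the ones this patch introduces):

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.LogDocMergePolicy;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class MergeConfigSketch {
      public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);

        // The MergePolicy decides which segments to merge; LogDocMergePolicy
        // is the default installed by this patch.
        writer.setMergePolicy(new LogDocMergePolicy());

        // The MergeScheduler decides how the selected merges run.  Here, up
        // to 3 background threads instead of the default SerialMergeScheduler,
        // which runs merges in the calling thread.
        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        cms.setMaxThreadCount(3);
        writer.setMergeScheduler(cms);

        // ... add documents; flushing triggers findMerges on the policy and
        // the resulting MergeSpecification is handed to the scheduler ...
        writer.close();
      }
    }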

+ */ + +public interface MergePolicy { + + /** OneMerge provides the information necessary to perform + * an individual primitive merge operation, resulting in + * a single new segment. The merge spec includes the + * subset of segments to be merged as well as whether the + * new segment should use the compound file format. */ + + public static class OneMerge { + + SegmentInfo info; // used by IndexWriter + boolean mergeDocStores; // used by IndexWriter + boolean optimize; // used by IndexWriter + SegmentInfos segmentsClone; // used by IndexWriter + boolean increfDone; // used by IndexWriter + boolean registerDone; // used by IndexWriter + long mergeGen; // used by IndexWriter + boolean isExternal; // used by IndexWriter + + final SegmentInfos segments; + final boolean useCompoundFile; + boolean aborted; + Throwable error; + + public OneMerge(SegmentInfos segments, boolean useCompoundFile) { + if (0 == segments.size()) + throw new RuntimeException("segments must include at least one segment"); + this.segments = segments; + this.useCompoundFile = useCompoundFile; + } + + /** Record that an exception occurred while executing + * this merge */ + public synchronized void setException(Throwable error) { + this.error = error; + } + + /** Retrieve previous exception set by {@link + * #setException}. */ + public synchronized Throwable getException() { + return error; + } + + /** Mark this merge as aborted. If this is called + * before the merge is committed then the merge will + * not be committed. */ + public synchronized void abort() { + aborted = true; + } + + /** Returns true if this merge was aborted. */ + public synchronized boolean isAborted() { + return aborted; + } + + public String segString(Directory dir) { + StringBuffer b = new StringBuffer(); + final int numSegments = segments.size(); + for(int i=0;i 0) b.append(" "); + b.append(segments.info(i).segString(dir)); + } + if (info != null) + b.append(" into " + info.name); + if (optimize) + b.append(" [optimize]"); + return b.toString(); + } + } + + /** + * A MergeSpecification instance provides the information + * necessary to perform multiple merges. It simply + * contains a list of {@link OneMerge} instances. + */ + + public static class MergeSpecification implements Cloneable { + + /** + * The subset of segments to be included in the primitive merge. + */ + + public List merges = new ArrayList(); + + public void add(OneMerge merge) { + merges.add(merge); + } + + public String segString(Directory dir) { + StringBuffer b = new StringBuffer(); + b.append("MergeSpec:\n"); + final int count = merges.size(); + for(int i=0;iThis class implements a {@link MergePolicy} that tries + * to merge segments into levels of exponentially + * increasing size, where each level has < mergeFactor + * segments in it. Whenever a given levle has mergeFactor + * segments or more in it, they will be merged.

+ * + *

This class is abstract and requires a subclass to + * define the {@link #size} method which specifies how a + * segment's size is determined. {@link LogDocMergePolicy} + * is one subclass that measures size by document count in + * the segment. {@link LogByteSizeMergePolicy} is another + * subclass that measures size as the total byte size of the + * file(s) for the segment.
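A short sketch of how the two subclasses pair with the writer's flush modes (illustrative only; it assumes LogByteSizeMergePolicy, like LogDocMergePolicy, has a no-argument constructor, and the directory/analyzer choices are placeholders):

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.LogByteSizeMergePolicy;
    import org.apache.lucene.index.LogDocMergePolicy;
    import org.apache.lucene.store.RAMDirectory;

    public class PolicyChoiceSketch {
      public static void main(String[] args) throws Exception {
        // Flushing by RAM usage: segment byte sizes vary, so selecting
        // merges by byte size (LUCENE-845) is the closer match.
        IndexWriter byRam = new IndexWriter(new RAMDirectory(), new SimpleAnalyzer(), true);
        byRam.setRAMBufferSizeMB(16.0);
        byRam.setMergePolicy(new LogByteSizeMergePolicy()); // assumed no-arg constructor
        byRam.close();

        // Flushing by document count: segments hold a predictable number of
        // docs, so the doc-count-based policy (the default) fits naturally.
        IndexWriter byDocs = new IndexWriter(new RAMDirectory(), new SimpleAnalyzer(), true);
        byDocs.setMaxBufferedDocs(1000);
        byDocs.setMergePolicy(new LogDocMergePolicy());
        byDocs.close();
      }
    }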

+ */ + + public abstract class LogMergePolicy implements MergePolicy { + + /** Defines the allowed range of log(size) for each + * level. A level is computed by taking the max segment + * log size, minus LEVEL_LOG_SPAN, and finding all + * segments falling within that range. */ + public static final double LEVEL_LOG_SPAN = 0.75; + + /** Default merge factor, which is how many segments are + * merged at a time */ + public static final int DEFAULT_MERGE_FACTOR = 10; + + private int mergeFactor = DEFAULT_MERGE_FACTOR; + + long minMergeSize; + long maxMergeSize; + + private boolean useCompoundFile = true; + private boolean useCompoundDocStore = true; + + /**

Returns the number of segments that are merged at + * once and also controls the total number of segments + * allowed to accumulate in the index.

*/ + public int getMergeFactor() { + return mergeFactor; + } + + /** Determines how often segment indices are merged by + * addDocument(). With smaller values, less RAM is used + * while indexing, and searches on unoptimized indices are + * faster, but indexing speed is slower. With larger + * values, more RAM is used during indexing, and while + * searches on unoptimized indices are slower, indexing is + * faster. Thus larger values (> 10) are best for batch + * index creation, and smaller values (< 10) for indices + * that are interactively maintained. */ + public void setMergeFactor(int mergeFactor) { + if (mergeFactor < 2) + throw new IllegalArgumentException("mergeFactor cannot be less than 2"); + this.mergeFactor = mergeFactor; + } + + // Javadoc inherited + public boolean useCompoundFile(SegmentInfos infos, SegmentInfo info) { + return useCompoundFile; + } + + /** Sets whether compound file format should be used for + * newly flushed and newly merged segments. */ + public void setUseCompoundFile(boolean useCompoundFile) { + this.useCompoundFile = useCompoundFile; + } + + /** Returns true if newly flushed and newly merge segments + * are written in compound file format. @see + * #setUseCompoundFile */ + public boolean getUseCompoundFile() { + return useCompoundFile; + } + + // Javadoc inherited + public boolean useCompoundDocStore(SegmentInfos infos) { + return useCompoundDocStore; + } + + /** Sets whether compound file format should be used for + * newly flushed and newly merged doc store + * segment files (term vectors and stored fields). */ + public void setUseCompoundDocStore(boolean useCompoundDocStore) { + this.useCompoundDocStore = useCompoundDocStore; + } + + /** Returns true if newly flushed and newly merge doc + * store segment files (term vectors and stored fields) + * are written in compound file format. @see + * #setUseCompoundDocStore */ + public boolean getUseCompoundDocStore() { + return useCompoundDocStore; + } + + public void close() {} + + abstract protected long size(SegmentInfo info) throws IOException; + + private boolean isOptimized(SegmentInfos infos, IndexWriter writer, int maxNumSegments, Set segmentsToOptimize) throws IOException { + final int numSegments = infos.size(); + int numToOptimize = 0; + SegmentInfo optimizeInfo = null; + for(int i=0;i maxNumSegments || + (numToOptimize == 1 && + (optimizeInfo.hasDeletions() || + optimizeInfo.hasSeparateNorms() || + optimizeInfo.dir != writer.getDirectory() || + optimizeInfo.getUseCompoundFile() != useCompoundFile))); + } + + /** Returns the merges necessary to optimize the index. + * This merge policy defines "optimized" to mean only one + * segment in the index, where that segment has no + * deletions pending nor separate norms, and it is in + * compound file format if the current useCompoundFile + * setting is true. This method returns multiple merges + * (mergeFactor at a time) so the {@link MergeScheduler} + * in use may make use of concurrency. 
*/ + public MergeSpecification findMergesForOptimize(SegmentInfos infos, IndexWriter writer, int maxNumSegments, Set segmentsToOptimize) throws IOException { + final Directory dir = writer.getDirectory(); + MergeSpecification spec; + + if (!isOptimized(infos, writer, maxNumSegments, segmentsToOptimize)) { + + int numSegments = infos.size(); + while(numSegments > 0) { + final SegmentInfo info = infos.info(--numSegments); + if (segmentsToOptimize.contains(info)) { + numSegments++; + break; + } + } + + if (numSegments > 0) { + + spec = new MergeSpecification(); + while (numSegments > 0) { + + final int first; + if (numSegments > mergeFactor) + first = numSegments-mergeFactor; + else + first = 0; + spec.add(new OneMerge(infos.range(first, numSegments), useCompoundFile)); + + numSegments -= mergeFactor; + } + + } else + spec = null; + } else + spec = null; + + return spec; + } + + /** Checks if any merges are now necessary and returns a + * {@link MergePolicy.MergeSpecification} if so. A merge + * is necessary when there are more than {@link + * #setMergeFactor} segments at a given level. When + * multiple levels have too many segments, this method + * will return multiple merges, allowing the {@link + * MergeScheduler} to use concurrency. */ + public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException { + + final int numSegments = infos.size(); + + // Compute levels, which is just log (base mergeFactor) + // of the size of each segment + float[] levels = new float[numSegments]; + final float norm = (float) Math.log(mergeFactor); + + final Directory directory = writer.getDirectory(); + + for(int i=0;i= maxMergeSize && info.dir != directory) + throw new IllegalArgumentException("Segment is too large (" + size + " vs max size " + maxMergeSize + ")"); + + // Floor tiny segments + if (size < 1) + size = 1; + levels[i] = (float) Math.log(size)/norm; + } + + final float levelFloor; + if (minMergeSize <= 0) + levelFloor = (float) 0.0; + else + levelFloor = (float) (Math.log(minMergeSize)/norm); + + // Now, we quantize the log values into levels. The + // first level is any segment whose log size is within + // LEVEL_LOG_SPAN of the max size, or, who has such as + // segment "to the right". Then, we find the max of all + // other segments and use that to define the next level + // segment, etc. + + MergeSpecification spec = null; + + int start = 0; + while(start < numSegments) { + + // Find max level of all segments not already + // quantized. 
+ float maxLevel = levels[start]; + for(int i=1+start;i maxLevel) + maxLevel = level; + } + + // Now search backwards for the rightmost segment that + // falls into this level: + float levelBottom; + if (maxLevel < levelFloor) + // All remaining segments fall into the min level + levelBottom = -1.0F; + else { + levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN); + + // Force a boundary at the level floor + if (levelBottom < levelFloor && maxLevel >= levelFloor) + levelBottom = levelFloor; + } + + int upto = numSegments-1; + while(upto >= start) { + if (levels[upto] >= levelBottom) { + break; + } + upto--; + } + + // Finally, record all merges that are viable at this level: + int end = start + mergeFactor; + while(end <= 1+upto) { + boolean anyTooLarge = false; + for(int i=start;i= maxMergeSize; + + if (!anyTooLarge) { + if (spec == null) + spec = new MergeSpecification(); + spec.add(new OneMerge(infos.range(start, end), useCompoundFile)); + } + start = end; + end = start + mergeFactor; + } + + start = 1+upto; + } + + return spec; + } +} Property changes on: src/java/org/apache/lucene/index/LogMergePolicy.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/java/org/apache/lucene/index/DocumentsWriter.java =================================================================== --- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 574266) +++ src/java/org/apache/lucene/index/DocumentsWriter.java (working copy) @@ -113,6 +113,7 @@ private int nextDocID; // Next docID to be added private int numDocsInRAM; // # docs buffered in RAM + private int numDocsInStore; // # docs written to doc stores private int nextWriteDocID; // Next docID to be written // Max # ThreadState instances; if there are more threads @@ -238,6 +239,7 @@ String s = docStoreSegment; docStoreSegment = null; docStoreOffset = 0; + numDocsInStore = 0; return s; } else { return null; @@ -245,7 +247,12 @@ } private List files = null; // Cached list of files we've created + private List abortedFiles = null; // List of files that were written before last abort() + List abortedFiles() { + return abortedFiles; + } + /* Returns list of files in use by this instance, * including any flushed segments. */ List files() { @@ -278,6 +285,9 @@ * docs added since last flush. */ synchronized void abort() throws IOException { + if (infoStream != null) + infoStream.println("docWriter: now abort"); + // Forcefully remove waiting ThreadStates from line for(int i=0;i + also trigger one or more segment merges which by default + run (blocking) with the current thread (see below for changing the {@link + MergeScheduler}).

The optional autoCommit argument to the @@ -135,8 +144,21 @@ filesystems like NFS that do not support "delete on last close" semantics, which Lucene's "point in time" search normally relies on.

- */ +

Expert: + IndexWriter allows you to separately change + the {@link MergePolicy} and the {@link MergeScheduler}. + The {@link MergePolicy} is invoked whenever there are + changes to the segments in the index. Its role is to + select which merges to do, if any, and return a {@link + MergePolicy.MergeSpecification} describing the merges. It + also selects merges to do for optimize(). (The default is + {@link LogDocMergePolicy}.) Then, the {@link + MergeScheduler} is invoked with the requested merges and + it decides when and how to run the merges. The default is + {@link SerialMergeScheduler}.

+*/ + /* * Clarification: Check Points (and commits) * Being able to set autoCommit=false allows IndexWriter to flush and @@ -177,9 +199,10 @@ public static final String WRITE_LOCK_NAME = "write.lock"; /** - * Default value is 10. Change using {@link #setMergeFactor(int)}. + * @deprecated + * @see LogMergePolicy#DEFAULT_MERGE_FACTOR */ - public final static int DEFAULT_MERGE_FACTOR = 10; + public final static int DEFAULT_MERGE_FACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR; /** * Default value is 10. Change using {@link #setMaxBufferedDocs(int)}. @@ -205,9 +228,10 @@ public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; /** - * Default value is {@link Integer#MAX_VALUE}. Change using {@link #setMaxMergeDocs(int)}. + * @deprecated + * @see LogDocMergePolicy#DEFAULT_MAX_MERGE_DOCS */ - public final static int DEFAULT_MAX_MERGE_DOCS = Integer.MAX_VALUE; + public final static int DEFAULT_MAX_MERGE_DOCS = LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS; /** * Default value is 10,000. Change using {@link #setMaxFieldLength(int)}. @@ -239,23 +263,31 @@ private boolean localAutoCommit; // saved autoCommit during local transaction private boolean autoCommit = true; // false if we should commit only on close - SegmentInfos segmentInfos = new SegmentInfos(); // the segments + private SegmentInfos segmentInfos = new SegmentInfos(); // the segments private DocumentsWriter docWriter; private IndexFileDeleter deleter; + private Set segmentsToOptimize = new HashSet(); // used by optimize to note those needing optimization + private Lock writeLock; private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL; - /** Use compound file setting. Defaults to true, minimizing the number of - * files used. Setting this to false may improve indexing performance, but - * may also cause file handle problems. - */ - private boolean useCompoundFile = true; - private boolean closeDir; private boolean closed; + private boolean closing; + // Holds all SegmentInfo instances currently involved in + // merges + private HashSet mergingSegments = new HashSet(); + + private MergePolicy mergePolicy = new LogDocMergePolicy(); + private MergeScheduler mergeScheduler = new SerialMergeScheduler(); + private LinkedList pendingMerges = new LinkedList(); + private Set runningMerges = new HashSet(); + private List mergeExceptions = new ArrayList(); + private long mergeGen; + /** * Used internally to throw an {@link * AlreadyClosedException} if this IndexWriter has been @@ -268,23 +300,57 @@ } } - /** Get the current setting of whether to use the compound file format. - * Note that this just returns the value you set with setUseCompoundFile(boolean) - * or the default. You cannot use this to query the status of an existing index. + private void message(String message) { + infoStream.println("IW [" + Thread.currentThread().getName() + "]: " + message); + } + + /** + * Casts current mergePolicy to LogMergePolicy, and throws + * an exception if the mergePolicy is not a LogMergePolicy. + */ + private LogMergePolicy getLogMergePolicy() { + if (mergePolicy instanceof LogMergePolicy) + return (LogMergePolicy) mergePolicy; + else + throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy"); + } + + private LogDocMergePolicy getLogDocMergePolicy() { + if (mergePolicy instanceof LogDocMergePolicy) + return (LogDocMergePolicy) mergePolicy; + else + throw new IllegalArgumentException("this method can only be called when the merge policy is LogDocMergePolicy"); + } + + /**

Get the current setting of whether newly flushed + * segments will use the compound file format. Note that + * this just returns the value previously set with + * setUseCompoundFile(boolean), or the default value + * (true). You cannot use this to query the status of + * previously flushed segments.

+ * + *

Note that this method is a convenience method: it + * just calls mergePolicy.getUseCompoundFile as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.
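A brief sketch of the delegation described here (not from the patch; the directory and analyzer are placeholders). With the default log-based policy the legacy setters keep working; with any other MergePolicy they would throw IllegalArgumentException:

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.LogMergePolicy;
    import org.apache.lucene.store.RAMDirectory;

    public class CompoundFileSketch {
      public static void main(String[] args) throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new SimpleAnalyzer(), true);

        // With the default LogDocMergePolicy installed, the old IndexWriter
        // setter simply forwards to the policy...
        writer.setUseCompoundFile(false);
        System.out.println("writer sees: " + writer.getUseCompoundFile());  // false

        // ...and the policy instance reflects the same setting.
        LogMergePolicy policy = (LogMergePolicy) writer.getMergePolicy();
        System.out.println("policy sees: " + policy.getUseCompoundFile());  // false

        // Had a non-LogMergePolicy been set via setMergePolicy, the two
        // convenience calls above would throw IllegalArgumentException.
        writer.close();
      }
    }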

+ * * @see #setUseCompoundFile(boolean) */ public boolean getUseCompoundFile() { - ensureOpen(); - return useCompoundFile; + return getLogMergePolicy().getUseCompoundFile(); } - /** Setting to turn on usage of a compound file. When on, multiple files - * for each segment are merged into a single file once the segment creation - * is finished. This is done regardless of what directory is in use. + /**

Setting to turn on usage of a compound file. When on, + * multiple files for each segment are merged into a + * single file when a new segment is flushed.

+ * + *

Note that this method is a convenience method: it + * just calls mergePolicy.setUseCompoundFile as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.

*/ public void setUseCompoundFile(boolean value) { - ensureOpen(); - useCompoundFile = value; + getLogMergePolicy().setUseCompoundFile(value); } /** Expert: Set the Similarity implementation used by this IndexWriter. @@ -635,6 +701,8 @@ deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy, segmentInfos, infoStream, docWriter); + pushMaxBufferedDocs(); + } catch (IOException e) { this.writeLock.release(); this.writeLock = null; @@ -642,26 +710,83 @@ } } + /** + * Expert: set the merge policy used by this writer. + */ + public void setMergePolicy(MergePolicy mp) { + ensureOpen(); + if (mp == null) + throw new NullPointerException("MergePolicy must be non-null"); + + if (mergePolicy != mp) + mergePolicy.close(); + mergePolicy = mp; + pushMaxBufferedDocs(); + } + + /** + * Expert: returns the current MergePolicy in use by this writer. + * @see #setMergePolicy + */ + public MergePolicy getMergePolicy() { + ensureOpen(); + return mergePolicy; + } + + /** + * Expert: set the merge scheduler used by this writer. + */ + public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException { + ensureOpen(); + if (mergeScheduler == null) + throw new NullPointerException("MergeScheduler must be non-null"); + + if (this.mergeScheduler != mergeScheduler) { + finishMerges(true); + this.mergeScheduler.close(); + } + this.mergeScheduler = mergeScheduler; + } + + /** + * Expert: returns the current MergePolicy in use by this + * writer. + * @see #setMergePolicy + */ + public MergeScheduler getMergeScheduler() { + ensureOpen(); + return mergeScheduler; + } + /** Determines the largest number of documents ever merged by addDocument(). * Small values (e.g., less than 10,000) are best for interactive indexing, * as this limits the length of pauses while indexing to a few seconds. * Larger values are best for batched indexing and speedier searches. * *

The default value is {@link Integer#MAX_VALUE}. + * + *

Note that this method is a convenience method: it + * just calls mergePolicy.setMaxMergeDocs as long as + * mergePolicy is an instance of {@link LogDocMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.

*/ public void setMaxMergeDocs(int maxMergeDocs) { - ensureOpen(); - this.maxMergeDocs = maxMergeDocs; + getLogDocMergePolicy().setMaxMergeDocs(maxMergeDocs); } - /** + /** * Returns the largest number of documents allowed in a * single segment. + * + *

Note that this method is a convenience method: it + * just calls mergePolicy.getMaxMergeDocs as long as + * mergePolicy is an instance of {@link LogDocMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.

+ * * @see #setMaxMergeDocs */ public int getMaxMergeDocs() { - ensureOpen(); - return maxMergeDocs; + return getLogDocMergePolicy().getMaxMergeDocs(); } /** @@ -713,9 +838,30 @@ if (maxBufferedDocs < 2) throw new IllegalArgumentException("maxBufferedDocs must at least be 2"); docWriter.setMaxBufferedDocs(maxBufferedDocs); + pushMaxBufferedDocs(); } /** + * If we are flushing by doc count (not by RAM usage), and + * using LogDocMergePolicy then push maxBufferedDocs down + * as its minMergeDocs, to keep backwards compatibility. + */ + private void pushMaxBufferedDocs() { + if (docWriter.getRAMBufferSizeMB() == 0.0) { + final MergePolicy mp = mergePolicy; + if (mp instanceof LogDocMergePolicy) { + LogDocMergePolicy lmp = (LogDocMergePolicy) mp; + final int maxBufferedDocs = docWriter.getMaxBufferedDocs(); + if (lmp.getMinMergeDocs() != maxBufferedDocs) { + if (infoStream != null) + message("now push maxBufferedDocs " + maxBufferedDocs + " to LogDocMergePolicy"); + lmp.setMinMergeDocs(maxBufferedDocs); + } + } + } + } + + /** * Returns 0 if this writer is flushing by RAM usage, else * returns the number of buffered added documents that will * trigger a flush. @@ -784,24 +930,31 @@ * for batch index creation, and smaller values (< 10) for indices that are * interactively maintained. * + *

Note that this method is a convenience method: it + * just calls mergePolicy.setMergeFactor as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.

+ * *

This must never be less than 2. The default value is 10. */ public void setMergeFactor(int mergeFactor) { - ensureOpen(); - if (mergeFactor < 2) - throw new IllegalArgumentException("mergeFactor cannot be less than 2"); - this.mergeFactor = mergeFactor; + getLogMergePolicy().setMergeFactor(mergeFactor); } /** - * Returns the number of segments that are merged at once - * and also controls the total number of segments allowed - * to accumulate in the index. + *

Returns the number of segments that are merged at + * once and also controls the total number of segments + * allowed to accumulate in the index.

+ * + *

Note that this method is a convenience method: it + * just calls mergePolicy.getMergeFactor as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.

+ * * @see #setMergeFactor */ public int getMergeFactor() { - ensureOpen(); - return mergeFactor; + return getLogMergePolicy().getMergeFactor(); } /** If non-null, this will be the default infoStream used @@ -910,15 +1063,75 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public synchronized void close() throws CorruptIndexException, IOException { - if (!closed) { + public void close() throws CorruptIndexException, IOException { + close(true); + } + + /** + * Closes the index with or without waiting for currently + * running merges to finish. This is only meaningful when + * using a MergeScheduler that runs merges in background + * threads. + * @param waitForMerges if true, this call will block + * until all merges complete; else, it will abort all + * running merges and return right away + */ + public void close(boolean waitForMerges) throws CorruptIndexException, IOException { + boolean doClose; + synchronized(this) { + // Ensure that only one thread actually gets to do the closing: + if (!closing) { + doClose = true; + closing = true; + } else + doClose = false; + } + if (doClose) + closeInternal(waitForMerges); + else + // Another thread beat us to it (is actually doing the + // close), so we will block until that other thread + // has finished closing + waitForClose(); + } + + synchronized private void waitForClose() { + while(!closed && closing) { + try { + wait(); + } catch (InterruptedException ie) { + } + } + } + + private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException { + try { + flush(true, true); + mergePolicy.close(); + + finishMerges(waitForMerges); + + mergeScheduler.close(); + if (commitPending) { - segmentInfos.write(directory); // now commit changes + boolean success = false; + try { + segmentInfos.write(directory); // now commit changes + success = true; + } finally { + if (!success) { + if (infoStream != null) + message("hit exception committing segments file during close"); + deletePartialSegmentsFile(); + } + } if (infoStream != null) - infoStream.println("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); - deleter.checkpoint(segmentInfos, true); + message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); + synchronized(this) { + deleter.checkpoint(segmentInfos, true); + } commitPending = false; rollbackSegmentInfos = null; } @@ -930,17 +1143,31 @@ closed = true; docWriter = null; - if(closeDir) + synchronized(this) { + deleter.close(); + } + + if (closeDir) directory.close(); + } finally { + synchronized(this) { + if (!closed) + closing = false; + notifyAll(); + } } } /** Tells the docWriter to close its currently open shared - * doc stores (stored fields & vectors files). */ - private void flushDocStores() throws IOException { + * doc stores (stored fields & vectors files). + * Return value specifices whether new doc store files are compound or not. 
+ */ + private synchronized boolean flushDocStores() throws IOException { List files = docWriter.files(); + boolean useCompoundDocStore = false; + if (files.size() > 0) { String docStoreSegment; @@ -949,20 +1176,25 @@ docStoreSegment = docWriter.closeDocStore(); success = true; } finally { - if (!success) + if (!success) { + if (infoStream != null) + message("hit exception closing doc store segment"); docWriter.abort(); + } } - if (useCompoundFile && docStoreSegment != null) { + useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos); + + if (useCompoundDocStore && docStoreSegment != null) { // Now build compound doc store file - checkpoint(); success = false; final int numSegments = segmentInfos.size(); + final String compoundFileName = docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION; try { - CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); final int size = files.size(); for(int i=0;i * - *

The amount of free space required when a merge is - * triggered is up to 1X the size of all segments being - * merged, when no readers/searchers are open against the - * index, and up to 2X the size of all segments being - * merged when readers/searchers are open against the - * index (see {@link #optimize()} for details). Most - * merges are small (merging the smallest segments - * together), but whenever a full merge occurs (all - * segments in the index, which is the worst case for - * temporary space usage) then the maximum free disk space - * required is the same as {@link #optimize}.

+ *

The amount of free space required when a merge is triggered is + * up to 1X the size of all segments being merged, when no + * readers/searchers are open against the index, and up to 2X the + * size of all segments being merged when readers/searchers are open + * against the index (see {@link #optimize()} for details). The + * sequence of primitive merge operations performed is governed by + * the merge policy. * *

Note that each term in the document can be no longer * than 16383 characters, otherwise an @@ -1105,14 +1340,27 @@ */ public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); + boolean doFlush = false; boolean success = false; try { - success = docWriter.addDocument(doc, analyzer); - } catch (IOException ioe) { - deleter.refresh(); - throw ioe; + doFlush = docWriter.addDocument(doc, analyzer); + success = true; + } finally { + if (!success) { + + if (infoStream != null) + message("hit exception adding document"); + + synchronized (this) { + // If docWriter has some aborted files that were + // never incref'd, then we clean them up here + final List files = docWriter.abortedFiles(); + if (files != null) + deleter.deleteNewFiles(files); + } + } } - if (success) + if (doFlush) flush(true, false); } @@ -1178,11 +1426,24 @@ throws CorruptIndexException, IOException { ensureOpen(); boolean doFlush = false; + boolean success = false; try { doFlush = docWriter.updateDocument(term, doc, analyzer); - } catch (IOException ioe) { - deleter.refresh(); - throw ioe; + success = true; + } finally { + if (!success) { + + if (infoStream != null) + message("hit exception updating document"); + + synchronized (this) { + // If docWriter has some aborted files that were + // never incref'd, then we clean them up here + final List files = docWriter.abortedFiles(); + if (files != null) + deleter.deleteNewFiles(files); + } + } } if (doFlush) flush(true, false); @@ -1208,54 +1469,40 @@ } final String newSegmentName() { - return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX); + // Cannot synchronize on IndexWriter because that causes + // deadlock + synchronized(segmentInfos) { + return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX); + } } - /** Determines how often segment indices are merged by addDocument(). With - * smaller values, less RAM is used while indexing, and searches on - * unoptimized indices are faster, but indexing speed is slower. With larger - * values, more RAM is used during indexing, and while searches on unoptimized - * indices are slower, indexing is faster. Thus larger values (> 10) are best - * for batch index creation, and smaller values (< 10) for indices that are - * interactively maintained. - * - *

This must never be less than 2. The default value is {@link #DEFAULT_MERGE_FACTOR}. - - */ - private int mergeFactor = DEFAULT_MERGE_FACTOR; - /** Determines amount of RAM usage by the buffered docs at * which point we trigger a flush to the index. */ private double ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; - /** Determines the largest number of documents ever merged by addDocument(). - * Small values (e.g., less than 10,000) are best for interactive indexing, - * as this limits the length of pauses while indexing to a few seconds. - * Larger values are best for batched indexing and speedier searches. - * - *

The default value is {@link #DEFAULT_MAX_MERGE_DOCS}. - - */ - private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS; - /** If non-null, information about merges will be printed to this. */ private PrintStream infoStream = null; - private static PrintStream defaultInfoStream = null; - /** Merges all segments together into a single segment, - * optimizing an index for search. + /** + * Requests an "optimize" operation on an index, priming the index + * for the fastest available search. Traditionally this has meant + * merging all segments into a single segment as is done in the + * default merge policy, but individual merge policies may implement + * optimize in different ways. * + * @see LogMergePolicy#findMergesForOptimize + * *

It is recommended that this method be called upon completion of indexing. In * environments with frequent updates, optimize is best done during low volume times, if at all. * *

*

See http://www.gossamer-threads.com/lists/lucene/java-dev/47895 for more discussion.

* - *

Note that this requires substantial temporary free + *

Note that this can require substantial temporary free * space in the Directory (see LUCENE-764 * for details):

@@ -1293,7 +1540,7 @@ *

The actual temporary usage could be much less than * these figures (it depends on many factors).

* - *

Once the optimize completes, the total size of the + *

In general, once the optimize completes, the total size of the * index will be less than the size of the starting index. * It could be quite a bit smaller (if there were many * pending deletes) or just slightly smaller.

@@ -1307,24 +1554,158 @@ * using compound file format. This will occur when the * Exception is hit during conversion of the segment into * compound format.

+ * + *

This call will optimize those segments present in + * the index when the call started. If other threads are + * still adding documents and flushing segments, those + * newly created segments will not be optimized unless you + * call optimize again.
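A condensed sketch of the non-blocking variant described here, modeled on the testBackgroundOptimize case added elsewhere in this patch (the directory, analyzer, and document are illustrative):

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    public class BackgroundOptimizeSketch {
      public static void main(String[] args) throws Exception {
        IndexWriter writer = new IndexWriter(new RAMDirectory(), new SimpleAnalyzer(), true);
        // Background merge threads are what make optimize(false) useful.
        writer.setMergeScheduler(new ConcurrentMergeScheduler());

        Document doc = new Document();
        doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED));
        for (int i = 0; i < 200; i++)
          writer.addDocument(doc);

        // Kick off the optimize and return immediately; only segments that
        // exist right now will be optimized.
        writer.optimize(false);

        // Segments flushed after this point are not part of that optimize.
        writer.addDocument(doc);

        // close() (i.e. close(true)) waits for the running merges to finish;
        // close(false) would abort them instead.
        writer.close();
      }
    }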

+ * * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public synchronized void optimize() throws CorruptIndexException, IOException { + public void optimize() throws CorruptIndexException, IOException { + optimize(true); + } + + /** Just like {@link #optimize()}, except you can specify + * whether the call should block until the optimize + * completes. This is only meaningful with a + * {@link MergeScheduler} that is able to run merges in + * background threads. */ + public void optimize(boolean doWait) throws CorruptIndexException, IOException { ensureOpen(); flush(); - while (segmentInfos.size() > 1 || - (segmentInfos.size() == 1 && - (SegmentReader.hasDeletions(segmentInfos.info(0)) || - SegmentReader.hasSeparateNorms(segmentInfos.info(0)) || - segmentInfos.info(0).dir != directory || - (useCompoundFile && - !segmentInfos.info(0).getUseCompoundFile())))) { - int minSegment = segmentInfos.size() - mergeFactor; - mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size()); + + if (infoStream != null) + message("optimize: index now " + segString()); + + synchronized(this) { + resetMergeExceptions(); + segmentsToOptimize = new HashSet(); + final int numSegments = segmentInfos.size(); + for(int i=0;i 0) { + // Forward any exceptions in background merge + // threads to the current thread: + final int size = mergeExceptions.size(); + for(int i=0;iautoCommit=true. * @throws IOException if there is a low-level IO error */ - public synchronized void abort() throws IOException { + public void abort() throws IOException { ensureOpen(); - if (!autoCommit) { + if (autoCommit) + throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false"); - // Keep the same segmentInfos instance but replace all - // of its SegmentInfo instances. This is so the next - // attempt to commit using this instance of IndexWriter - // will always write to a new generation ("write once"). - segmentInfos.clear(); - segmentInfos.addAll(rollbackSegmentInfos); + boolean doClose; + synchronized(this) { + // Ensure that only one thread actually gets to do the closing: + if (!closing) { + doClose = true; + closing = true; + } else + doClose = false; + } - docWriter.abort(); + if (doClose) { - // Ask deleter to locate unreferenced files & remove - // them: - deleter.checkpoint(segmentInfos, false); - deleter.refresh(); + finishMerges(false); + // Must pre-close these two, in case they set + // commitPending=true, so that we can then set it to + // false before calling closeInternal + mergePolicy.close(); + mergeScheduler.close(); + + synchronized(this) { + // Keep the same segmentInfos instance but replace all + // of its SegmentInfo instances. This is so the next + // attempt to commit using this instance of IndexWriter + // will always write to a new generation ("write + // once"). 
+ segmentInfos.clear(); + segmentInfos.addAll(rollbackSegmentInfos); + + docWriter.abort(); + + // Ask deleter to locate unreferenced files & remove + // them: + deleter.checkpoint(segmentInfos, false); + deleter.refresh(); + finishMerges(false); + } + commitPending = false; - docWriter.abort(); - close(); + closeInternal(false); + } else + waitForClose(); + } + private synchronized void finishMerges(boolean waitForMerges) { + if (!waitForMerges) { + // Abort all pending & running merges: + Iterator it = pendingMerges.iterator(); + while(it.hasNext()) + ((MergePolicy.OneMerge) it.next()).abort(); + + pendingMerges.clear(); + it = runningMerges.iterator(); + while(it.hasNext()) + ((MergePolicy.OneMerge) it.next()).abort(); + + runningMerges.clear(); + mergingSegments.clear(); + notifyAll(); } else { - throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false"); + while(pendingMerges.size() > 0 || runningMerges.size() > 0) { + try { + wait(); + } catch (InterruptedException ie) { + } + } + assert 0 == mergingSegments.size(); } } @@ -1461,11 +1903,11 @@ * commit the change immediately. Else, we mark * commitPending. */ - private void checkpoint() throws IOException { + private synchronized void checkpoint() throws IOException { if (autoCommit) { segmentInfos.write(directory); if (infoStream != null) - infoStream.println("checkpoint: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); + message("checkpoint: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\""); } else { commitPending = true; } @@ -1521,7 +1963,7 @@ throws CorruptIndexException, IOException { ensureOpen(); - optimize(); // start with zero or 1 seg + flush(); int start = segmentInfos.size(); @@ -1538,15 +1980,8 @@ } } - // merge newly added segments in log(n) passes - while (segmentInfos.size() > start+mergeFactor) { - for (int base = start; base < segmentInfos.size(); base++) { - int end = Math.min(segmentInfos.size(), base+mergeFactor); - if (end-base > 1) { - mergeSegments(base, end); - } - } - } + optimize(); + success = true; } finally { if (success) { @@ -1555,8 +1990,11 @@ rollbackTransaction(); } } + } - optimize(); // final cleanup + private synchronized void resetMergeExceptions() { + mergeExceptions = new ArrayList(); + mergeGen++; } /** @@ -1578,40 +2016,10 @@ */ public synchronized void addIndexesNoOptimize(Directory[] dirs) throws CorruptIndexException, IOException { - // Adding indexes can be viewed as adding a sequence of segments S to - // a sequence of segments T. Segments in T follow the invariants but - // segments in S may not since they could come from multiple indexes. - // Here is the merge algorithm for addIndexesNoOptimize(): - // - // 1 Flush ram. - // 2 Consider a combined sequence with segments from T followed - // by segments from S (same as current addIndexes(Directory[])). - // 3 Assume the highest level for segments in S is h. Call - // maybeMergeSegments(), but instead of starting w/ lowerBound = -1 - // and upperBound = maxBufferedDocs, start w/ lowerBound = -1 and - // upperBound = upperBound of level h. After this, the invariants - // are guaranteed except for the last < M segments whose levels <= h. - // 4 If the invariants hold for the last < M segments whose levels <= h, - // if some of those < M segments are from S (not merged in step 3), - // properly copy them over*, otherwise done. - // Otherwise, simply merge those segments. If the merge results in - // a segment of level <= h, done. 
Otherwise, it's of level h+1 and call - // maybeMergeSegments() starting w/ upperBound = upperBound of level h+1. - // - // * Ideally, we want to simply copy a segment. However, directory does - // not support copy yet. In addition, source may use compound file or not - // and target may use compound file or not. So we use mergeSegments() to - // copy a segment, which may cause doc count to change because deleted - // docs are garbage collected. - // 1 flush ram - ensureOpen(); flush(); - // 2 copy segment infos and find the highest level from dirs - int startUpperBound = docWriter.getMaxBufferedDocs(); - /* new merge policy if (startUpperBound == 0) startUpperBound = 10; @@ -1634,64 +2042,20 @@ for (int j = 0; j < sis.size(); j++) { SegmentInfo info = sis.info(j); segmentInfos.addElement(info); // add each info - - while (startUpperBound < info.docCount) { - startUpperBound *= mergeFactor; // find the highest level from dirs - if (startUpperBound > maxMergeDocs) { - // upper bound cannot exceed maxMergeDocs - throw new IllegalArgumentException("Upper bound cannot exceed maxMergeDocs"); - } - } } } - // 3 maybe merge segments starting from the highest level from dirs - maybeMergeSegments(startUpperBound); + maybeMerge(); - // get the tail segments whose levels <= h - int segmentCount = segmentInfos.size(); - int numTailSegments = 0; - while (numTailSegments < segmentCount - && startUpperBound >= segmentInfos.info(segmentCount - 1 - numTailSegments).docCount) { - numTailSegments++; - } - if (numTailSegments == 0) { - success = true; - return; - } + // If after merging there remain segments in the index + // that are in a different directory, just copy these + // over into our index. This is necessary (before + // finishing the transaction) to avoid leaving the + // index in an unusable (inconsistent) state. + copyExternalSegments(); - // 4 make sure invariants hold for the tail segments whose levels <= h - if (checkNonDecreasingLevels(segmentCount - numTailSegments)) { - // identify the segments from S to be copied (not merged in 3) - int numSegmentsToCopy = 0; - while (numSegmentsToCopy < segmentCount - && directory != segmentInfos.info(segmentCount - 1 - numSegmentsToCopy).dir) { - numSegmentsToCopy++; - } - if (numSegmentsToCopy == 0) { - success = true; - return; - } + success = true; - // copy those segments from S - for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) { - mergeSegments(i, i + 1); - } - if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) { - success = true; - return; - } - } - - // invariants do not hold, simply merge those segments - mergeSegments(segmentCount - numTailSegments, segmentCount); - - // maybe merge segments again if necessary - if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) { - maybeMergeSegments(startUpperBound * mergeFactor); - } - - success = true; } finally { if (success) { commitTransaction(); @@ -1701,6 +2065,33 @@ } } + /* If any of our segments are using a directory != ours + * then copy them over. Currently this is only used by + * addIndexesNoOptimize(). */ + private synchronized void copyExternalSegments() throws CorruptIndexException, IOException { + final int numSegments = segmentInfos.size(); + for(int i=0;iAfter this completes, the index is optimized.

    *
    * The provided IndexReaders are not closed.
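The new non-blocking optimize is only meaningful together with a MergeScheduler that runs merges in background threads. A minimal usage sketch (assuming a Directory named "dir"; ConcurrentMergeScheduler is the scheduler added by LUCENE-870):

    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
    writer.setMergeScheduler(new ConcurrentMergeScheduler());
    // ... addDocument() calls ...
    writer.optimize(false);   // kick off the optimize but do not wait for it
    writer.close();           // close() does wait for the running merges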

@@ -1754,6 +2145,9 @@ } finally { if (!success) { + if (infoStream != null) + message("hit exception in addIndexes during merge"); + rollbackTransaction(); } else { commitTransaction(); @@ -1765,7 +2159,7 @@ } } - if (useCompoundFile) { + if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) { boolean success = false; @@ -1776,6 +2170,9 @@ info.setUseCompoundFile(true); } finally { if (!success) { + if (infoStream != null) + message("hit exception building compound file in addIndexes during merge"); + rollbackTransaction(); } else { commitTransaction(); @@ -1784,40 +2181,6 @@ } } - // Overview of merge policy: - // - // A flush is triggered either by close() or by the number of ram segments - // reaching maxBufferedDocs. After a disk segment is created by the flush, - // further merges may be triggered. - // - // LowerBound and upperBound set the limits on the doc count of a segment - // which may be merged. Initially, lowerBound is set to 0 and upperBound - // to maxBufferedDocs. Starting from the rightmost* segment whose doc count - // > lowerBound and <= upperBound, count the number of consecutive segments - // whose doc count <= upperBound. - // - // Case 1: number of worthy segments < mergeFactor, no merge, done. - // Case 2: number of worthy segments == mergeFactor, merge these segments. - // If the doc count of the merged segment <= upperBound, done. - // Otherwise, set lowerBound to upperBound, and multiply upperBound - // by mergeFactor, go through the process again. - // Case 3: number of worthy segments > mergeFactor (in the case mergeFactor - // M changes), merge the leftmost* M segments. If the doc count of - // the merged segment <= upperBound, consider the merged segment for - // further merges on this same level. Merge the now leftmost* M - // segments, and so on, until number of worthy segments < mergeFactor. - // If the doc count of all the merged segments <= upperBound, done. - // Otherwise, set lowerBound to upperBound, and multiply upperBound - // by mergeFactor, go through the process again. - // Note that case 2 can be considerd as a special case of case 3. - // - // This merge policy guarantees two invariants if M does not change and - // segment doc count is not reaching maxMergeDocs: - // B for maxBufferedDocs, f(n) defined as ceil(log_M(ceil(n/B))) - // 1: If i (left*) and i+1 (right*) are two consecutive segments of doc - // counts x and y, then f(x) >= f(y). - // 2: The number of committed segments on the same level (f(n)) <= M. - // This is called after pending added and deleted // documents have been flushed to the Directory but before // the change is committed (new segments_N file written). 
@@ -1833,7 +2196,7 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public final synchronized void flush() throws CorruptIndexException, IOException { + public final void flush() throws CorruptIndexException, IOException { flush(true, false); } @@ -1845,9 +2208,15 @@ * @param flushDocStores if false we are allowed to keep * doc stores open to share with the next segment */ - protected final synchronized void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException { + protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException { ensureOpen(); + if (doFlush(flushDocStores) && triggerMerge) + maybeMerge(); + } + + private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException { + // Make sure no threads are actively adding a document docWriter.pauseAllThreads(); @@ -1877,10 +2246,14 @@ boolean flushDeletes = docWriter.hasDeletes(); if (infoStream != null) - infoStream.println(" flush: flushDocs=" + flushDocs + - " flushDeletes=" + flushDeletes + - " flushDocStores=" + flushDocStores + - " numDocs=" + numDocs); + message(" flush: segment=" + docWriter.getSegment() + + " docStoreSegment=" + docWriter.getDocStoreSegment() + + " docStoreOffset=" + docWriter.getDocStoreOffset() + + " flushDocs=" + flushDocs + + " flushDeletes=" + flushDeletes + + " flushDocStores=" + flushDocStores + + " numDocs=" + numDocs + + " numBufDelTerms=" + docWriter.getNumBufferedDeleteTerms()); int docStoreOffset = docWriter.getDocStoreOffset(); boolean docStoreIsCompoundFile = false; @@ -1891,15 +2264,17 @@ if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) { // We must separately flush the doc store if (infoStream != null) - infoStream.println(" flush shared docStore segment " + docStoreSegment); + message(" flush shared docStore segment " + docStoreSegment); - flushDocStores(); + docStoreIsCompoundFile = flushDocStores(); flushDocStores = false; - docStoreIsCompoundFile = useCompoundFile; } String segment = docWriter.getSegment(); + // If we are flushing docs, segment must not be null: + assert segment != null || !flushDocs; + if (flushDocs || flushDeletes) { SegmentInfos rollback = null; @@ -1948,7 +2323,22 @@ success = true; } finally { if (!success) { + + if (infoStream != null) + message("hit exception flushing segment " + segment); + if (flushDeletes) { + + // Carefully check if any partial .del files + // should be removed: + final int size = rollback.size(); + for(int i=0;i= 0) { - SegmentInfo si = segmentInfos.info(minSegment); + final int numSegments = segmentInfos.size(); + + final int numSegmentsToMerge = merge.segments.size(); + for(int i=0;i lowerBound && si.docCount <= upperBound) { - // start from the rightmost* segment whose doc count is in bounds - maxSegment = minSegment; - } else if (si.docCount > upperBound) { - // until the segment whose doc count exceeds upperBound - break; - } + if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) { + if (segmentInfos.indexOf(info) == -1) + throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index"); + else + throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle"); } + } - minSegment++; - maxSegment++; - int 
numSegments = maxSegment - minSegment; + return first; + } - if (numSegments < mergeFactor) { - break; - } else { - boolean exceedsUpperLimit = false; + /* FIXME if we want to support non-contiguous segment merges */ + synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException { - // number of merge-worthy segments may exceed mergeFactor when - // mergeFactor and/or maxBufferedDocs change(s) - while (numSegments >= mergeFactor) { - // merge the leftmost* mergeFactor segments + assert merge.registerDone; - int docCount = mergeSegments(minSegment, minSegment + mergeFactor); - numSegments -= mergeFactor; + // If merge was explicitly aborted, or, if abort() or + // rollbackTransaction() had been called since our merge + // started (which results in an unqualified + // deleter.refresh() call that will remove any index + // file that current segments does not reference), we + // abort this merge + if (merge.isAborted()) { - if (docCount > upperBound) { - // continue to merge the rest of the worthy segments on this level - minSegment++; - exceedsUpperLimit = true; - } else { - // if the merged segment does not exceed upperBound, consider - // this segment for further merges on this same level - numSegments++; + if (infoStream != null) { + if (merge.isAborted()) + message("commitMerge: skipping merge " + merge.segString(directory) + ": it was aborted"); + } + + assert merge.increfDone; + decrefMergeSegments(merge); + deleter.refresh(merge.info.name); + return false; + } + + boolean success = false; + + int start; + + try { + SegmentInfos sourceSegmentsClone = merge.segmentsClone; + SegmentInfos sourceSegments = merge.segments; + final int numSegments = segmentInfos.size(); + + start = ensureContiguousMerge(merge); + if (infoStream != null) + message("commitMerge " + merge.segString(directory)); + + // Carefully merge deletes that occurred after we + // started merging: + + BitVector deletes = null; + int docUpto = 0; + + final int numSegmentsToMerge = sourceSegments.size(); + for(int i=0;i minSegment; i--) // remove old infos & add new - segmentInfos.remove(i); + /** Does fininishing for a merge, which is fast but holds + * the synchronized lock on IndexWriter instance. 
*/ + final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException { - segmentInfos.set(minSegment, newSegment); + if (merge.increfDone) + decrefMergeSegments(merge); - checkpoint(); + assert merge.registerDone; - success = true; + final SegmentInfos sourceSegments = merge.segments; + final SegmentInfos sourceSegmentsClone = merge.segmentsClone; + final int end = sourceSegments.size(); + for(int i=0;i 0) { if (infoStream != null) - infoStream.println("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on " - + segmentInfos.size() + " segments."); + message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on " + + segmentInfos.size() + " segments."); if (flushedNewSegment) { IndexReader reader = null; @@ -2341,29 +3079,6 @@ return delCount; } - private final boolean checkNonDecreasingLevels(int start) { - int lowerBound = -1; - int upperBound = docWriter.getMaxBufferedDocs(); - - /* new merge policy - if (upperBound == 0) - upperBound = 10; - */ - - for (int i = segmentInfos.size() - 1; i >= start; i--) { - int docCount = segmentInfos.info(i).docCount; - if (docCount <= lowerBound) { - return false; - } - - while (docCount > upperBound) { - lowerBound = upperBound; - upperBound *= mergeFactor; - } - } - return true; - } - // For test purposes. final synchronized int getBufferedDeleteTermsSize() { return docWriter.getBufferedDeleteTerms().size(); @@ -2417,13 +3132,18 @@ return delCount; } + // utility routines for tests + SegmentInfo newestSegment() { + return segmentInfos.info(segmentInfos.size()-1); + } + public synchronized String segString() { StringBuffer buffer = new StringBuffer(); for(int i = 0; i < segmentInfos.size(); i++) { if (i > 0) { buffer.append(' '); } - buffer.append(segmentInfos.info(i).name + ":" + segmentInfos.info(i).docCount); + buffer.append(segmentInfos.info(i).segString(directory)); } return buffer.toString(); Index: src/java/org/apache/lucene/index/IndexFileDeleter.java =================================================================== --- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 574266) +++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy) @@ -105,7 +105,7 @@ } private void message(String message) { - infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message); + infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message); } /** @@ -275,25 +275,59 @@ * Writer calls this when it has hit an error and had to * roll back, to tell us that there may now be * unreferenced files in the filesystem. So we re-list - * the filesystem and delete such files: + * the filesystem and delete such files. If segmentName + * is non-null, we will only delete files corresponding to + * that segment. 
*/ - public void refresh() throws IOException { + public void refresh(String segmentName) throws IOException { String[] files = directory.list(); if (files == null) throw new IOException("cannot read directory " + directory + ": list() returned null"); IndexFileNameFilter filter = IndexFileNameFilter.getFilter(); + String segmentPrefix1; + String segmentPrefix2; + if (segmentName != null) { + segmentPrefix1 = segmentName + "."; + segmentPrefix2 = segmentName + "_"; + } else { + segmentPrefix1 = null; + segmentPrefix2 = null; + } + for(int i=0;i 0; return --count; } } Index: src/java/org/apache/lucene/index/MergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/MergeScheduler.java (revision 0) +++ src/java/org/apache/lucene/index/MergeScheduler.java (revision 0) @@ -0,0 +1,36 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +/** Expert: {@link IndexWriter} uses an instance + * implementing this interface to execute the merges + * selected by a {@link MergePolicy}. The default + * MergeScheduler is {@link SerialMergeScheduler}. */ + +public interface MergeScheduler { + + /** Run the merges provided by {@link IndexWriter#getNextMerge()}. */ + void merge(IndexWriter writer) + throws CorruptIndexException, IOException; + + /** Close this MergeScheduler. */ + void close() + throws CorruptIndexException, IOException; +} Property changes on: src/java/org/apache/lucene/index/MergeScheduler.java ___________________________________________________________________ Name: svn:eol-style + native
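For reference, a MergeScheduler implementation is expected to drain the merges that IndexWriter has queued. Below is a minimal single-threaded sketch; the class name is illustrative, and it assumes the package-private getNextMerge()/merge(MergePolicy.OneMerge) hooks on IndexWriter that the javadoc above refers to, so it would need to live in org.apache.lucene.index:

    package org.apache.lucene.index;

    import java.io.IOException;

    /** Runs every pending merge sequentially in the calling thread. */
    class SimpleSerialMergeScheduler implements MergeScheduler {

      public void merge(IndexWriter writer)
        throws CorruptIndexException, IOException {
        // Drain all merges the MergePolicy has registered with the writer;
        // writer.merge(...) is assumed to execute one merge end-to-end.
        while (true) {
          MergePolicy.OneMerge oneMerge = writer.getNextMerge();
          if (oneMerge == null)
            break;
          writer.merge(oneMerge);
        }
      }

      public void close() {}
    }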