Index: CHANGES.txt
===================================================================
--- CHANGES.txt (revision 575729)
+++ CHANGES.txt (working copy)
@@ -28,6 +28,30 @@
termText instead of String. This gives faster tokenization
performance (~10-15%). (Mike McCandless)
+ 5. LUCENE-847: Factor MergePolicy and MergeScheduler out of
+ IndexWriter. This enables users to change how & when segments are
+ selected for merging or optimizing (by creating a MergePolicy) as
+ well as how & when the selected merges are actually performed (by
+ creating a MergeScheduler). Added new public optimize(boolean
+ doWait), close(boolean doWait) and maybeMerge() methods to
+ IndexWriter. (Steven Parkes via Mike McCandless).
+
+ 6. LUCENE-870: Add a ConcurrentMergeScheduler which executes merges
+ using background threads, up to a maximum thread count. Once that
+ limit is reached, additional merges run serially in the foreground. This
+ change also improves overall concurrency of IndexWriter for
+ applications that use multiple threads. For example, optimize is
+ no longer synchronized (and can run in the background), and
+ multiple threads from the application can be running merges at
+ once even when not using ConcurrentMergeScheduler (Steven Parkes
+ via Mike McCandless).
+
+ 7. LUCENE-845: Added a LogByteSizeMergePolicy which selects merges by
+ roughly equal size (in bytes) of the segments in the Directory.
+ This is a better match when you are flushing by RAM usage instead
+ of document count in IndexWriter or when documents vary
+ substantially in size (Mike McCandless).
+
Bug fixes
1. LUCENE-933: QueryParser fixed to not produce empty sub
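(Illustrative only, not part of the patch.) A minimal usage sketch of the new merge API described in new-feature entries 5-7 above. It relies on the setters and methods shown elsewhere in this diff; the no-arg LogByteSizeMergePolicy constructor is an assumption, since that class does not appear in the diff itself:

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.LogByteSizeMergePolicy;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class MergeApiExample {
      public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);

        // LUCENE-845: select merges by byte size rather than by doc count
        writer.setMergePolicy(new LogByteSizeMergePolicy());

        // LUCENE-870: run the selected merges in background threads
        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        cms.setMaxThreadCount(2);
        writer.setMergeScheduler(cms);

        Document doc = new Document();
        doc.add(new Field("body", "some text", Field.Store.NO, Field.Index.TOKENIZED));
        for (int i = 0; i < 1000; i++)
          writer.addDocument(doc);

        // LUCENE-847: kick off an optimize without waiting for it to finish...
        writer.optimize(false);

        // ...and wait for any background merges to complete on close
        writer.close(true);
        dir.close();
      }
    }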
Index: src/test/org/apache/lucene/store/MockRAMOutputStream.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMOutputStream.java (revision 575729)
+++ src/test/org/apache/lucene/store/MockRAMOutputStream.java (working copy)
@@ -55,7 +55,7 @@
writeBytes(singleByte, 0, 1);
}
- public void writeBytes(byte[] b, int offset, int len) throws IOException {
+ public void writeBytes(byte[] b, int offset, int len) throws IOException {
long freeSpace = dir.maxSize - dir.sizeInBytes();
long realUsage = 0;
Index: src/test/org/apache/lucene/store/MockRAMDirectory.java
===================================================================
--- src/test/org/apache/lucene/store/MockRAMDirectory.java (revision 575729)
+++ src/test/org/apache/lucene/store/MockRAMDirectory.java (working copy)
@@ -195,7 +195,7 @@
* RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
*/
- final long getRecomputedActualSizeInBytes() {
+ final synchronized long getRecomputedActualSizeInBytes() {
long size = 0;
Iterator it = fileMap.values().iterator();
while (it.hasNext())
Index: src/test/org/apache/lucene/index/TestThreadedOptimize.java
===================================================================
--- src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0)
+++ src/test/org/apache/lucene/index/TestThreadedOptimize.java (revision 0)
@@ -0,0 +1,160 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.SimpleAnalyzer;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.util._TestUtil;
+import org.apache.lucene.util.English;
+
+import junit.framework.TestCase;
+
+import java.io.IOException;
+import java.io.File;
+
+public class TestThreadedOptimize extends TestCase {
+
+ private static final Analyzer ANALYZER = new SimpleAnalyzer();
+
+ private final static int NUM_THREADS = 3;
+ //private final static int NUM_THREADS = 5;
+
+ private final static int NUM_ITER = 2;
+ //private final static int NUM_ITER = 10;
+
+ private final static int NUM_ITER2 = 2;
+ //private final static int NUM_ITER2 = 5;
+
+ private boolean failed;
+
+ private void setFailed() {
+ failed = true;
+ }
+
+ public void runTest(Directory directory, boolean autoCommit, MergeScheduler merger) throws Exception {
+
+ IndexWriter writer = new IndexWriter(directory, autoCommit, ANALYZER, true);
+ writer.setMaxBufferedDocs(2);
+ if (merger != null)
+ writer.setMergeScheduler(merger);
+
+ for(int iter=0;iter<NUM_ITER;iter++) {
Index: src/test/org/apache/lucene/index/TestIndexWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriter.java (revision 575729)
+++ src/test/org/apache/lucene/index/TestIndexWriter.java (working copy)
@@ -1107,12 +1151,14 @@
RAMDirectory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
writer.setMaxBufferedDocs(10);
+
int lastNumFile = dir.list().length;
long lastGen = -1;
for(int j=1;j<52;j++) {
Document doc = new Document();
doc.add(new Field("field", "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED));
writer.addDocument(doc);
+ _TestUtil.syncConcurrentMerges(writer);
long gen = SegmentInfos.generationFromSegmentsFileName(SegmentInfos.getCurrentSegmentFileName(dir.list()));
if (j == 1)
lastGen = gen;
@@ -1153,7 +1199,6 @@
public void testDiverseDocs() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
- // writer.setInfoStream(System.out);
long t0 = System.currentTimeMillis();
writer.setRAMBufferSizeMB(0.5);
Random rand = new Random(31415);
@@ -1348,6 +1393,48 @@
assertEquals(2, reader.numDocs());
}
+ // Test calling optimize(false) whereby optimize is kicked
+ // off but we don't wait for it to finish (but
+ // writer.close() does wait)
+ public void testBackgroundOptimize() throws IOException {
+
+ Directory dir = new MockRAMDirectory();
+ for(int pass=0;pass<2;pass++) {
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
+ writer.setMergeScheduler(new ConcurrentMergeScheduler());
+ Document doc = new Document();
+ doc.add(new Field("field", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ writer.setMaxBufferedDocs(2);
+ writer.setMergeFactor(101);
+ for(int i=0;i<200;i++)
+ writer.addDocument(doc);
+ writer.optimize(false);
+
+ if (0 == pass) {
+ writer.close();
+ IndexReader reader = IndexReader.open(dir);
+ assertTrue(reader.isOptimized());
+ reader.close();
+ } else {
+ // Get another segment to flush so we can verify it is
+ // NOT included in the optimization
+ writer.addDocument(doc);
+ writer.addDocument(doc);
+ writer.close();
+
+ IndexReader reader = IndexReader.open(dir);
+ assertTrue(!reader.isOptimized());
+ reader.close();
+
+ SegmentInfos infos = new SegmentInfos();
+ infos.read(dir);
+ assertEquals(2, infos.size());
+ }
+ }
+
+ dir.close();
+ }
+
private void rmDir(File dir) {
File[] files = dir.listFiles();
if (files != null) {
Index: src/test/org/apache/lucene/index/TestStressIndexing.java
===================================================================
--- src/test/org/apache/lucene/index/TestStressIndexing.java (revision 575729)
+++ src/test/org/apache/lucene/index/TestStressIndexing.java (working copy)
@@ -32,105 +32,119 @@
public class TestStressIndexing extends TestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private static final Random RANDOM = new Random();
- private static Searcher SEARCHER;
- private static int RUN_TIME_SEC = 15;
-
- private static class IndexerThread extends Thread {
- IndexWriter modifier;
- int nextID;
- public int count;
+ private static abstract class TimedThread extends Thread {
boolean failed;
+ int count;
+ private static int RUN_TIME_SEC = 6;
+ private TimedThread[] allThreads;
- public IndexerThread(IndexWriter modifier) {
- this.modifier = modifier;
+ abstract public void doWork() throws Throwable;
+
+ TimedThread(TimedThread[] threads) {
+ this.allThreads = threads;
}
public void run() {
- long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
- try {
- while(true) {
+ final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
- if (System.currentTimeMillis() > stopTime) {
- break;
- }
+ count = 0;
- // Add 10 docs:
- for(int j=0; j<10; j++) {
- Document d = new Document();
- int n = RANDOM.nextInt();
- d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
- d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
- modifier.addDocument(d);
- }
-
- // Delete 5 docs:
- int deleteID = nextID;
- for(int j=0; j<5; j++) {
- modifier.deleteDocuments(new Term("id", ""+deleteID));
- deleteID -= 2;
- }
-
+ try {
+ while(System.currentTimeMillis() < stopTime && !anyErrors()) {
+ doWork();
count++;
}
-
- } catch (Exception e) {
- System.out.println(e.toString());
- e.printStackTrace();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
failed = true;
}
}
+
+ private boolean anyErrors() {
+ for(int i=0;i<allThreads.length;i++)
+ if (allThreads[i] != null && allThreads[i].failed)
+ return true;
+ return false;
+ }
- if (System.currentTimeMillis() > stopTime) {
- break;
- }
- }
- } catch (Exception e) {
- System.out.println(e.toString());
- e.printStackTrace();
- failed = true;
+ public void doWork() throws Exception {
+ // Add 10 docs:
+ for(int j=0; j<10; j++) {
+ Document d = new Document();
+ int n = RANDOM.nextInt();
+ d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
+ writer.addDocument(d);
}
+
+ // Delete 5 docs:
+ int deleteID = nextID-1;
+ for(int j=0; j<5; j++) {
+ writer.deleteDocuments(new Term("id", ""+deleteID));
+ deleteID -= 2;
+ }
}
}
+ private static class SearcherThread extends TimedThread {
+ private Directory directory;
+
+ public SearcherThread(Directory directory, TimedThread[] threads) {
+ super(threads);
+ this.directory = directory;
+ }
+
+ public void doWork() throws Throwable {
+ for (int i=0; i<100; i++)
+ (new IndexSearcher(directory)).close();
+ count += 100;
+ }
+ }
+
/*
Run one indexer and 2 searchers against single index as
stress test.
*/
- public void runStressTest(Directory directory) throws Exception {
- IndexWriter modifier = new IndexWriter(directory, ANALYZER, true);
+ public void runStressTest(Directory directory, boolean autoCommit, MergeScheduler mergeScheduler) throws Exception {
+ IndexWriter modifier = new IndexWriter(directory, autoCommit, ANALYZER, true);
+ modifier.setMaxBufferedDocs(10);
+
+ TimedThread[] threads = new TimedThread[4];
+
+ if (mergeScheduler != null)
+ modifier.setMergeScheduler(mergeScheduler);
+
// One modifier that writes 10 docs then removes 5, over
// and over:
- IndexerThread indexerThread = new IndexerThread(modifier);
+ IndexerThread indexerThread = new IndexerThread(modifier, threads);
+ threads[0] = indexerThread;
indexerThread.start();
- IndexerThread indexerThread2 = new IndexerThread(modifier);
+ IndexerThread indexerThread2 = new IndexerThread(modifier, threads);
+ threads[1] = indexerThread2;
indexerThread2.start();
- // Two searchers that constantly just re-instantiate the searcher:
- SearcherThread searcherThread1 = new SearcherThread(directory);
+ // Two searchers that constantly just re-instantiate the
+ // searcher:
+ SearcherThread searcherThread1 = new SearcherThread(directory, threads);
+ threads[2] = searcherThread1;
searcherThread1.start();
- SearcherThread searcherThread2 = new SearcherThread(directory);
+ SearcherThread searcherThread2 = new SearcherThread(directory, threads);
+ threads[3] = searcherThread2;
searcherThread2.start();
indexerThread.join();
@@ -144,6 +158,7 @@
assertTrue("hit unexpected exception in indexer2", !indexerThread2.failed);
assertTrue("hit unexpected exception in search1", !searcherThread1.failed);
assertTrue("hit unexpected exception in search2", !searcherThread2.failed);
+
//System.out.println(" Writer: " + indexerThread.count + " iterations");
//System.out.println("Searcher 1: " + searcherThread1.count + " searchers created");
//System.out.println("Searcher 2: " + searcherThread2.count + " searchers created");
@@ -155,25 +170,38 @@
*/
public void testStressIndexAndSearching() throws Exception {
- // First in a RAM directory:
+ // RAMDir
Directory directory = new MockRAMDirectory();
- runStressTest(directory);
+ runStressTest(directory, true, null);
directory.close();
- // Second in an FSDirectory:
+ // FSDir
String tempDir = System.getProperty("java.io.tmpdir");
File dirPath = new File(tempDir, "lucene.test.stress");
directory = FSDirectory.getDirectory(dirPath);
- runStressTest(directory);
+ runStressTest(directory, true, null);
directory.close();
- rmDir(dirPath);
- }
- private void rmDir(File dir) {
- File[] files = dir.listFiles();
- for (int i = 0; i < files.length; i++) {
- files[i].delete();
- }
- dir.delete();
+ // With ConcurrentMergeScheduler, in RAMDir
+ directory = new MockRAMDirectory();
+ runStressTest(directory, true, new ConcurrentMergeScheduler());
+ directory.close();
+
+ // With ConcurrentMergeScheduler, in FSDir
+ directory = FSDirectory.getDirectory(dirPath);
+ runStressTest(directory, true, new ConcurrentMergeScheduler());
+ directory.close();
+
+ // With ConcurrentMergeScheduler and autoCommit=false, in RAMDir
+ directory = new MockRAMDirectory();
+ runStressTest(directory, false, new ConcurrentMergeScheduler());
+ directory.close();
+
+ // With ConcurrentMergeScheduler and autoCommit=false, in FSDir
+ directory = FSDirectory.getDirectory(dirPath);
+ runStressTest(directory, false, new ConcurrentMergeScheduler());
+ directory.close();
+
+ _TestUtil.rmDir(dirPath);
}
}
Index: src/test/org/apache/lucene/index/TestDocumentWriter.java
===================================================================
--- src/test/org/apache/lucene/index/TestDocumentWriter.java (revision 575729)
+++ src/test/org/apache/lucene/index/TestDocumentWriter.java (working copy)
@@ -62,7 +62,7 @@
IndexWriter writer = new IndexWriter(dir, analyzer, true);
writer.addDocument(testDoc);
writer.flush();
- SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
+ SegmentInfo info = writer.newestSegment();
writer.close();
//After adding the document, we should be able to read it back in
SegmentReader reader = SegmentReader.get(info);
@@ -123,7 +123,7 @@
writer.addDocument(doc);
writer.flush();
- SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
+ SegmentInfo info = writer.newestSegment();
writer.close();
SegmentReader reader = SegmentReader.get(info);
@@ -156,7 +156,7 @@
writer.addDocument(doc);
writer.flush();
- SegmentInfo info = writer.segmentInfos.info(writer.segmentInfos.size()-1);
+ SegmentInfo info = writer.newestSegment();
writer.close();
SegmentReader reader = SegmentReader.get(info);
Index: src/test/org/apache/lucene/index/TestIndexWriterMerging.java
===================================================================
--- src/test/org/apache/lucene/index/TestIndexWriterMerging.java (revision 575729)
+++ src/test/org/apache/lucene/index/TestIndexWriterMerging.java (working copy)
@@ -16,7 +16,7 @@
*/
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -37,8 +37,8 @@
int num=100;
- Directory indexA = new RAMDirectory();
- Directory indexB = new RAMDirectory();
+ Directory indexA = new MockRAMDirectory();
+ Directory indexB = new MockRAMDirectory();
fillIndex(indexA, 0, num);
boolean fail = verifyIndex(indexA, 0);
@@ -54,7 +54,7 @@
fail("Index b is invalid");
}
- Directory merged = new RAMDirectory();
+ Directory merged = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(merged, new StandardAnalyzer(), true);
writer.setMergeFactor(2);
@@ -85,6 +85,7 @@
System.out.println("Document " + (i + startAt) + " is returning document " + temp.getField("count").stringValue());
}
}
+ reader.close();
return fail;
}
Index: src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java
===================================================================
--- src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (revision 575729)
+++ src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (working copy)
@@ -272,7 +272,6 @@
writer.addIndexesNoOptimize(new Directory[] { aux, aux });
assertEquals(1020, writer.docCount());
- assertEquals(2, writer.getSegmentCount());
assertEquals(1000, writer.getDocCount(0));
writer.close();
@@ -373,7 +372,7 @@
writer = newWriter(dir, true);
writer.setMaxBufferedDocs(1000);
- // add 1000 documents
+ // add 1000 documents in 1 segment
addDocs(writer, 1000);
assertEquals(1000, writer.docCount());
assertEquals(1, writer.getSegmentCount());
Index: src/test/org/apache/lucene/util/_TestUtil.java
===================================================================
--- src/test/org/apache/lucene/util/_TestUtil.java (revision 575729)
+++ src/test/org/apache/lucene/util/_TestUtil.java (working copy)
@@ -19,6 +19,9 @@
import java.io.File;
import java.io.IOException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
public class _TestUtil {
@@ -37,4 +40,13 @@
public static void rmDir(String dir) throws IOException {
rmDir(new File(dir));
}
+
+ public static void syncConcurrentMerges(IndexWriter writer) {
+ syncConcurrentMerges(writer.getMergeScheduler());
+ }
+
+ public static void syncConcurrentMerges(MergeScheduler ms) {
+ if (ms instanceof ConcurrentMergeScheduler)
+ ((ConcurrentMergeScheduler) ms).sync();
+ }
}
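For reference, a small sketch (not part of the patch) of the test pattern the helper above supports: when a writer may be running merges on background threads, sync those merges before making assertions about the resulting segments.

    import org.apache.lucene.analysis.SimpleAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.util._TestUtil;

    public class SyncMergesDemo {
      public static void main(String[] args) throws Exception {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);
        writer.setMergeScheduler(new ConcurrentMergeScheduler());
        writer.setMaxBufferedDocs(2);

        Document doc = new Document();
        doc.add(new Field("f", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
        for (int i = 0; i < 100; i++)
          writer.addDocument(doc);

        // Merges may still be running in background threads; wait for them
        // so that any follow-up checks see a stable set of segments.
        _TestUtil.syncConcurrentMerges(writer);
        writer.close();
        dir.close();
      }
    }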
Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0)
+++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 0)
@@ -0,0 +1,277 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.Directory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+
+/** A {@link MergeScheduler} that runs each merge using a
+ * separate thread, up until a maximum number of threads
+ * ({@link #setMaxThreadCount}), at which point merges are
+ * run in the foreground, serially. This is a simple way
+ * to use concurrency in the indexing process without
+ * having to create and manage application level
+ * threads. */
+
+public class ConcurrentMergeScheduler implements MergeScheduler {
+
+ public static boolean VERBOSE = false;
+
+ private int mergeThreadPriority = -1;
+
+ private List mergeThreads = new ArrayList();
+ private int maxThreadCount = 3;
+
+ private List exceptions = new ArrayList();
+ private Directory dir;
+
+ /** Sets the max # simultaneous threads that may be
+ * running. If a merge is necessary yet we already have
+ * this many threads running, the merge is returned back
+ * to IndexWriter so that it runs in the "foreground". */
+ public void setMaxThreadCount(int count) {
+ if (count < 1)
+ throw new IllegalArgumentException("count should be at least 1");
+ maxThreadCount = count;
+ }
+
+ /** Get the max # simultaneous threads that may be
+ * running. @see #setMaxThreadCount. */
+ public int getMaxThreadCount() {
+ return maxThreadCount;
+ }
+
+ /** Return the priority that merge threads run at. By
+ * default the priority is 1 plus the priority of (ie,
+ * slightly higher priority than) the first thread that
+ * calls merge. */
+ public synchronized int getMergeThreadPriority() {
+ initMergeThreadPriority();
+ return mergeThreadPriority;
+ }
+
+ /** Set the priority that merge threads run at. */
+ public synchronized void setMergeThreadPriority(int pri) {
+ mergeThreadPriority = pri;
+
+ final int numThreads = mergeThreads.size();
+ for(int i=0;i<numThreads;i++)
+ while(mergeThreads.size() > 0) {
+ if (VERBOSE) {
+ message("now wait for threads; currently " + mergeThreads.size() + " still running");
+ for(int i=0;i<mergeThreads.size();i++)
Index: src/java/org/apache/lucene/index/MergePolicy.java
===================================================================
--- src/java/org/apache/lucene/index/MergePolicy.java (revision 0)
+++ src/java/org/apache/lucene/index/MergePolicy.java (revision 0)
+/** <p>Expert: a MergePolicy determines the sequence of
+ * primitive merge operations to be used for overall merge
+ * and optimize operations.
+ *
+ * Whenever the segments in an index have been altered by
+ * {@link IndexWriter}, either the addition of a newly
+ * flushed segment, addition of many segments from
+ * addIndexes* calls, or a previous merge that may now need
+ * to cascade, {@link IndexWriter} invokes {@link
+ * #findMerges} to give the MergePolicy a chance to pick
+ * merges that are now required. This method returns a
+ * {@link MergeSpecification} instance describing the set of
+ * merges that should be done, or null if no merges are
+ * necessary. When IndexWriter.optimize is called, it calls
+ * {@link #findMergesForOptimize} and the MergePolicy should
+ * then return the necessary merges.
+ *
+ * Note that the policy can return more than one merge at
+ * a time. In this case, if the writer is using {@link
+ * SerialMergeScheduler}, the merges will be run
+ * sequentially but if it is using {@link
+ * ConcurrentMergeScheduler} they will be run concurrently.
+ *
+ * The default MergePolicy is {@link
+ * LogDocMergePolicy}.
+ */
+
+public interface MergePolicy {
+
+ /** OneMerge provides the information necessary to perform
+ * an individual primitive merge operation, resulting in
+ * a single new segment. The merge spec includes the
+ * subset of segments to be merged as well as whether the
+ * new segment should use the compound file format. */
+
+ public static class OneMerge {
+
+ SegmentInfo info; // used by IndexWriter
+ boolean mergeDocStores; // used by IndexWriter
+ boolean optimize; // used by IndexWriter
+ SegmentInfos segmentsClone; // used by IndexWriter
+ boolean increfDone; // used by IndexWriter
+ boolean registerDone; // used by IndexWriter
+ long mergeGen; // used by IndexWriter
+ boolean isExternal; // used by IndexWriter
+
+ final SegmentInfos segments;
+ final boolean useCompoundFile;
+ boolean aborted;
+ Throwable error;
+
+ public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
+ if (0 == segments.size())
+ throw new RuntimeException("segments must include at least one segment");
+ this.segments = segments;
+ this.useCompoundFile = useCompoundFile;
+ }
+
+ /** Record that an exception occurred while executing
+ * this merge */
+ public synchronized void setException(Throwable error) {
+ this.error = error;
+ }
+
+ /** Retrieve previous exception set by {@link
+ * #setException}. */
+ public synchronized Throwable getException() {
+ return error;
+ }
+
+ /** Mark this merge as aborted. If this is called
+ * before the merge is committed then the merge will
+ * not be committed. */
+ public synchronized void abort() {
+ aborted = true;
+ }
+
+ /** Returns true if this merge was aborted. */
+ public synchronized boolean isAborted() {
+ return aborted;
+ }
+
+ public String segString(Directory dir) {
+ StringBuffer b = new StringBuffer();
+ final int numSegments = segments.size();
+ for(int i=0;i<numSegments;i++) {
+ if (i > 0) b.append(" ");
+ b.append(segments.info(i).segString(dir));
+ }
+ if (info != null)
+ b.append(" into " + info.name);
+ if (optimize)
+ b.append(" [optimize]");
+ return b.toString();
+ }
+ }
+
+ /**
+ * A MergeSpecification instance provides the information
+ * necessary to perform multiple merges. It simply
+ * contains a list of {@link OneMerge} instances.
+ */
+
+ public static class MergeSpecification implements Cloneable {
+
+ /**
+ * The subset of segments to be included in the primitive merge.
+ */
+
+ public List merges = new ArrayList();
+
+ public void add(OneMerge merge) {
+ merges.add(merge);
+ }
+
+ public String segString(Directory dir) {
+ StringBuffer b = new StringBuffer();
+ b.append("MergeSpec:\n");
+ final int count = merges.size();
+ for(int i=0;i<count;i++)
Index: src/java/org/apache/lucene/index/LogMergePolicy.java
===================================================================
--- src/java/org/apache/lucene/index/LogMergePolicy.java (revision 0)
+++ src/java/org/apache/lucene/index/LogMergePolicy.java (revision 0)
+/** <p>This class implements a {@link MergePolicy} that tries
+ * to merge segments into levels of exponentially
+ * increasing size, where each level has < mergeFactor
+ * segments in it. Whenever a given level has mergeFactor
+ * segments or more in it, they will be merged.
+ *
+ * This class is abstract and requires a subclass to
+ * define the {@link #size} method which specifies how a
+ * segment's size is determined. {@link LogDocMergePolicy}
+ * is one subclass that measures size by document count in
+ * the segment. {@link LogByteSizeMergePolicy} is another
+ * subclass that measures size as the total byte size of the
+ * file(s) for the segment.
+ */
+
+public abstract class LogMergePolicy implements MergePolicy {
+
+ /** Defines the allowed range of log(size) for each
+ * level. A level is computed by taking the max segment
+ * log size, minus LEVEL_LOG_SPAN, and finding all
+ * segments falling within that range. */
+ public static final double LEVEL_LOG_SPAN = 0.75;
+
+ /** Default merge factor, which is how many segments are
+ * merged at a time */
+ public static final int DEFAULT_MERGE_FACTOR = 10;
+
+ private int mergeFactor = DEFAULT_MERGE_FACTOR;
+
+ long minMergeSize;
+ long maxMergeSize;
+
+ private boolean useCompoundFile = true;
+ private boolean useCompoundDocStore = true;
+
+ /** Returns the number of segments that are merged at
+ * once and also controls the total number of segments
+ * allowed to accumulate in the index.
+ */
+ public int getMergeFactor() {
+ return mergeFactor;
+ }
+
+ /** Determines how often segment indices are merged by
+ * addDocument(). With smaller values, less RAM is used
+ * while indexing, and searches on unoptimized indices are
+ * faster, but indexing speed is slower. With larger
+ * values, more RAM is used during indexing, and while
+ * searches on unoptimized indices are slower, indexing is
+ * faster. Thus larger values (> 10) are best for batch
+ * index creation, and smaller values (< 10) for indices
+ * that are interactively maintained. */
+ public void setMergeFactor(int mergeFactor) {
+ if (mergeFactor < 2)
+ throw new IllegalArgumentException("mergeFactor cannot be less than 2");
+ this.mergeFactor = mergeFactor;
+ }
+
+ // Javadoc inherited
+ public boolean useCompoundFile(SegmentInfos infos, SegmentInfo info) {
+ return useCompoundFile;
+ }
+
+ /** Sets whether compound file format should be used for
+ * newly flushed and newly merged segments. */
+ public void setUseCompoundFile(boolean useCompoundFile) {
+ this.useCompoundFile = useCompoundFile;
+ }
+
+ /** Returns true if newly flushed and newly merged segments
+ * are written in compound file format. @see
+ * #setUseCompoundFile */
+ public boolean getUseCompoundFile() {
+ return useCompoundFile;
+ }
+
+ // Javadoc inherited
+ public boolean useCompoundDocStore(SegmentInfos infos) {
+ return useCompoundDocStore;
+ }
+
+ /** Sets whether compound file format should be used for
+ * newly flushed and newly merged doc store
+ * segment files (term vectors and stored fields). */
+ public void setUseCompoundDocStore(boolean useCompoundDocStore) {
+ this.useCompoundDocStore = useCompoundDocStore;
+ }
+
+ /** Returns true if newly flushed and newly merged doc
+ * store segment files (term vectors and stored fields)
+ * are written in compound file format. @see
+ * #setUseCompoundDocStore */
+ public boolean getUseCompoundDocStore() {
+ return useCompoundDocStore;
+ }
+
+ public void close() {}
+
+ abstract protected long size(SegmentInfo info) throws IOException;
+
+ private boolean isOptimized(SegmentInfos infos, IndexWriter writer, int maxNumSegments, Set segmentsToOptimize) throws IOException {
+ final int numSegments = infos.size();
+ int numToOptimize = 0;
+ SegmentInfo optimizeInfo = null;
+ for(int i=0;i<numSegments;i++) {
+ while(numSegments > 0) {
+ final SegmentInfo info = infos.info(--numSegments);
+ if (segmentsToOptimize.contains(info)) {
+ numSegments++;
+ break;
+ }
+ }
+
+ if (numSegments > 0) {
+
+ spec = new MergeSpecification();
+ while (numSegments > 0) {
+
+ final int first;
+ if (numSegments > mergeFactor)
+ first = numSegments-mergeFactor;
+ else
+ first = 0;
+
+ if (numSegments > 1 || !isOptimized(writer, infos.info(0)))
+ spec.add(new OneMerge(infos.range(first, numSegments), useCompoundFile));
+
+ numSegments -= mergeFactor;
+ }
+
+ } else
+ spec = null;
+ } else
+ spec = null;
+
+ return spec;
+ }
+
+ /** Checks if any merges are now necessary and returns a
+ * {@link MergePolicy.MergeSpecification} if so. A merge
+ * is necessary when there are more than {@link
+ * #setMergeFactor} segments at a given level. When
+ * multiple levels have too many segments, this method
+ * will return multiple merges, allowing the {@link
+ * MergeScheduler} to use concurrency. */
+ public MergeSpecification findMerges(SegmentInfos infos, IndexWriter writer) throws IOException {
+
+ final int numSegments = infos.size();
+
+ // Compute levels, which is just log (base mergeFactor)
+ // of the size of each segment
+ float[] levels = new float[numSegments];
+ final float norm = (float) Math.log(mergeFactor);
+
+ final Directory directory = writer.getDirectory();
+
+ for(int i=0;i<numSegments;i++) {
+ final SegmentInfo info = infos.info(i);
+ long size = size(info);
+
+ if (size >= maxMergeSize && info.dir != directory)
+ throw new IllegalArgumentException("Segment is too large (" + size + " vs max size " + maxMergeSize + ")");
+
+ // Floor tiny segments
+ if (size < 1)
+ size = 1;
+ levels[i] = (float) Math.log(size)/norm;
+ }
+
+ final float levelFloor;
+ if (minMergeSize <= 0)
+ levelFloor = (float) 0.0;
+ else
+ levelFloor = (float) (Math.log(minMergeSize)/norm);
+
+ // Now, we quantize the log values into levels. The
+ // first level is any segment whose log size is within
+ // LEVEL_LOG_SPAN of the max size, or that has such a
+ // segment "to the right". Then, we find the max of all
+ // other segments and use that to define the next level
+ // segment, etc.
+
+ MergeSpecification spec = null;
+
+ int start = 0;
+ while(start < numSegments) {
+
+ // Find max level of all segments not already
+ // quantized.
+ float maxLevel = levels[start];
+ for(int i=1+start;i<numSegments;i++) {
+ final float level = levels[i];
+ if (level > maxLevel)
+ maxLevel = level;
+ }
+
+ // Now search backwards for the rightmost segment that
+ // falls into this level:
+ float levelBottom;
+ if (maxLevel < levelFloor)
+ // All remaining segments fall into the min level
+ levelBottom = -1.0F;
+ else {
+ levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN);
+
+ // Force a boundary at the level floor
+ if (levelBottom < levelFloor && maxLevel >= levelFloor)
+ levelBottom = levelFloor;
+ }
+
+ int upto = numSegments-1;
+ while(upto >= start) {
+ if (levels[upto] >= levelBottom) {
+ break;
+ }
+ upto--;
+ }
+
+ // Finally, record all merges that are viable at this level:
+ int end = start + mergeFactor;
+ while(end <= 1+upto) {
+ boolean anyTooLarge = false;
+ for(int i=start;i<end;i++)
+ anyTooLarge |= size(infos.info(i)) >= maxMergeSize;
+
+ if (!anyTooLarge) {
+ if (spec == null)
+ spec = new MergeSpecification();
+ spec.add(new OneMerge(infos.range(start, end), useCompoundFile));
+ }
+ start = end;
+ end = start + mergeFactor;
+ }
+
+ start = 1+upto;
+ }
+
+ return spec;
+ }
+}
Property changes on: src/java/org/apache/lucene/index/LogMergePolicy.java
___________________________________________________________________
Name: svn:eol-style
+ native
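As a side note on the level computation in LogMergePolicy.findMerges above: each segment's level is log(size)/log(mergeFactor), and segments within LEVEL_LOG_SPAN (0.75) of the current maximum level are grouped into one level. A toy illustration with made-up segment sizes (not part of the patch):

    public class LevelDemo {
      public static void main(String[] args) {
        int mergeFactor = 10;
        double norm = Math.log(mergeFactor);
        // Hypothetical segment sizes in bytes, largest (oldest) first
        long[] sizes = new long[] {4000000L, 3500000L, 60000L, 55000L, 52000L, 1200L};
        for (int i = 0; i < sizes.length; i++) {
          double level = Math.log(sizes[i]) / norm;
          System.out.println("segment " + i + ": size=" + sizes[i] + ", level=" + level);
        }
        // The first two segments land near level 6.6, the next three near 4.7,
        // and the last near 3.1, so the policy would only consider merging
        // segments within each of those bands, mergeFactor at a time.
      }
    }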
Index: src/java/org/apache/lucene/index/SerialMergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 0)
+++ src/java/org/apache/lucene/index/SerialMergeScheduler.java (revision 0)
@@ -0,0 +1,42 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+/** A {@link MergeScheduler} that simply does each merge
+ * sequentially, using the current thread. */
+public class SerialMergeScheduler implements MergeScheduler {
+
+ /** Just do the merges in sequence. We do this
+ * "synchronized" so that even if the application is using
+ * multiple threads, only one merge may run at a time. */
+ synchronized public void merge(IndexWriter writer)
+ throws CorruptIndexException, IOException {
+
+ while(true) {
+ MergePolicy.OneMerge merge = writer.getNextMerge();
+ if (merge == null)
+ break;
+ writer.merge(merge);
+ }
+ }
+
+ public void close() {}
+}
Property changes on: src/java/org/apache/lucene/index/SerialMergeScheduler.java
___________________________________________________________________
Name: svn:eol-style
+ native
Index: src/java/org/apache/lucene/index/DocumentsWriter.java
===================================================================
--- src/java/org/apache/lucene/index/DocumentsWriter.java (revision 575729)
+++ src/java/org/apache/lucene/index/DocumentsWriter.java (working copy)
@@ -113,6 +113,7 @@
private int nextDocID; // Next docID to be added
private int numDocsInRAM; // # docs buffered in RAM
+ private int numDocsInStore; // # docs written to doc stores
private int nextWriteDocID; // Next docID to be written
// Max # ThreadState instances; if there are more threads
@@ -238,6 +239,7 @@
String s = docStoreSegment;
docStoreSegment = null;
docStoreOffset = 0;
+ numDocsInStore = 0;
return s;
} else {
return null;
@@ -245,7 +247,12 @@
}
private List files = null; // Cached list of files we've created
+ private List abortedFiles = null; // List of files that were written before last abort()
+ List abortedFiles() {
+ return abortedFiles;
+ }
+
/* Returns list of files in use by this instance,
* including any flushed segments. */
List files() {
@@ -278,6 +285,9 @@
* docs added since last flush. */
synchronized void abort() throws IOException {
+ if (infoStream != null)
+ infoStream.println("docWriter: now abort");
+
// Forcefully remove waiting ThreadStates from line
+ for(int i=0;i<numWaiting;i++)
Index: src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexWriter.java (revision 575729)
+++ src/java/org/apache/lucene/index/IndexWriter.java (working copy)
+ also trigger one or more segment merges which by default
+ run (blocking) with the current thread (see below for changing the {@link
+ MergeScheduler}).
The optional autoCommit argument to the
@@ -135,8 +144,21 @@
filesystems like NFS that do not support "delete on last
close" semantics, which Lucene's "point in time" search
normally relies on.
- */
+ Expert:
+ IndexWriter allows you to separately change
+ the {@link MergePolicy} and the {@link MergeScheduler}.
+ The {@link MergePolicy} is invoked whenever there are
+ changes to the segments in the index. Its role is to
+ select which merges to do, if any, and return a {@link
+ MergePolicy.MergeSpecification} describing the merges. It
+ also selects merges to do for optimize(). (The default is
+ {@link LogDocMergePolicy}.) Then, the {@link
+ MergeScheduler} is invoked with the requested merges and
+ it decides when and how to run the merges. The default is
+ {@link SerialMergeScheduler}.
+*/
+
/*
* Clarification: Check Points (and commits)
* Being able to set autoCommit=false allows IndexWriter to flush and
@@ -177,9 +199,10 @@
public static final String WRITE_LOCK_NAME = "write.lock";
/**
- * Default value is 10. Change using {@link #setMergeFactor(int)}.
+ * @deprecated
+ * @see LogMergePolicy#DEFAULT_MERGE_FACTOR
*/
- public final static int DEFAULT_MERGE_FACTOR = 10;
+ public final static int DEFAULT_MERGE_FACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR;
/**
* Default value is 10. Change using {@link #setMaxBufferedDocs(int)}.
@@ -205,9 +228,10 @@
public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000;
/**
- * Default value is {@link Integer#MAX_VALUE}. Change using {@link #setMaxMergeDocs(int)}.
+ * @deprecated
+ * @see LogDocMergePolicy#DEFAULT_MAX_MERGE_DOCS
*/
- public final static int DEFAULT_MAX_MERGE_DOCS = Integer.MAX_VALUE;
+ public final static int DEFAULT_MAX_MERGE_DOCS = LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS;
/**
* Default value is 10,000. Change using {@link #setMaxFieldLength(int)}.
@@ -239,23 +263,31 @@
private boolean localAutoCommit; // saved autoCommit during local transaction
private boolean autoCommit = true; // false if we should commit only on close
- SegmentInfos segmentInfos = new SegmentInfos(); // the segments
+ private SegmentInfos segmentInfos = new SegmentInfos(); // the segments
private DocumentsWriter docWriter;
private IndexFileDeleter deleter;
+ private Set segmentsToOptimize = new HashSet(); // used by optimize to note those needing optimization
+
private Lock writeLock;
private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
- /** Use compound file setting. Defaults to true, minimizing the number of
- * files used. Setting this to false may improve indexing performance, but
- * may also cause file handle problems.
- */
- private boolean useCompoundFile = true;
-
private boolean closeDir;
private boolean closed;
+ private boolean closing;
+ // Holds all SegmentInfo instances currently involved in
+ // merges
+ private HashSet mergingSegments = new HashSet();
+
+ private MergePolicy mergePolicy = new LogDocMergePolicy();
+ private MergeScheduler mergeScheduler = new SerialMergeScheduler();
+ private LinkedList pendingMerges = new LinkedList();
+ private Set runningMerges = new HashSet();
+ private List mergeExceptions = new ArrayList();
+ private long mergeGen;
+
/**
* Used internally to throw an {@link
* AlreadyClosedException} if this IndexWriter has been
@@ -268,23 +300,57 @@
}
}
- /** Get the current setting of whether to use the compound file format.
- * Note that this just returns the value you set with setUseCompoundFile(boolean)
- * or the default. You cannot use this to query the status of an existing index.
+ private void message(String message) {
+ infoStream.println("IW [" + Thread.currentThread().getName() + "]: " + message);
+ }
+
+ /**
+ * Casts current mergePolicy to LogMergePolicy, and throws
+ * an exception if the mergePolicy is not a LogMergePolicy.
+ */
+ private LogMergePolicy getLogMergePolicy() {
+ if (mergePolicy instanceof LogMergePolicy)
+ return (LogMergePolicy) mergePolicy;
+ else
+ throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy");
+ }
+
+ private LogDocMergePolicy getLogDocMergePolicy() {
+ if (mergePolicy instanceof LogDocMergePolicy)
+ return (LogDocMergePolicy) mergePolicy;
+ else
+ throw new IllegalArgumentException("this method can only be called when the merge policy is LogDocMergePolicy");
+ }
+
+ /** Get the current setting of whether newly flushed
+ * segments will use the compound file format. Note that
+ * this just returns the value previously set with
+ * setUseCompoundFile(boolean), or the default value
+ * (true). You cannot use this to query the status of
+ * previously flushed segments.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.getUseCompoundFile as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* @see #setUseCompoundFile(boolean)
*/
public boolean getUseCompoundFile() {
- ensureOpen();
- return useCompoundFile;
+ return getLogMergePolicy().getUseCompoundFile();
}
- /** Setting to turn on usage of a compound file. When on, multiple files
- * for each segment are merged into a single file once the segment creation
- * is finished. This is done regardless of what directory is in use.
+ /** Setting to turn on usage of a compound file. When on,
+ * multiple files for each segment are merged into a
+ * single file when a new segment is flushed.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.setUseCompoundFile as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
*/
public void setUseCompoundFile(boolean value) {
- ensureOpen();
- useCompoundFile = value;
+ getLogMergePolicy().setUseCompoundFile(value);
}
/** Expert: Set the Similarity implementation used by this IndexWriter.
@@ -635,6 +701,8 @@
deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy,
segmentInfos, infoStream, docWriter);
+ pushMaxBufferedDocs();
+
} catch (IOException e) {
this.writeLock.release();
this.writeLock = null;
@@ -642,26 +710,83 @@
}
}
+ /**
+ * Expert: set the merge policy used by this writer.
+ */
+ public void setMergePolicy(MergePolicy mp) {
+ ensureOpen();
+ if (mp == null)
+ throw new NullPointerException("MergePolicy must be non-null");
+
+ if (mergePolicy != mp)
+ mergePolicy.close();
+ mergePolicy = mp;
+ pushMaxBufferedDocs();
+ }
+
+ /**
+ * Expert: returns the current MergePolicy in use by this writer.
+ * @see #setMergePolicy
+ */
+ public MergePolicy getMergePolicy() {
+ ensureOpen();
+ return mergePolicy;
+ }
+
+ /**
+ * Expert: set the merge scheduler used by this writer.
+ */
+ public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException {
+ ensureOpen();
+ if (mergeScheduler == null)
+ throw new NullPointerException("MergeScheduler must be non-null");
+
+ if (this.mergeScheduler != mergeScheduler) {
+ finishMerges(true);
+ this.mergeScheduler.close();
+ }
+ this.mergeScheduler = mergeScheduler;
+ }
+
+ /**
+ * Expert: returns the current MergeScheduler in use by this
+ * writer.
+ * @see #setMergeScheduler
+ */
+ public MergeScheduler getMergeScheduler() {
+ ensureOpen();
+ return mergeScheduler;
+ }
+
/** Determines the largest number of documents ever merged by addDocument().
* Small values (e.g., less than 10,000) are best for interactive indexing,
* as this limits the length of pauses while indexing to a few seconds.
* Larger values are best for batched indexing and speedier searches.
*
* The default value is {@link Integer#MAX_VALUE}.
+ *
+ *
+ * <p>Note that this method is a convenience method: it
+ * just calls mergePolicy.setMaxMergeDocs as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
*/
public void setMaxMergeDocs(int maxMergeDocs) {
- ensureOpen();
- this.maxMergeDocs = maxMergeDocs;
+ getLogDocMergePolicy().setMaxMergeDocs(maxMergeDocs);
}
- /**
+ /**
* Returns the largest number of documents allowed in a
* single segment.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.getMaxMergeDocs as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* @see #setMaxMergeDocs
*/
public int getMaxMergeDocs() {
- ensureOpen();
- return maxMergeDocs;
+ return getLogDocMergePolicy().getMaxMergeDocs();
}
/**
@@ -713,9 +838,30 @@
if (maxBufferedDocs < 2)
throw new IllegalArgumentException("maxBufferedDocs must at least be 2");
docWriter.setMaxBufferedDocs(maxBufferedDocs);
+ pushMaxBufferedDocs();
}
/**
+ * If we are flushing by doc count (not by RAM usage), and
+ * using LogDocMergePolicy then push maxBufferedDocs down
+ * as its minMergeDocs, to keep backwards compatibility.
+ */
+ private void pushMaxBufferedDocs() {
+ if (docWriter.getRAMBufferSizeMB() == 0.0) {
+ final MergePolicy mp = mergePolicy;
+ if (mp instanceof LogDocMergePolicy) {
+ LogDocMergePolicy lmp = (LogDocMergePolicy) mp;
+ final int maxBufferedDocs = docWriter.getMaxBufferedDocs();
+ if (lmp.getMinMergeDocs() != maxBufferedDocs) {
+ if (infoStream != null)
+ message("now push maxBufferedDocs " + maxBufferedDocs + " to LogDocMergePolicy");
+ lmp.setMinMergeDocs(maxBufferedDocs);
+ }
+ }
+ }
+ }
+
+ /**
* Returns 0 if this writer is flushing by RAM usage, else
* returns the number of buffered added documents that will
* trigger a flush.
@@ -784,24 +930,31 @@
* for batch index creation, and smaller values (< 10) for indices that are
* interactively maintained.
*
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.setMergeFactor as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* This must never be less than 2. The default value is 10.
*/
public void setMergeFactor(int mergeFactor) {
- ensureOpen();
- if (mergeFactor < 2)
- throw new IllegalArgumentException("mergeFactor cannot be less than 2");
- this.mergeFactor = mergeFactor;
+ getLogMergePolicy().setMergeFactor(mergeFactor);
}
/**
- * Returns the number of segments that are merged at once
- * and also controls the total number of segments allowed
- * to accumulate in the index.
+ *
+ * <p>Returns the number of segments that are merged at
+ * once and also controls the total number of segments
+ * allowed to accumulate in the index.
+ *
+ * Note that this method is a convenience method: it
+ * just calls mergePolicy.getMergeFactor as long as
+ * mergePolicy is an instance of {@link LogMergePolicy}.
+ * Otherwise an IllegalArgumentException is thrown.
+ *
* @see #setMergeFactor
*/
public int getMergeFactor() {
- ensureOpen();
- return mergeFactor;
+ return getLogMergePolicy().getMergeFactor();
}
/** If non-null, this will be the default infoStream used
@@ -910,15 +1063,75 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void close() throws CorruptIndexException, IOException {
- if (!closed) {
+ public void close() throws CorruptIndexException, IOException {
+ close(true);
+ }
+
+ /**
+ * Closes the index with or without waiting for currently
+ * running merges to finish. This is only meaningful when
+ * using a MergeScheduler that runs merges in background
+ * threads.
+ * @param waitForMerges if true, this call will block
+ * until all merges complete; else, it will abort all
+ * running merges and return right away
+ */
+ public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
+ boolean doClose;
+ synchronized(this) {
+ // Ensure that only one thread actually gets to do the closing:
+ if (!closing) {
+ doClose = true;
+ closing = true;
+ } else
+ doClose = false;
+ }
+ if (doClose)
+ closeInternal(waitForMerges);
+ else
+ // Another thread beat us to it (is actually doing the
+ // close), so we will block until that other thread
+ // has finished closing
+ waitForClose();
+ }
+
+ synchronized private void waitForClose() {
+ while(!closed && closing) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+
+ private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
+ try {
+
flush(true, true);
+ mergePolicy.close();
+
+ finishMerges(waitForMerges);
+
+ mergeScheduler.close();
+
if (commitPending) {
- segmentInfos.write(directory); // now commit changes
+ boolean success = false;
+ try {
+ segmentInfos.write(directory); // now commit changes
+ success = true;
+ } finally {
+ if (!success) {
+ if (infoStream != null)
+ message("hit exception committing segments file during close");
+ deletePartialSegmentsFile();
+ }
+ }
if (infoStream != null)
- infoStream.println("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
- deleter.checkpoint(segmentInfos, true);
+ message("close: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
+ synchronized(this) {
+ deleter.checkpoint(segmentInfos, true);
+ }
commitPending = false;
rollbackSegmentInfos = null;
}
@@ -930,17 +1143,31 @@
closed = true;
docWriter = null;
- if(closeDir)
+ synchronized(this) {
+ deleter.close();
+ }
+
+ if (closeDir)
directory.close();
+ } finally {
+ synchronized(this) {
+ if (!closed)
+ closing = false;
+ notifyAll();
+ }
}
}
/** Tells the docWriter to close its currently open shared
- * doc stores (stored fields & vectors files). */
- private void flushDocStores() throws IOException {
+ * doc stores (stored fields & vectors files).
+ * Return value specifies whether new doc store files are compound or not.
+ */
+ private synchronized boolean flushDocStores() throws IOException {
List files = docWriter.files();
+ boolean useCompoundDocStore = false;
+
if (files.size() > 0) {
String docStoreSegment;
@@ -949,20 +1176,25 @@
docStoreSegment = docWriter.closeDocStore();
success = true;
} finally {
- if (!success)
+ if (!success) {
+ if (infoStream != null)
+ message("hit exception closing doc store segment");
docWriter.abort();
+ }
}
- if (useCompoundFile && docStoreSegment != null) {
+ useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos);
+
+ if (useCompoundDocStore && docStoreSegment != null) {
// Now build compound doc store file
- checkpoint();
success = false;
final int numSegments = segmentInfos.size();
+ final String compoundFileName = docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION;
try {
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, docStoreSegment + "." + IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
final int size = files.size();
+ for(int i=0;i<size;i++)
+ cfsWriter.addFile((String) files.get(i));
*
- * The amount of free space required when a merge is
- * triggered is up to 1X the size of all segments being
- * merged, when no readers/searchers are open against the
- * index, and up to 2X the size of all segments being
- * merged when readers/searchers are open against the
- * index (see {@link #optimize()} for details). Most
- * merges are small (merging the smallest segments
- * together), but whenever a full merge occurs (all
- * segments in the index, which is the worst case for
- * temporary space usage) then the maximum free disk space
- * required is the same as {@link #optimize}.
+ * The amount of free space required when a merge is triggered is
+ * up to 1X the size of all segments being merged, when no
+ * readers/searchers are open against the index, and up to 2X the
+ * size of all segments being merged when readers/searchers are open
+ * against the index (see {@link #optimize()} for details). The
+ * sequence of primitive merge operations performed is governed by
+ * the merge policy.
*
*
* <p>Note that each term in the document can be no longer
* than 16383 characters, otherwise an
@@ -1105,14 +1340,27 @@
*/
public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
ensureOpen();
+ boolean doFlush = false;
boolean success = false;
try {
- success = docWriter.addDocument(doc, analyzer);
- } catch (IOException ioe) {
- deleter.refresh();
- throw ioe;
+ doFlush = docWriter.addDocument(doc, analyzer);
+ success = true;
+ } finally {
+ if (!success) {
+
+ if (infoStream != null)
+ message("hit exception adding document");
+
+ synchronized (this) {
+ // If docWriter has some aborted files that were
+ // never incref'd, then we clean them up here
+ final List files = docWriter.abortedFiles();
+ if (files != null)
+ deleter.deleteNewFiles(files);
+ }
+ }
}
- if (success)
+ if (doFlush)
flush(true, false);
}
@@ -1178,11 +1426,24 @@
throws CorruptIndexException, IOException {
ensureOpen();
boolean doFlush = false;
+ boolean success = false;
try {
doFlush = docWriter.updateDocument(term, doc, analyzer);
- } catch (IOException ioe) {
- deleter.refresh();
- throw ioe;
+ success = true;
+ } finally {
+ if (!success) {
+
+ if (infoStream != null)
+ message("hit exception updating document");
+
+ synchronized (this) {
+ // If docWriter has some aborted files that were
+ // never incref'd, then we clean them up here
+ final List files = docWriter.abortedFiles();
+ if (files != null)
+ deleter.deleteNewFiles(files);
+ }
+ }
}
if (doFlush)
flush(true, false);
@@ -1208,54 +1469,40 @@
}
final String newSegmentName() {
- return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
+ // Cannot synchronize on IndexWriter because that causes
+ // deadlock
+ synchronized(segmentInfos) {
+ return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
+ }
}
- /** Determines how often segment indices are merged by addDocument(). With
- * smaller values, less RAM is used while indexing, and searches on
- * unoptimized indices are faster, but indexing speed is slower. With larger
- * values, more RAM is used during indexing, and while searches on unoptimized
- * indices are slower, indexing is faster. Thus larger values (> 10) are best
- * for batch index creation, and smaller values (< 10) for indices that are
- * interactively maintained.
- *
- *
- * <p>This must never be less than 2. The default value is {@link #DEFAULT_MERGE_FACTOR}.
-
- */
- private int mergeFactor = DEFAULT_MERGE_FACTOR;
-
/** Determines amount of RAM usage by the buffered docs at
* which point we trigger a flush to the index.
*/
private double ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F;
- /** Determines the largest number of documents ever merged by addDocument().
- * Small values (e.g., less than 10,000) are best for interactive indexing,
- * as this limits the length of pauses while indexing to a few seconds.
- * Larger values are best for batched indexing and speedier searches.
- *
- *
- * <p>The default value is {@link #DEFAULT_MAX_MERGE_DOCS}.
-
- */
- private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
-
/** If non-null, information about merges will be printed to this.
*/
private PrintStream infoStream = null;
-
private static PrintStream defaultInfoStream = null;
- /** Merges all segments together into a single segment,
- * optimizing an index for search.
+ /**
+ * Requests an "optimize" operation on an index, priming the index
+ * for the fastest available search. Traditionally this has meant
+ * merging all segments into a single segment as is done in the
+ * default merge policy, but individual merge policies may implement
+ * optimize in different ways.
*
+ * @see LogMergePolicy#findMergesForOptimize
+ *
*
* <p>It is recommended that this method be called upon completion of indexing. In
* environments with frequent updates, optimize is best done during low volume times, if at all.
*
*
* See http://www.gossamer-threads.com/lists/lucene/java-dev/47895 for more discussion.
*
- * Note that this requires substantial temporary free
+ *
+ * <p>Note that this can require substantial temporary free
* space in the Directory (see LUCENE-764
* for details):
@@ -1293,7 +1540,7 @@
* The actual temporary usage could be much less than
* these figures (it depends on many factors).
*
- * Once the optimize completes, the total size of the
+ *
+ * <p>In general, once the optimize completes, the total size of the
* index will be less than the size of the starting index.
* It could be quite a bit smaller (if there were many
* pending deletes) or just slightly smaller.
@@ -1307,24 +1554,158 @@
* using compound file format. This will occur when the
* Exception is hit during conversion of the segment into
* compound format.
+ *
+ * This call will optimize those segments present in
+ * the index when the call started. If other threads are
+ * still adding documents and flushing segments, those
+ * newly created segments will not be optimized unless you
+ * call optimize again.
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void optimize() throws CorruptIndexException, IOException {
+ public void optimize() throws CorruptIndexException, IOException {
+ optimize(true);
+ }
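For illustration, here is a minimal usage sketch of the reworked optimize, including the non-blocking optimize(false) variant added just below. The index path, the setMergeScheduler(...) setter and the ConcurrentMergeScheduler class are assumptions of this sketch (they are not part of the hunk shown here), and close() is assumed to wait for any merges still running in the background.

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.ConcurrentMergeScheduler;  // assumed background-capable scheduler
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.FSDirectory;

    public class OptimizeInBackground {
      public static void main(String[] args) throws Exception {
        IndexWriter writer = new IndexWriter(FSDirectory.getDirectory("/tmp/index"),
                                             new StandardAnalyzer(), true);
        writer.setMergeScheduler(new ConcurrentMergeScheduler());  // assumed setter
        // ... writer.addDocument(...) calls ...
        writer.optimize(false);  // request an optimize but do not block while it runs
        // Segments flushed by other threads after this point are not optimized
        // unless optimize is called again (see the javadoc above).
        writer.close();          // assumed to wait for still-running background merges
      }
    }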
+
+ /** Just like {@link #optimize()}, except you can specify
+ * whether the call should block until the optimize
+ * completes. This is only meaningful with a
+ * {@link MergeScheduler} that is able to run merges in
+ * background threads. */
+ public void optimize(boolean doWait) throws CorruptIndexException, IOException {
ensureOpen();
flush();
- while (segmentInfos.size() > 1 ||
- (segmentInfos.size() == 1 &&
- (SegmentReader.hasDeletions(segmentInfos.info(0)) ||
- SegmentReader.hasSeparateNorms(segmentInfos.info(0)) ||
- segmentInfos.info(0).dir != directory ||
- (useCompoundFile &&
- !segmentInfos.info(0).getUseCompoundFile())))) {
- int minSegment = segmentInfos.size() - mergeFactor;
- mergeSegments(minSegment < 0 ? 0 : minSegment, segmentInfos.size());
+
+ if (infoStream != null)
+ message("optimize: index now " + segString());
+
+ synchronized(this) {
+ resetMergeExceptions();
+ segmentsToOptimize = new HashSet();
+ final int numSegments = segmentInfos.size();
+      for(int i=0;i<numSegments;i++)
+        segmentsToOptimize.add(segmentInfos.info(i));
+    }
+
+    maybeMerge();
+
+    if (doWait) {
+      synchronized(this) {
+        while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
+          try {
+            wait();
+          } catch (InterruptedException ie) {
+          }
+          if (mergeExceptions.size() > 0) {
+ // Forward any exceptions in background merge
+ // threads to the current thread:
+ final int size = mergeExceptions.size();
+            for(int i=0;i<size;i++) {
 * @throws IllegalStateException if this is called when
 *  the writer was opened with <code>autoCommit=true</code>.
* @throws IOException if there is a low-level IO error
*/
- public synchronized void abort() throws IOException {
+ public void abort() throws IOException {
ensureOpen();
- if (!autoCommit) {
+ if (autoCommit)
+ throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false");
- // Keep the same segmentInfos instance but replace all
- // of its SegmentInfo instances. This is so the next
- // attempt to commit using this instance of IndexWriter
- // will always write to a new generation ("write once").
- segmentInfos.clear();
- segmentInfos.addAll(rollbackSegmentInfos);
+ boolean doClose;
+ synchronized(this) {
+ // Ensure that only one thread actually gets to do the closing:
+ if (!closing) {
+ doClose = true;
+ closing = true;
+ } else
+ doClose = false;
+ }
- docWriter.abort();
+ if (doClose) {
- // Ask deleter to locate unreferenced files & remove
- // them:
- deleter.checkpoint(segmentInfos, false);
- deleter.refresh();
+ finishMerges(false);
+ // Must pre-close these two, in case they set
+ // commitPending=true, so that we can then set it to
+ // false before calling closeInternal
+ mergePolicy.close();
+ mergeScheduler.close();
+
+ synchronized(this) {
+ // Keep the same segmentInfos instance but replace all
+ // of its SegmentInfo instances. This is so the next
+ // attempt to commit using this instance of IndexWriter
+ // will always write to a new generation ("write
+ // once").
+ segmentInfos.clear();
+ segmentInfos.addAll(rollbackSegmentInfos);
+
+ docWriter.abort();
+
+ // Ask deleter to locate unreferenced files & remove
+ // them:
+ deleter.checkpoint(segmentInfos, false);
+ deleter.refresh();
+ finishMerges(false);
+ }
+
commitPending = false;
- docWriter.abort();
- close();
+ closeInternal(false);
+ } else
+ waitForClose();
+ }
+ private synchronized void finishMerges(boolean waitForMerges) {
+ if (!waitForMerges) {
+ // Abort all pending & running merges:
+ Iterator it = pendingMerges.iterator();
+ while(it.hasNext())
+ ((MergePolicy.OneMerge) it.next()).abort();
+
+ pendingMerges.clear();
+ it = runningMerges.iterator();
+ while(it.hasNext())
+ ((MergePolicy.OneMerge) it.next()).abort();
+
+ runningMerges.clear();
+ mergingSegments.clear();
+ notifyAll();
} else {
- throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false");
+ while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ }
+ }
+ assert 0 == mergingSegments.size();
}
}
@@ -1461,11 +1903,11 @@
* commit the change immediately. Else, we mark
* commitPending.
*/
- private void checkpoint() throws IOException {
+ private synchronized void checkpoint() throws IOException {
if (autoCommit) {
segmentInfos.write(directory);
if (infoStream != null)
- infoStream.println("checkpoint: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
+ message("checkpoint: wrote segments file \"" + segmentInfos.getCurrentSegmentFileName() + "\"");
} else {
commitPending = true;
}
@@ -1521,7 +1963,7 @@
throws CorruptIndexException, IOException {
ensureOpen();
- optimize(); // start with zero or 1 seg
+ flush();
int start = segmentInfos.size();
@@ -1538,15 +1980,8 @@
}
}
- // merge newly added segments in log(n) passes
- while (segmentInfos.size() > start+mergeFactor) {
- for (int base = start; base < segmentInfos.size(); base++) {
- int end = Math.min(segmentInfos.size(), base+mergeFactor);
- if (end-base > 1) {
- mergeSegments(base, end);
- }
- }
- }
+ optimize();
+
success = true;
} finally {
if (success) {
@@ -1555,8 +1990,11 @@
rollbackTransaction();
}
}
+ }
- optimize(); // final cleanup
+ private synchronized void resetMergeExceptions() {
+ mergeExceptions = new ArrayList();
+ mergeGen++;
}
/**
@@ -1578,40 +2016,10 @@
*/
public synchronized void addIndexesNoOptimize(Directory[] dirs)
throws CorruptIndexException, IOException {
- // Adding indexes can be viewed as adding a sequence of segments S to
- // a sequence of segments T. Segments in T follow the invariants but
- // segments in S may not since they could come from multiple indexes.
- // Here is the merge algorithm for addIndexesNoOptimize():
- //
- // 1 Flush ram.
- // 2 Consider a combined sequence with segments from T followed
- // by segments from S (same as current addIndexes(Directory[])).
- // 3 Assume the highest level for segments in S is h. Call
- // maybeMergeSegments(), but instead of starting w/ lowerBound = -1
- // and upperBound = maxBufferedDocs, start w/ lowerBound = -1 and
- // upperBound = upperBound of level h. After this, the invariants
- // are guaranteed except for the last < M segments whose levels <= h.
- // 4 If the invariants hold for the last < M segments whose levels <= h,
- // if some of those < M segments are from S (not merged in step 3),
- // properly copy them over*, otherwise done.
- // Otherwise, simply merge those segments. If the merge results in
- // a segment of level <= h, done. Otherwise, it's of level h+1 and call
- // maybeMergeSegments() starting w/ upperBound = upperBound of level h+1.
- //
- // * Ideally, we want to simply copy a segment. However, directory does
- // not support copy yet. In addition, source may use compound file or not
- // and target may use compound file or not. So we use mergeSegments() to
- // copy a segment, which may cause doc count to change because deleted
- // docs are garbage collected.
- // 1 flush ram
-
ensureOpen();
flush();
- // 2 copy segment infos and find the highest level from dirs
- int startUpperBound = docWriter.getMaxBufferedDocs();
-
/* new merge policy
if (startUpperBound == 0)
startUpperBound = 10;
@@ -1634,64 +2042,20 @@
for (int j = 0; j < sis.size(); j++) {
SegmentInfo info = sis.info(j);
segmentInfos.addElement(info); // add each info
-
- while (startUpperBound < info.docCount) {
- startUpperBound *= mergeFactor; // find the highest level from dirs
- if (startUpperBound > maxMergeDocs) {
- // upper bound cannot exceed maxMergeDocs
- throw new IllegalArgumentException("Upper bound cannot exceed maxMergeDocs");
- }
- }
}
}
- // 3 maybe merge segments starting from the highest level from dirs
- maybeMergeSegments(startUpperBound);
+ maybeMerge();
- // get the tail segments whose levels <= h
- int segmentCount = segmentInfos.size();
- int numTailSegments = 0;
- while (numTailSegments < segmentCount
- && startUpperBound >= segmentInfos.info(segmentCount - 1 - numTailSegments).docCount) {
- numTailSegments++;
- }
- if (numTailSegments == 0) {
- success = true;
- return;
- }
+ // If after merging there remain segments in the index
+ // that are in a different directory, just copy these
+ // over into our index. This is necessary (before
+ // finishing the transaction) to avoid leaving the
+ // index in an unusable (inconsistent) state.
+ copyExternalSegments();
- // 4 make sure invariants hold for the tail segments whose levels <= h
- if (checkNonDecreasingLevels(segmentCount - numTailSegments)) {
- // identify the segments from S to be copied (not merged in 3)
- int numSegmentsToCopy = 0;
- while (numSegmentsToCopy < segmentCount
- && directory != segmentInfos.info(segmentCount - 1 - numSegmentsToCopy).dir) {
- numSegmentsToCopy++;
- }
- if (numSegmentsToCopy == 0) {
- success = true;
- return;
- }
+ success = true;
- // copy those segments from S
- for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) {
- mergeSegments(i, i + 1);
- }
- if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) {
- success = true;
- return;
- }
- }
-
- // invariants do not hold, simply merge those segments
- mergeSegments(segmentCount - numTailSegments, segmentCount);
-
- // maybe merge segments again if necessary
- if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) {
- maybeMergeSegments(startUpperBound * mergeFactor);
- }
-
- success = true;
} finally {
if (success) {
commitTransaction();
@@ -1701,6 +2065,33 @@
}
}
+ /* If any of our segments are using a directory != ours
+ * then copy them over. Currently this is only used by
+ * addIndexesNoOptimize(). */
+ private synchronized void copyExternalSegments() throws CorruptIndexException, IOException {
+ final int numSegments = segmentInfos.size();
+    for(int i=0;i<numSegments;i++) {
 * <p>After this completes, the index is optimized.
* The provided IndexReaders are not closed.
@@ -1754,6 +2145,9 @@
} finally {
if (!success) {
+ if (infoStream != null)
+ message("hit exception in addIndexes during merge");
+
rollbackTransaction();
} else {
commitTransaction();
@@ -1765,7 +2159,7 @@
}
}
- if (useCompoundFile) {
+ if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) {
boolean success = false;
@@ -1776,6 +2170,9 @@
info.setUseCompoundFile(true);
} finally {
if (!success) {
+ if (infoStream != null)
+ message("hit exception building compound file in addIndexes during merge");
+
rollbackTransaction();
} else {
commitTransaction();
@@ -1784,40 +2181,6 @@
}
}
- // Overview of merge policy:
- //
- // A flush is triggered either by close() or by the number of ram segments
- // reaching maxBufferedDocs. After a disk segment is created by the flush,
- // further merges may be triggered.
- //
- // LowerBound and upperBound set the limits on the doc count of a segment
- // which may be merged. Initially, lowerBound is set to 0 and upperBound
- // to maxBufferedDocs. Starting from the rightmost* segment whose doc count
- // > lowerBound and <= upperBound, count the number of consecutive segments
- // whose doc count <= upperBound.
- //
- // Case 1: number of worthy segments < mergeFactor, no merge, done.
- // Case 2: number of worthy segments == mergeFactor, merge these segments.
- // If the doc count of the merged segment <= upperBound, done.
- // Otherwise, set lowerBound to upperBound, and multiply upperBound
- // by mergeFactor, go through the process again.
- // Case 3: number of worthy segments > mergeFactor (in the case mergeFactor
- // M changes), merge the leftmost* M segments. If the doc count of
- // the merged segment <= upperBound, consider the merged segment for
- // further merges on this same level. Merge the now leftmost* M
- // segments, and so on, until number of worthy segments < mergeFactor.
- // If the doc count of all the merged segments <= upperBound, done.
- // Otherwise, set lowerBound to upperBound, and multiply upperBound
- // by mergeFactor, go through the process again.
- // Note that case 2 can be considerd as a special case of case 3.
- //
- // This merge policy guarantees two invariants if M does not change and
- // segment doc count is not reaching maxMergeDocs:
- // B for maxBufferedDocs, f(n) defined as ceil(log_M(ceil(n/B)))
- // 1: If i (left*) and i+1 (right*) are two consecutive segments of doc
- // counts x and y, then f(x) >= f(y).
- // 2: The number of committed segments on the same level (f(n)) <= M.
-
// This is called after pending added and deleted
// documents have been flushed to the Directory but before
// the change is committed (new segments_N file written).
@@ -1833,7 +2196,7 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public final synchronized void flush() throws CorruptIndexException, IOException {
+ public final void flush() throws CorruptIndexException, IOException {
flush(true, false);
}
@@ -1845,9 +2208,15 @@
* @param flushDocStores if false we are allowed to keep
* doc stores open to share with the next segment
*/
- protected final synchronized void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException {
+ protected final void flush(boolean triggerMerge, boolean flushDocStores) throws CorruptIndexException, IOException {
ensureOpen();
+ if (doFlush(flushDocStores) && triggerMerge)
+ maybeMerge();
+ }
+
+ private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {
+
// Make sure no threads are actively adding a document
docWriter.pauseAllThreads();
@@ -1877,10 +2246,14 @@
boolean flushDeletes = docWriter.hasDeletes();
if (infoStream != null)
- infoStream.println(" flush: flushDocs=" + flushDocs +
- " flushDeletes=" + flushDeletes +
- " flushDocStores=" + flushDocStores +
- " numDocs=" + numDocs);
+ message(" flush: segment=" + docWriter.getSegment() +
+ " docStoreSegment=" + docWriter.getDocStoreSegment() +
+ " docStoreOffset=" + docWriter.getDocStoreOffset() +
+ " flushDocs=" + flushDocs +
+ " flushDeletes=" + flushDeletes +
+ " flushDocStores=" + flushDocStores +
+ " numDocs=" + numDocs +
+ " numBufDelTerms=" + docWriter.getNumBufferedDeleteTerms());
int docStoreOffset = docWriter.getDocStoreOffset();
boolean docStoreIsCompoundFile = false;
@@ -1891,15 +2264,17 @@
if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) {
// We must separately flush the doc store
if (infoStream != null)
- infoStream.println(" flush shared docStore segment " + docStoreSegment);
+ message(" flush shared docStore segment " + docStoreSegment);
- flushDocStores();
+ docStoreIsCompoundFile = flushDocStores();
flushDocStores = false;
- docStoreIsCompoundFile = useCompoundFile;
}
String segment = docWriter.getSegment();
+ // If we are flushing docs, segment must not be null:
+ assert segment != null || !flushDocs;
+
if (flushDocs || flushDeletes) {
SegmentInfos rollback = null;
@@ -1948,7 +2323,22 @@
success = true;
} finally {
if (!success) {
+
+ if (infoStream != null)
+ message("hit exception flushing segment " + segment);
+
if (flushDeletes) {
+
+ // Carefully check if any partial .del files
+ // should be removed:
+ final int size = rollback.size();
+              for(int i=0;i<size;i++) {
-      while (--minSegment >= 0) {
- SegmentInfo si = segmentInfos.info(minSegment);
+ final int numSegments = segmentInfos.size();
+
+ final int numSegmentsToMerge = merge.segments.size();
+    for(int i=0;i<numSegmentsToMerge;i++) {
+      final SegmentInfo info = merge.segments.info(i);
-        if (si.docCount > lowerBound && si.docCount <= upperBound) {
- // start from the rightmost* segment whose doc count is in bounds
- maxSegment = minSegment;
- } else if (si.docCount > upperBound) {
- // until the segment whose doc count exceeds upperBound
- break;
- }
+ if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) {
+ if (segmentInfos.indexOf(info) == -1)
+ throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index");
+ else
+ throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle");
}
+ }
- minSegment++;
- maxSegment++;
- int numSegments = maxSegment - minSegment;
+ return first;
+ }
- if (numSegments < mergeFactor) {
- break;
- } else {
- boolean exceedsUpperLimit = false;
+ /* FIXME if we want to support non-contiguous segment merges */
+ synchronized private boolean commitMerge(MergePolicy.OneMerge merge) throws IOException {
- // number of merge-worthy segments may exceed mergeFactor when
- // mergeFactor and/or maxBufferedDocs change(s)
- while (numSegments >= mergeFactor) {
- // merge the leftmost* mergeFactor segments
+ assert merge.registerDone;
- int docCount = mergeSegments(minSegment, minSegment + mergeFactor);
- numSegments -= mergeFactor;
+ // If merge was explicitly aborted, or, if abort() or
+ // rollbackTransaction() had been called since our merge
+ // started (which results in an unqualified
+ // deleter.refresh() call that will remove any index
+ // file that current segments does not reference), we
+ // abort this merge
+ if (merge.isAborted()) {
- if (docCount > upperBound) {
- // continue to merge the rest of the worthy segments on this level
- minSegment++;
- exceedsUpperLimit = true;
- } else {
- // if the merged segment does not exceed upperBound, consider
- // this segment for further merges on this same level
- numSegments++;
+ if (infoStream != null) {
+ if (merge.isAborted())
+ message("commitMerge: skipping merge " + merge.segString(directory) + ": it was aborted");
+ }
+
+ assert merge.increfDone;
+ decrefMergeSegments(merge);
+ deleter.refresh(merge.info.name);
+ return false;
+ }
+
+ boolean success = false;
+
+ int start;
+
+ try {
+ SegmentInfos sourceSegmentsClone = merge.segmentsClone;
+ SegmentInfos sourceSegments = merge.segments;
+ final int numSegments = segmentInfos.size();
+
+ start = ensureContiguousMerge(merge);
+ if (infoStream != null)
+ message("commitMerge " + merge.segString(directory));
+
+ // Carefully merge deletes that occurred after we
+ // started merging:
+
+ BitVector deletes = null;
+ int docUpto = 0;
+
+ final int numSegmentsToMerge = sourceSegments.size();
+    for(int i=0;i<numSegmentsToMerge;i++) {
-      for (int i = end-1; i > minSegment; i--)     // remove old infos & add new
- segmentInfos.remove(i);
+  /** Does the finishing work for a merge, which is fast but holds
+   *  the synchronized lock on the IndexWriter instance. */
+ final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException {
- segmentInfos.set(minSegment, newSegment);
+ if (merge.increfDone)
+ decrefMergeSegments(merge);
- checkpoint();
+ assert merge.registerDone;
- success = true;
+ final SegmentInfos sourceSegments = merge.segments;
+ final SegmentInfos sourceSegmentsClone = merge.segmentsClone;
+ final int end = sourceSegments.size();
+    for(int i=0;i<end;i++)
+      mergingSegments.remove(sourceSegments.info(i));
    if (docWriter.getNumBufferedDeleteTerms() > 0) {
if (infoStream != null)
- infoStream.println("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
- + segmentInfos.size() + " segments.");
+ message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
+ + segmentInfos.size() + " segments.");
if (flushedNewSegment) {
IndexReader reader = null;
@@ -2341,29 +3081,6 @@
return delCount;
}
- private final boolean checkNonDecreasingLevels(int start) {
- int lowerBound = -1;
- int upperBound = docWriter.getMaxBufferedDocs();
-
- /* new merge policy
- if (upperBound == 0)
- upperBound = 10;
- */
-
- for (int i = segmentInfos.size() - 1; i >= start; i--) {
- int docCount = segmentInfos.info(i).docCount;
- if (docCount <= lowerBound) {
- return false;
- }
-
- while (docCount > upperBound) {
- lowerBound = upperBound;
- upperBound *= mergeFactor;
- }
- }
- return true;
- }
-
// For test purposes.
final synchronized int getBufferedDeleteTermsSize() {
return docWriter.getBufferedDeleteTerms().size();
@@ -2417,13 +3134,18 @@
return delCount;
}
+ // utility routines for tests
+ SegmentInfo newestSegment() {
+ return segmentInfos.info(segmentInfos.size()-1);
+ }
+
public synchronized String segString() {
StringBuffer buffer = new StringBuffer();
for(int i = 0; i < segmentInfos.size(); i++) {
if (i > 0) {
buffer.append(' ');
}
- buffer.append(segmentInfos.info(i).name + ":" + segmentInfos.info(i).docCount);
+ buffer.append(segmentInfos.info(i).segString(directory));
}
return buffer.toString();
Index: src/java/org/apache/lucene/index/IndexFileDeleter.java
===================================================================
--- src/java/org/apache/lucene/index/IndexFileDeleter.java (revision 575729)
+++ src/java/org/apache/lucene/index/IndexFileDeleter.java (working copy)
@@ -105,7 +105,7 @@
}
private void message(String message) {
- infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message);
+ infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message);
}
/**
@@ -275,25 +275,59 @@
* Writer calls this when it has hit an error and had to
* roll back, to tell us that there may now be
* unreferenced files in the filesystem. So we re-list
- * the filesystem and delete such files:
+ * the filesystem and delete such files. If segmentName
+ * is non-null, we will only delete files corresponding to
+ * that segment.
*/
- public void refresh() throws IOException {
+ public void refresh(String segmentName) throws IOException {
String[] files = directory.list();
if (files == null)
throw new IOException("cannot read directory " + directory + ": list() returned null");
IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
+ String segmentPrefix1;
+ String segmentPrefix2;
+ if (segmentName != null) {
+ segmentPrefix1 = segmentName + ".";
+ segmentPrefix2 = segmentName + "_";
+ } else {
+ segmentPrefix1 = null;
+ segmentPrefix2 = null;
+ }
+
+    for(int i=0;i<files.length;i++) {
+      String fileName = files[i];
+      assert count > 0;
return --count;
}
}
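The body of the file loop in the new refresh(String segmentName) is truncated in this excerpt. Based on the javadoc and the segmentPrefix1/segmentPrefix2 setup above, the per-file filtering presumably looks roughly like the sketch below; deleteUnreferencedFile is a hypothetical stand-in for whatever the deleter actually does with a matching file, not a method of this class.

    // Sketch only: visit every file, optionally restricted to one segment's files.
    for (int i = 0; i < files.length; i++) {
      String fileName = files[i];
      if (filter.accept(null, fileName) &&             // looks like a Lucene index file
          (segmentName == null ||                      // no restriction requested, or ...
           fileName.startsWith(segmentPrefix1) ||      // "segmentName." files
           fileName.startsWith(segmentPrefix2))) {     // "segmentName_" files
        deleteUnreferencedFile(fileName);              // hypothetical helper
      }
    }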
Index: src/java/org/apache/lucene/index/MergeScheduler.java
===================================================================
--- src/java/org/apache/lucene/index/MergeScheduler.java (revision 0)
+++ src/java/org/apache/lucene/index/MergeScheduler.java (revision 0)
@@ -0,0 +1,36 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/** Expert: {@link IndexWriter} uses an instance
+ * implementing this interface to execute the merges
+ * selected by a {@link MergePolicy}. The default
+ * MergeScheduler is {@link SerialMergeScheduler}. */
+
+public interface MergeScheduler {
+
+ /** Run the merges provided by {@link IndexWriter#getNextMerge()}. */
+ void merge(IndexWriter writer)
+ throws CorruptIndexException, IOException;
+
+ /** Close this MergeScheduler. */
+ void close()
+ throws CorruptIndexException, IOException;
+}
Property changes on: src/java/org/apache/lucene/index/MergeScheduler.java
___________________________________________________________________
Name: svn:eol-style
+ native
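Because the interface is small, a scheduler that simply runs each registered merge on the calling thread shows how it is meant to be plugged in. The sketch below is not the SerialMergeScheduler shipped with this change; it assumes the package-private IndexWriter#getNextMerge() referenced in the javadoc above, plus an IndexWriter#merge(MergePolicy.OneMerge) method that executes a single merge.

    package org.apache.lucene.index;

    import java.io.IOException;

    /** Sketch of a MergeScheduler that runs every pending merge
     *  sequentially on the thread that asked for it. */
    public class SimpleSerialMergeScheduler implements MergeScheduler {

      public synchronized void merge(IndexWriter writer)
        throws CorruptIndexException, IOException {
        while(true) {
          // Pull the next merge the MergePolicy has registered, if any:
          MergePolicy.OneMerge merge = writer.getNextMerge();
          if (merge == null)
            break;
          // Run it on the current thread (assumed IndexWriter method):
          writer.merge(merge);
        }
      }

      public void close() {}
    }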